2013-09-12 3 views
0

У меня есть три потока Продюсер, Процессор, Потребитель и все имеют блокирующие очереди для обмена данными между ними. Я хотел бы присоединиться к этой теме, и я использую будущее, что так выглядит код -Как выйти изящно с ExecutorService и Future

public class Test { 


    private static class Producer implements Runnable { 

     private final BlockingQueue<Integer> queue; 

     private Producer(BlockingQueue<Integer> queue) { 
      this.queue = checkNotNull(queue); 
     } 

     @Override public void run() { 
      try { 
       int i = 0; 
       while (++i < Integer.MAX_VALUE) { 
        addEntry(i); 
       } 
      } finally { 
       addEntry(-1); 
      } 
     } 

     private void addEntry(int i) { 
      try { 
       queue.put(i); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
    } 

    private static class Processor implements Runnable { 

     private final BlockingQueue<Integer> readQueue; 
     private final BlockingQueue<Integer> writeQueue; 

     private Processor(BlockingQueue<Integer> readQueue, BlockingQueue<Integer> writeQueue) { 
      this.readQueue = checkNotNull(readQueue); 
      this.writeQueue = checkNotNull(writeQueue); 
     } 

     @Override public void run() { 
      try { 
       int i = readQueue.take(); 
       while (i != -1) { 
        writeQueue.put(i); 
        i = readQueue.take(); 
        if(i==1000){ 
         throw new NullPointerException(); 
        } 
       } 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } finally { 
       addEntry(-1); 
      } 
     } 

     private void addEntry(int i) { 
      try { 
       writeQueue.put(i); 
      } catch (InterruptedException e) { 
       throw new RuntimeException(e); 
      } 
     } 
    } 

    private static class Consumer implements Runnable { 

     private final BlockingQueue<Integer> queue; 

     private Consumer(BlockingQueue<Integer> queue) { 
      this.queue = checkNotNull(queue); 
     } 

     @Override public void run() { 
      try { 
       int i = queue.take(); 
       while (i != -1) { 
        System.out.println(i); 
        i = queue.take(); 
       } 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 

     } 
    } 

    public static void main(String[] args) { 
     BlockingQueue<Integer> readQueue = new ArrayBlockingQueue<>(1000); 
     BlockingQueue<Integer> writeQueue = new ArrayBlockingQueue<>(1000); 
     ExecutorService executorService = Executors.newFixedThreadPool(3); 
     Runnable[] runnables = new Runnable[]{new Producer(readQueue), new Processor(readQueue, writeQueue), new Consumer(writeQueue)}; 
     List<Future<?>> futures = Lists.newArrayList(); 
     for (Runnable runnable : runnables) { 
      futures.add(executorService.submit(runnable)); 
     } 
     executorService.shutdown(); 
     for (Future<?> future : futures) { 
      try { 
       future.get(); 
      } catch (InterruptedException e) { 
       executorService.shutdownNow(); 
       Thread.currentThread().interrupt(); 
      } catch(ExecutionException e){ 
       executorService.shutdownNow(); 
       throw new RuntimeException(e); 
      }finally{ 
       future.cancel(true); 
      } 
     } 
     System.out.println("Done.."); 
    } 
} 

Теперь, если Futute # Get() бросает исключение (NPE в процессоре) Я хотел бы, чтобы остановить все темы (Producer , Процессор, Потребитель) и изящно выйти.

Как я могу это достичь?

+1

Вы можете 'Shutdown()' 'в ExecutorService'. –

+0

'executorService.shutdownNow();' будет прерывать все запущенные потоки. Как * грациозно * вам нужно? – OldCurmudgeon

+0

Я вызываю shutdown() и shutdownnow() - проверьте код основного метода .. Вы имеете в виду что-то еще? – Premraj

ответ

0

Вы не можете заставить поток выйти. Вы можете только сигнализировать об этом и закодировать поток для выхода, когда он получает сигнал. Вы можете использовать прерывание потока как способ вывести потоки для выхода. Если вы хотите, чтобы потоки выполнялись при вызове executorService.shutdownNow(), поток должен выйти, когда он прерывается.

Например, в вашем Consumer:

@Override public void run() { 
     try { 
      int i = queue.take(); 
      while (i != -1) { 
       System.out.println(i); 
       i = queue.take(); 
      } 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 

Когда поток прерывается, он просто вызывает interrupt() на себя снова. Это ничего не делает. Метод run должен вернуться, когда он получает InterruptedException:

@Override public void run() { 
     try { 
      int i = queue.take(); 
      while (i != -1) { 
       System.out.println(i); 
       i = queue.take(); 
      } 
     } catch (InterruptedException e) { 
      System.err.println("Consumer interrupted"); 
      return; 
     } 
+0

Это не важно в Процессор - я бросаю NPE и ловушку InterruptedException - так идеально, что поток должен выйти. Я беспокоюсь о других двух потоках - они должны выйти. – Premraj

Смежные вопросы