У меня есть три потока Продюсер, Процессор, Потребитель и все имеют блокирующие очереди для обмена данными между ними. Я хотел бы присоединиться к этой теме, и я использую будущее, что так выглядит код -Как выйти изящно с ExecutorService и Future
public class Test {
private static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private Producer(BlockingQueue<Integer> queue) {
this.queue = checkNotNull(queue);
}
@Override public void run() {
try {
int i = 0;
while (++i < Integer.MAX_VALUE) {
addEntry(i);
}
} finally {
addEntry(-1);
}
}
private void addEntry(int i) {
try {
queue.put(i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static class Processor implements Runnable {
private final BlockingQueue<Integer> readQueue;
private final BlockingQueue<Integer> writeQueue;
private Processor(BlockingQueue<Integer> readQueue, BlockingQueue<Integer> writeQueue) {
this.readQueue = checkNotNull(readQueue);
this.writeQueue = checkNotNull(writeQueue);
}
@Override public void run() {
try {
int i = readQueue.take();
while (i != -1) {
writeQueue.put(i);
i = readQueue.take();
if(i==1000){
throw new NullPointerException();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
addEntry(-1);
}
}
private void addEntry(int i) {
try {
writeQueue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private Consumer(BlockingQueue<Integer> queue) {
this.queue = checkNotNull(queue);
}
@Override public void run() {
try {
int i = queue.take();
while (i != -1) {
System.out.println(i);
i = queue.take();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
BlockingQueue<Integer> readQueue = new ArrayBlockingQueue<>(1000);
BlockingQueue<Integer> writeQueue = new ArrayBlockingQueue<>(1000);
ExecutorService executorService = Executors.newFixedThreadPool(3);
Runnable[] runnables = new Runnable[]{new Producer(readQueue), new Processor(readQueue, writeQueue), new Consumer(writeQueue)};
List<Future<?>> futures = Lists.newArrayList();
for (Runnable runnable : runnables) {
futures.add(executorService.submit(runnable));
}
executorService.shutdown();
for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
} catch(ExecutionException e){
executorService.shutdownNow();
throw new RuntimeException(e);
}finally{
future.cancel(true);
}
}
System.out.println("Done..");
}
}
Теперь, если Futute # Get() бросает исключение (NPE в процессоре) Я хотел бы, чтобы остановить все темы (Producer , Процессор, Потребитель) и изящно выйти.
Как я могу это достичь?
Вы можете 'Shutdown()' 'в ExecutorService'. –
'executorService.shutdownNow();' будет прерывать все запущенные потоки. Как * грациозно * вам нужно? – OldCurmudgeon
Я вызываю shutdown() и shutdownnow() - проверьте код основного метода .. Вы имеете в виду что-то еще? – Premraj