2010-07-16 3 views
146

Что такое самый простой способ дождаться завершения всех задач ExecutorService? Моя задача в первую очередь вычисляется, поэтому я просто хочу запустить большое количество заданий - по одному на каждое ядро. Сейчас моя установка выглядит следующим образом:ExecutorService, как дождаться завершения всех задач

ExecutorService es = Executors.newFixedThreadPool(2); 
for (DataTable singleTable : uniquePhrases) { 
    es.execute(new ComputeDTask(singleTable)); 
} 
try{ 
    es.wait(); 
} 
catch (InterruptedException e){ 
    e.printStackTrace(); 
} 

ComputeDTask реализует работоспособной. Кажется, что выполнение задач выполняется правильно, но код падает на wait() с IllegalMonitorStateException. Это странно, потому что я играл с некоторыми примерами игрушек и, похоже, работал.

uniquePhrases содержит несколько десятков тысяч элементов. Должен ли я использовать другой метод? Я ищу что-то максимально простое

+1

Если вы хотите использовать wait (ответы говорят вам, что нет): вам всегда нужно синхронизировать объект (в данном случае 'es'), когда вы хотите его дождаться - блокировка будет автоматически выпущена при ожидании – mihi

+8

лучшим способом инициализации ThreadPool является «Исполнители».newFixedThreadPool (System.getRuntime(). availableProcessors()); ' –

+6

Runtime.getRuntime(). availableProcessors(); –

ответ

47

Если вы хотите дождаться завершения всех задач, используйте метод shutdown вместо wait. Затем следуйте за ним с помощью awaitTermination.

Кроме того, вы можете использовать Runtime.availableProcessors для получения количества аппаратных потоков, чтобы вы могли правильно инициализировать свой поток.

+18

shutdown() останавливает выполнение ExecutorService новых задач и закрывает простоя рабочие потоки. Не указано, ждать завершения завершения и выполнение в ThreadPoolExecutor не ждет. –

+0

@Alain - спасибо. Я должен был упомянуть об ожидании. Исправлена. –

+2

Что делать, если для завершения задачи необходимо запланировать дальнейшие задачи? Например, вы можете сделать многопоточный обход дерева, который отводит ветви к рабочим потокам. В этом случае, поскольку ExecutorService немедленно отключается, он не принимает никаких рекурсивно запланированных заданий. –

9

Если вы хотите дождаться завершения выполнения услуги исполнителя, позвоните по телефону shutdown(), а затем awaitTermination(units, unitType), например. awaitTermination(1, MINUTE). ExecutorService не блокирует на его собственном мониторе, так что вы не можете использовать wait и т.д.

+0

Я думаю, что это ждет. –

+0

@SB - Спасибо - я вижу, что моя память ошибочна! Я обновил имя и добавил ссылку, чтобы быть уверенным. – mdma

+0

Чтобы ждать «навсегда», используйте его как 'awaitTermination (Long.MAX_VALUE, TimeUnit.NANOSECONDS);' http://stackoverflow.com/a/1250655/32453 – rogerdpack

42

Если ждет всех задач в ExecutorService закончить это точно не ваша цель, а не ждет, пока конкретной партии задач, вы можете использовать CompletionService — специально, ExecutorCompletionService.

Идея заключается в том, чтобы создать ExecutorCompletionService оборачивать ваш Executor, submit некоторые известное число задач через CompletionService, затем сделать что одинаковое число результатов из очереди завершения с использованием либо take() (какие блоки) или poll() (который нет). После того, как вы нарисуете все ожидаемые результаты, соответствующие поставленным задачам, вы знаете, что все сделано.

Позвольте мне сказать это еще раз, потому что это не очевидно из интерфейса: вы должны знать, сколько вещей вы положили в CompletionService, чтобы узнать, сколько вещей попытаться извлечь. Это особенно важно с помощью метода take(): назовите его слишком много, и он заблокирует ваш вызывающий поток, пока какой-либо другой поток не отправит другое задание на тот же CompletionService.

В книге Java Concurrency in Practice есть some examples showing how to use CompletionService.

+0

Это хороший контрапункт к моему ответу - я бы сказал, что прямой ответ на вопрос invokeAll(); но @seh имеет право при отправке групп рабочих мест в ES и ожидании их завершения ... --JA – andersoj

+0

@ om-nom-nom, спасибо за обновление ссылок. Я рад видеть, что ответ по-прежнему полезен. – seh

+1

Хороший ответ, я не знал о «CompletionService» – Vic

6

Вы могли ждать, чтобы закончить работу на определенном интервале:

int maxSecondsPerComputeDTask = 20; 
try { 
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) { 
     // consider giving up with a 'break' statement under certain conditions 
    } 
} catch (InterruptedException e) { 
    throw new RuntimeException(e);  
} 

Или вы могли бы использовать ExecutorService. представить (Runnable) и собрать Future объекты, которые он возвращает и вызовите Get() на каждый, в свою очередь, чтобы ждать их, чтобы закончить.

ExecutorService es = Executors.newFixedThreadPool(2); 
Collection<Future<?>> futures = new LinkedList<<Future<?>>(); 
for (DataTable singleTable : uniquePhrases) { 
    futures.add(es.submit(new ComputeDTask(singleTable))); 
} 
for (Future<?> future : futures) { 
    try { 
     future.get(); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } catch (ExecutionException e) { 
     throw new RuntimeException(e); 
    } 
} 

InterruptedException чрезвычайно важно обращаться правильно. Это то, что позволяет вам или пользователям вашей библиотеки безопасно завершить длительный процесс.

178

Самый простой способ - использовать ExecutorService.invokeAll(), который делает то, что вы хотите, в однострочном пространстве. На вашем языке вам нужно будет изменить или обернуть ComputeDTask, чтобы реализовать Callable<>, что может дать вам довольно большую гибкость. Вероятно, в вашем приложении есть значимая реализация Callable.call(), но вот способ ее обернуть, если не использовать Executors.callable().

ExecutorService es = Executors.newFixedThreadPool(2); 
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size()); 

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
} 

List<Future<Object>> answers = es.invokeAll(todo); 

Как уже отмечалось, можно использовать версию таймаута invokeAll(), если это необходимо. В этом примере answers будет содержать пучок Future s, который будет возвращать нули (см. Определение Executors.callable(). Возможно, что вы хотите сделать, это небольшой рефакторинг, чтобы вы могли получить полезный ответ или ссылку на базовые ComputeDTask , но я не могу сказать, из вашего примера.

Если не ясно, обратите внимание, что invokeAll() не вернется, пока все задачи не будут выполнены. (т.е. все Future s в вашем answers коллекции сообщит .isDone(), если). Это позволяет избежать всего ручного отключения, ожидания и т. д. ... и позволяет при необходимости повторно использовать этот ExecutorService для нескольких циклов.

Есть несколько связанных вопросов на SO:

Ни один из этих вопросов не является строгим для вашего вопроса, но они дают немного цвета о том, как люди думают, что должно использоваться Executor/ExecutorService.

+0

Downvoter: Любопытно, что вы считаете неправильным. – andersoj

+5

Это прекрасно, если вы добавляете все свои задания в пакетном режиме, и вы входите в список Callables, но это не сработает, если вы вызываете ExecutorService.submit() в ситуации обратного вызова или цикла событий. – Desty

+2

Я думаю, стоит упомянуть, что shutdown() еще нужно вызвать, когда ExecutorService больше не нужен, иначе потоки никогда не будут завершены (за исключением случаев, когда corePoolSize = 0 или allowCoreThreadTimeOut = true). – John29

3

У меня также есть ситуация, когда у меня есть набор документов для сканирования. Я начинаю с начального «семенного» документа, который должен быть обработан, этот документ содержит ссылки на другие документы, которые также должны быть обработаны, и так далее.

В моей основной программе я просто хочу написать что-то вроде следующего, где Crawler управляет связкой потоков.

Crawler c = new Crawler(); 
c.schedule(seedDocument); 
c.waitUntilCompletion() 

Такая же ситуация произошла бы, если бы я захотел перемещаться по дереву; я бы попал в корневой узел, процессор для каждого узла добавлял бы детей в очередь по мере необходимости, а группа потоков обрабатывала бы все узлы в дереве, пока их не было больше.

Я не мог найти ничего в JVM, который, как я думал, был немного удивителен. Поэтому я написал класс AutoStopThreadPool, который можно использовать непосредственно или подкласс для добавления методов, подходящих для домена, например. schedule(Document). Надеюсь, поможет!

AutoStopThreadPool Javadoc | Download

1

Вы можете использовать ThreadPoolExecutor с размером пула установлен в Runtime.getRuntime().availableProcessors() и .execute() все это задачи, которые вы хотите выполнить, а затем вызвать tpe.shutdown(), а затем ждать в while(!tpe.terminated()) { /* waiting for all tasks to complete */}, какие блоки для всех представленных задач для завершения. , где tpe является ссылкой на ваш пример ThreadPoolExecutor.

Или, если более целесообразно использовать ExecutorCompletionService A CompletionService, который использует предоставленный Исполнителем для выполнения задач. Этот класс организует отправку заданий по завершении, которые размещаются в очереди, доступной с помощью take. Класс является достаточно легким, чтобы быть пригодным для временного использования при обработке групп задач.

ПРИМЕЧАНИЕ: Это решение не блокируется, как ExecutorService.invokeAll() делает, ожидая завершения всех заданий. Так что в то время как простейший он также является самым ограниченным, так как он блокирует основной поток приложения. Нехорошо, если вы хотите получить статус о том, что происходит, или делать другие вещи во время выполнения этих заданий, например, после обработки результатов или создания и инкрементный агрегированный результат.

+1

Не во время (! Tpe.isTerminated()) называется busywait и потенциально очень интенсивно работает в CPU? – jontejj

1

Простой альтернативой этому является использование потоков вместе с соединением. Refer: Joining Threads

+2

ExecutorServices упрощает работу –

0

Я просто подожду, когда исполнитель закончит с указанным таймаутом, который, по вашему мнению, подходит для выполнения задач.

try { 
     //do stuff here 
     exe.execute(thread); 
    } finally { 
     exe.shutdown(); 
    } 
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS); 
    if (!result) 

    { 
     LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour."); 
    } 
2

Добавить все темы в коллекции и отправить его с помощью invokeAll. Если вы можете использовать метод invokeAllExecutorService, JVM не перейдет к следующей строке, пока не будут выполнены все потоки.

Здесь есть хороший пример: invokeAll via ExecutorService

5

Просто используйте

latch = new CountDownLatch(noThreads) 

В каждом потоке

latch.countDown(); 

и в качестве барьера

latch.await(); 
0

Похоже, вам нужно ForkJoinPool и использовать глобальный пул для выполнения задач.

public static void main(String[] args) { 
    // the default `commonPool` should be sufficient for many cases. 
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in. 
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask); 
    pool.awaitQuiescence(...); 
    // that's it. 
} 

Красота в pool.awaitQuiescence, где этот метод будет блокировать использовать нить вызывающего абонента для выполнения своих задач, а затем вернуться, когда это действительно пуст.

3

Корневая причина IllegalMonitorStateException:

Брошенный, чтобы указать, что поток пытается ждать на мониторе объекта или уведомить другие потоки, ожидающие на мониторе объекта, не имея указанного монитора.

Из вашего кода вы только что вызвали wait() в ExecutorService, не имея блокировки.

Ниже кода исправит IllegalMonitorStateException

try 
{ 
    synchronized(es){ 
     es.wait(); // Add some condition before you call wait() 
    } 
} 

Выполните один из ниже подходов ждать завершения всех задач, которые были представлены в ExecutorService.

  1. перебрать все Future задач из submit на ExecutorService и проверить состояние с блокировкой вызова get() на Future объекта

  2. Использование invokeAll на ExecutorService

  3. Использование CountDownLatch

  4. Использование ForkJoinPool о г newWorkStealingPool из Executors (с Java 8)

  5. Прекращения работы бассейна, как это рекомендовано в оракуле документации page

    void shutdownAndAwaitTermination(ExecutorService pool) { 
        pool.shutdown(); // Disable new tasks from being submitted 
        try { 
        // Wait a while for existing tasks to terminate 
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 
         pool.shutdownNow(); // Cancel currently executing tasks 
         // Wait a while for tasks to respond to being cancelled 
         if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 
         System.err.println("Pool did not terminate"); 
        } 
    } catch (InterruptedException ie) { 
        // (Re-)Cancel if current thread also interrupted 
        pool.shutdownNow(); 
        // Preserve interrupt status 
        Thread.currentThread().interrupt(); 
    } 
    

    Если вы хотите корректно ждать все задачи для завершения, когда вы используете опцию 5 вместо опций от 1 до 4, изменить

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 
    

    к

    while(condition), который проверяет каждую минуту.

1

Представьте свои задачи в Runner, а затем ждать вызова метода waitTillDone() так:

Runner runner = Runner.runner(2); 

for (DataTable singleTable : uniquePhrases) { 

    runner.run(new ComputeDTask(singleTable)); 
} 

// blocks until all tasks are finished (or failed) 
runner.waitTillDone(); 

runner.shutdown(); 

Чтобы использовать его добавить Gradle/Maven зависимость: 'com.github.matejtymes:javafixes:1.0'

Для получения дополнительной информации смотрите здесь: https://github.com/MatejTymes/JavaFixes или здесь: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

-1

Извините за поздний ответ, но вы можете использовать следующее, чтобы заставить threadPoolTaskExecutor ждать, пока все активные и ожидающие задачи будут обработаны.

ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) AppContext.getBean("threadPoolTaskExecutor"); 

// Код здесь, чтобы выполнить работоспособный пример:

while(iterator.hasNext()) 
{ 
threadPoolTaskExecutor.execute(new Runnable() ...); 
} 

    while(threadPoolTaskExecutor.getActiveCount() > 0) 
    { 
     //Hold until finish 
    } 

    threadPoolTaskExecutor.shutdown(); 

Убедитесь, что ThreadPoolTaskExecutor боб определяется с размахом = "Прототипом". Важно!

+0

Это занятое ожидание. Всегда избегайте ждать завершения задачи с помощью цикла. – mneri

Смежные вопросы