2012-04-03 3 views
5

Я хочу использовать CompletionService для обработки результатов из серии потоков по мере их заполнения. У меня есть сервис в цикле, чтобы использовать объекты Future, которые он предоставляет по мере их появления, но я не знаю, как лучше всего определить, когда все потоки завершены (и, следовательно, выйти из цикла):Как узнать, когда CompletionService закончен, доставляя результаты?

import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.ThreadPoolExecutor; 

public class Bar { 

    final static int MAX_THREADS = 4; 
    final static int TOTAL_THREADS = 20; 

    public static void main(String[] args) throws Exception{ 

     final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS); 
     final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool); 

     for (int i=0; i<TOTAL_THREADS; i++){ 
      service.submit(new MyCallable(i)); 
     } 

     int finished = 0; 
     Future<Integer> future = null; 
     do{ 
      future = service.take(); 
      int result = future.get(); 
      System.out.println(" took: " + result); 
      finished++;    

     }while(finished < TOTAL_THREADS); 

     System.out.println("Shutting down"); 
     threadPool.shutdown(); 
    } 


    public static class MyCallable implements Callable<Integer>{ 

     final int id; 

     public MyCallable(int id){ 
      this.id = id; 
      System.out.println("Submitting: " + id); 
     } 

     @Override 
     public Integer call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("finished: " + id); 
      return id; 
     } 
    } 
} 

Я попытался проверить состояние ThreadPoolExecutor, но я знаю, что методы getCompletedTaskCount и getTaskCount являются приблизительными и не следует полагаться. Есть ли лучший способ убедиться, что я получил все Фьючерсы от CompletionService, чем считать их самостоятельно?


Edit: Как связующее звено Nobeh при условии, и this link предполагают, что подсчет числа задач, представленных, то вызов взять(), что во много раз, это путь. Я просто удивлен, что нет способа спросить у CompletionService или его Executor, что осталось вернуть.

ответ

3

Ответ на эти вопросы дает вам ответ?

  • Выполняют ли ваши асинхронные задачи другие задачи, представленные в CompletionService?
  • Является ли service единственным объектом, который должен обрабатывать задачи, созданные в вашем приложении?

на основе reference documentation, CompletionService действует на подходе потребителя/производителя и имеет преимущество внутреннего Executor. Итак, до тех пор, пока вы выполняете задания в одном месте и потребляете их в другом месте, CompletionService.take() будет обозначать, есть ли еще какие-либо результаты для выдачи.

Я считаю, что this question также поможет вам.

+0

Thanks, nobeh. Похоже, что они также просто перебирают количество потоков в своем цикле for (int tasksHandled = 0; tasksHandled

+1

Пример в API использует тот же подход к выполнению take() n раз подряд. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html –

+0

Если результаты ExecutionService в другом потоке, кроме задач, были отправлены в Exector, является ли этот поток безопасным? – raffian

3

См. http://www.javaspecialists.eu/archive/Issue214.html за достойное предложение о том, как продлить службу-исполнитель, чтобы выполнить то, что вы ищете. Для вашего удобства я вставил соответствующий код ниже. Автор также предлагает сделать сервис Iterable, который, я думаю, будет хорошей идеей.

FWIW, я согласен с вами в том, что это действительно должно быть частью стандартной реализации, но, увы, это не так.

import java.util.concurrent.*; 
import java.util.concurrent.atomic.*; 

public class CountingCompletionService<V> extends ExecutorCompletionService<V> { 
    private final AtomicLong submittedTasks = new AtomicLong(); 
    private final AtomicLong completedTasks = new AtomicLong(); 

    public CountingCompletionService(Executor executor) { 
    super(executor); 
    } 

    public CountingCompletionService(
     Executor executor, BlockingQueue<Future<V>> queue) { 
    super(executor, queue); 
    } 

    public Future<V> submit(Callable<V> task) { 
    Future<V> future = super.submit(task); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> submit(Runnable task, V result) { 
    Future<V> future = super.submit(task, result); 
    submittedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> take() throws InterruptedException { 
    Future<V> future = super.take(); 
    completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll() { 
    Future<V> future = super.poll(); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public Future<V> poll(long timeout, TimeUnit unit) 
     throws InterruptedException { 
    Future<V> future = super.poll(timeout, unit); 
    if (future != null) completedTasks.incrementAndGet(); 
    return future; 
    } 

    public long getNumberOfCompletedTasks() { 
    return completedTasks.get(); 
    } 

    public long getNumberOfSubmittedTasks() { 
    return submittedTasks.get(); 
    } 

    public boolean hasUncompletedTasks() { 
    return completedTasks.get() < submittedTasks.get(); 
    } 
} 
Смежные вопросы