2014-01-13 3 views
2

У нас есть код на Java с помощью ThreadPoolExecutor и CompletionService. Задачи подаются большими партиями в пул; результаты идут на службу завершения, где мы собираем завершенные задачи, когда доступны, не дожидаясь всей партии для завершения:сбор результатов асинхронно от gpars parallel executor

ThreadPoolExecutor _executorService = 
      new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, new LinkedBlockingQueue(20)); 
CompletionService _completionService = 
      new ExecutorCompletionService<Callable>(_executorService) 

//submit tasks 
_completionService.submit(some task); 

//get results 
while(...){ 
    Future result = _completionService.poll(timeout); 
    if(result) 
     //process result 
} 

Общее число работников в бассейне MAX_NUMBER_OF_WORKERS; задачи, представленные без доступного рабочего, помещаются в очередь; до 20 задач могут быть поставлены в очередь, после чего задачи отклонены.

Что такое Gpars Аналогию этому подходу?

Чтение в documentation на gpars параллелизм, я нашел много возможных вариантов: collectManyParallel(), anyParallel(), fork/join и т.д., и я не уверен, какие из них даже тест. Я надеялся найти упоминание о «завершении» или «завершении службы» в качестве сравнения в документах, но ничего не нашел. Я ищу некоторые направления/указатели, где начинать с тех, кто сталкивается с gpars.

ответ

1

Сбор результатов на лету, дросселирование производителей - это требует решения для потока данных. Пожалуйста, найдите пробную версию демонстрационного примера ниже:

import groovyx.gpars.dataflow.DataflowQueue 
import groovyx.gpars.group.DefaultPGroup 
import groovyx.gpars.scheduler.DefaultPool 

import java.util.concurrent.LinkedBlockingQueue 
import java.util.concurrent.ThreadPoolExecutor 
import java.util.concurrent.TimeUnit 

int MAX_NUMBER_OF_WORKERS = 10 

ThreadPoolExecutor _executorService = 
     new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200)); 

final group = new DefaultPGroup(new DefaultPool(_executorService)) 
final results = new DataflowQueue() 

//submit tasks 
30.times {value -> 
    group.task(new Runnable() { 
     @Override 
     void run() { 
      println 'Starting ' + Thread.currentThread() 
      sleep 5000 
      println 'Finished ' + Thread.currentThread() 
      results.bind(value) 
     } 
    }); 
} 
group.task { 
    results << -1 //stop the consumer eventually 
} 

//get results 
while (true) { 
    def result = results.val 
    println result 
    if (result == -1) break 
    //process result 
} 

group.shutdown() 
+0

выглядит очень интересно, я проверю это, – raffian

Смежные вопросы