2015-01-15 5 views
0

Приложенное серверное приложение получает несколько запросов для задач, которые я хочу обработать, используя систему задач.Исполнитель: Подождите, пока не закончится выполнение определенных задач.

Каждая задача представлена ​​в виде Runnable, которая потребует n количество потоков из пула потоков, где n меньше или равно размеру пула потоков. Пул потоков, конечно, необходим для того, чтобы не перегружал процессор слишком большим количеством потоков.

Однако некоторые из этих задач могут быть многопоточными, а некоторые не могут. Поэтому может потребоваться, чтобы одна задача дождалась завершения всех ее конкретных потоков, чтобы объединить результаты этих потоков для конечного результата.

Если один использует несколько Thread экземпляров один может присоединиться к тем, как это:

try { 
    // Wait for all threads to finish their tasks 
    for (Thread thread : threads) { 
     thread.join(); 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

// Finish job here .. 

, но мне нужно что-то вроде этого, используя java.util.concurrent.Executor или что-нибудь подобное, что работает с пулом потоков.

+0

Я могу ошибаться, но он чувствует, как вы используете Исполнителю на неправильном уровне абстракции. Если у вас есть задача, для которой требуется выполнение потоков * n *, я предлагаю вам позволить этой задаче создать собственный пул потоков или разбить ее на более мелкие задачи с одним потоком. – aioobe

+0

@aioobe у вас может оказаться слишком много исполнителей/пулов, которые не эффективны. – bachr

+0

. Перед началом работы каждая задача может сделать someSharedSemaphore.acquire (n). – aioobe

ответ

1

Если я вас правильно понял, вам нужно что-то вроде этого (но ваша архитектура кажется слишком сложным):

class MyTask implements Runnable { 
    @Override 
    public void run() { 
     // some work 
    } 
} 

После этого:

ExecutorService executorService = Executors.newFixedThreadPool(2000); 
ArrayList<Future> futures = new ArrayList<>(); 

futures.add(executorService.submit(new MyTask())); 
futures.add(executorService.submit(new MyTask())); 
futures.add(executorService.submit(new MyTask())); 

for (Future future: futures) { 
    try { 
     future.get(100, TimeUnit.SECONDS); 
    } catch (Throwable cause) { 
     // process cause 
    } 
} 

Каждый future.get() будет ждать для завершения задачи (не более 100 секунд в этом примере).

+0

Я думаю, что задача должна внедрить 'Callable' не' Runnable' для будущего – bachr

+0

@arbi Да, если результат важен. – DmitryKanunnikoff

+0

Но если для некоторых задач требуется только 1 поток, то бессмысленно ждать их завершения до запуска следующего. – aioobe

1

Вы можете использовать ExecutorService вместе с CyclicBarrier для каждой задачи следующим образом:

public class ThreadedTask implements Runnable { 
    CyclicBarrier barrier; 
    public ThreadedTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 
    @Override 
    public void run() { 
     // do something 
     barrier.await(); 
    } 
} 
ExecutorService executor = Executors.newFixedThreadPool(pool_size); 
... 
CyclicBarrier barrier = new CyclicBarrier(n+1); 
for(int i=0; i<n; i++) { 
    // launch all tasks 
    executor.submit(new ThreadedTask(barrier)); 
} 
// waits for the tasks to finish or timeout 
barrier.await(seconds, TimeUnit.SECONDS); 
+0

Это не похоже на то, что мне нужно. Я не хочу ждать всех потоков. Я хочу дождаться пакета потоков, которые выполнялись по конкретной задаче. Ваш ответ, кажется, ждет выполнения всех заданий. – displayname

+0

На самом деле, часть после ... - это тело задачи (я полагаю, в 'run()', поскольку ваши задачи реализуют «Runnable»). Это просто альтернатива, если вы хотите использовать мощную концепцию барьера. – bachr

+0

Хорошо, теперь я понял!Это также действительное решение для моей проблемы, но я пойду за другим. +1 в любом случае :) – displayname

Смежные вопросы