7

Я борюсь с лучшим способом реализации моего конвейера обработки.очереди производителей/потребителей

Мои производители подают работу на BlockingQueue. Со стороны потребителя я просматриваю очередь, обертываю то, что получаю в задаче Runnable, и отправляю ее в ExecutorService.

while (!isStopping()) 
{ 
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS); 
    if (work == null) 
    { 
     break; 
    } 
    executorService.execute(new Worker(work)); // needs to block if no threads! 
} 

Это не идеал; Конечно, у ExecutorService есть своя очередь, так что на самом деле происходит то, что я всегда полностью истощаю свою рабочую очередь и заполняю очередь задач, которая медленно исчезает по мере завершения задач.

Я понимаю, что я мог ставить задачи на стороне продюсера, но я бы действительно не хотел этого делать - мне нравится, что привязка/изоляция моей рабочей очереди является немой строкой; это действительно не какой-то бизнес продюсера, что с ними будет. Принуждение производителя к очереди Runnable или Callable прерывает абстракцию, IMHO.

Но я хочу, чтобы общая рабочая очередь представляла текущее состояние обработки. Я хочу, чтобы блокировать производителей, если потребители не поддерживают.

Я хотел бы использовать Исполнителей, но я чувствую, что я борюсь с их дизайном. Могу ли я частично выпить Kool-ade, или мне нужно его проглотить? Неужели я ошибаюсь в противостоянии задачам очередей? (Я подозреваю, что я мог бы установить ThreadPoolExecutor для использования очереди из 1 задачи и переопределить ее метод выполнения, чтобы блокировать, а не отклонять-на-очередь, но это грубо.)

Предложения?

ответ

14

Я хочу, чтобы общая очередь работы представляет текущую обработку состояния.

Попробуйте использовать общий BlockingQueue и создайте пул рабочих потоков, беря рабочие элементы за пределы очереди.

Я хочу, чтобы иметь возможность блокировать производителей, если потребители не ногу.

Оба ArrayBlockingQueue и поддержка LinkedBlockingQueue ограниченные очереди таким образом, что они будут блокировать на сайте при полном заполнении. Использование методов блокировки put() гарантирует, что производители блокируются, если очередь заполнена.

Вот приблизительный старт. Вы можете настроить количество рабочих и размер очереди:

public class WorkerTest<T> { 

    private final BlockingQueue<T> workQueue; 
    private final ExecutorService service; 

    public WorkerTest(int numWorkers, int workQueueSize) { 
     workQueue = new LinkedBlockingQueue<T>(workQueueSize); 
     service = Executors.newFixedThreadPool(numWorkers); 

     for (int i=0; i < numWorkers; i++) { 
      service.submit(new Worker<T>(workQueue)); 
     } 
    } 

    public void produce(T item) { 
     try { 
      workQueue.put(item); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
    } 


    private static class Worker<T> implements Runnable { 
     private final BlockingQueue<T> workQueue; 

     public Worker(BlockingQueue<T> workQueue) { 
      this.workQueue = workQueue; 
     } 

     @Override 
     public void run() { 
      while (!Thread.currentThread().isInterrupted()) { 
       try { 
        T item = workQueue.take(); 
        // Process item 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        break; 
       } 
      } 
     } 
    } 
} 
+0

Thanks; моя предыдущая реализация была очень похожа на это, хотя она просто использовала ThreadFactory - как только вы уменьшаете ее до фиксированного набора потоков, которые пытаются слить рабочую очередь, нет смысла использовать ExecutorService. Я переключился на ExecutorService, чтобы воспользоваться более настраиваемым пулом потоков, с семантикой «найти доступный существующий рабочий поток, если он существует, создать один, если необходимо, убить их, если они простаивают». –

+0

Executors.newCachedThreadPool() будет делать что-то похожее на это. Вы также можете настроить политику пула на самом ThreadPoolExecutor. Что вам нужно? – Kevin

+0

Это идея ... она может быть настроена именно так, как мне нравится, если бы я был готов использовать свою рабочую очередь задачи. То, что я действительно хочу, - это вырезать пул потоков от исполнителя и реализовать свой собственный клиент пула потоков, но для этого он не настроен. –

0

Вы можете иметь ваш потребитель выполнить Runnable::run непосредственно вместо того, чтобы начать новую нить вверх. Объедините это с блокирующей очередью с максимальным размером, и я думаю, что вы получите то, что хотите. Ваш потребитель становится работником, выполняющим встроенные задачи на основе рабочих элементов в очереди. Они будут только деактивировать объекты так быстро, как они обрабатывают их, чтобы ваш продюсер, когда ваши потребители перестали потреблять.

+0

Это даст только одному работнику, и я бы хотел настроить его для максимальной обработки для данной конфигурации системы. Обработка заданий будет длиться между долей секунды и десятками секунд и быть смешанной работой с привязкой к I/O-привязке и ЦП. –

+0

Я принимал несколько потребителей, связанных с одним или несколькими производителями. Если у вас есть только один потребитель, то почему бы не заставить производителя сбрасывать работу непосредственно в исполнитель? –

1

«найти существующий рабочий поток, если он существует, создать его, если необходимо, убить их, если они простаивают».

Управление всеми этими рабочими государствами так же ненужно, как и опасно.Я бы создал один поток монитора, который постоянно работает в фоновом режиме, и только задача состоит в том, чтобы заполнить очередь и вызвать пользователей ... почему бы не сделать рабочие нитки демонов, чтобы они умерли, как только они закончили? Если вы присоедините их все к одной ThreadGroup, вы можете динамически изменить размер пула ... например:

**for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
     spawnDaemonWorkers(queue.poll()); 
    }** 
Смежные вопросы