Я борюсь с лучшим способом реализации моего конвейера обработки.очереди производителей/потребителей
Мои производители подают работу на BlockingQueue. Со стороны потребителя я просматриваю очередь, обертываю то, что получаю в задаче Runnable, и отправляю ее в ExecutorService.
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
Это не идеал; Конечно, у ExecutorService есть своя очередь, так что на самом деле происходит то, что я всегда полностью истощаю свою рабочую очередь и заполняю очередь задач, которая медленно исчезает по мере завершения задач.
Я понимаю, что я мог ставить задачи на стороне продюсера, но я бы действительно не хотел этого делать - мне нравится, что привязка/изоляция моей рабочей очереди является немой строкой; это действительно не какой-то бизнес продюсера, что с ними будет. Принуждение производителя к очереди Runnable или Callable прерывает абстракцию, IMHO.
Но я хочу, чтобы общая рабочая очередь представляла текущее состояние обработки. Я хочу, чтобы блокировать производителей, если потребители не поддерживают.
Я хотел бы использовать Исполнителей, но я чувствую, что я борюсь с их дизайном. Могу ли я частично выпить Kool-ade, или мне нужно его проглотить? Неужели я ошибаюсь в противостоянии задачам очередей? (Я подозреваю, что я мог бы установить ThreadPoolExecutor для использования очереди из 1 задачи и переопределить ее метод выполнения, чтобы блокировать, а не отклонять-на-очередь, но это грубо.)
Предложения?
Thanks; моя предыдущая реализация была очень похожа на это, хотя она просто использовала ThreadFactory - как только вы уменьшаете ее до фиксированного набора потоков, которые пытаются слить рабочую очередь, нет смысла использовать ExecutorService. Я переключился на ExecutorService, чтобы воспользоваться более настраиваемым пулом потоков, с семантикой «найти доступный существующий рабочий поток, если он существует, создать один, если необходимо, убить их, если они простаивают». –
Executors.newCachedThreadPool() будет делать что-то похожее на это. Вы также можете настроить политику пула на самом ThreadPoolExecutor. Что вам нужно? – Kevin
Это идея ... она может быть настроена именно так, как мне нравится, если бы я был готов использовать свою рабочую очередь задачи. То, что я действительно хочу, - это вырезать пул потоков от исполнителя и реализовать свой собственный клиент пула потоков, но для этого он не настроен. –