2009-07-22 3 views
11

У меня проблема, которую я считаю классическим шаблоном мастера/работника, и я ищу совета по реализации. Вот что я сейчас думаю о проблеме:Шаблоны/принципы для потокобезопасных очередей и программы «мастер/работник» в Java

Существует глобальная «очередь», и это центральное место, где хранится «работа, которая должна быть выполнена». Предположительно, эта очередь будет управляться каким-то «главным» объектом. Темы будут созданы, чтобы найти работу, и когда они найдут работу, они расскажут мастеру (что бы это ни было) «добавить это в очередь выполняемой работы».

Мастер, возможно, с интервалом, будет порождать другие потоки, которые фактически выполняют выполняемую работу. Как только поток завершит свою работу, я хочу, чтобы он уведомил мастера о завершении работы. Затем мастер может удалить эту работу из очереди.

В прошлом я проделал большое количество программирования потоков на Java, но все это было до JDK 1.5, и поэтому я не знаком с соответствующими новыми API-интерфейсами для обработки этого случая. Я понимаю, что JDK7 будет иметь fork-join, и это может быть решением для меня, но я не могу использовать продукт раннего доступа в этом проекте.

Проблемы, как я их вижу, является:

1) как есть «тема, делая работу» общаться обратно к мастеру, говоря им, что их работа будет завершена, и что хозяин теперь может удалить работа из очереди

2) как эффективно иметь гарантию мастера, что работа запланирована только один раз. Например, предположим, что эта очередь имеет миллион элементов, и она хочет сказать работнику «пойти на эти 100 вещей». Каков наиболее эффективный способ гарантировать, что, когда он планирует работу для следующего работника, он получает «следующие 100 вещей», а не «100 вещей, которые я уже запланировал»?

3) выбор соответствующей структуры данных для очереди. Мое мышление здесь состоит в том, что «работа над поисками нитей» потенциально могла бы найти ту же работу, что и несколько раз, и они отправили сообщение хозяину, сказав «вот работа», и мастер поймет, что в работе есть уже запланированы и, следовательно, должны игнорировать сообщение. Я хочу, чтобы я выбрал нужную структуру данных, чтобы эти вычисления были как можно дешевле.

Традиционно я бы сделал это в базе данных, в виде машины с конечным автоматом, работая «задачи» от начала до конца. Однако в этой проблеме я не хочу использовать базу данных из-за большого объема и волатильности очереди. Кроме того, я хотел бы сохранить это как можно более легким. Я не хочу использовать любой сервер приложений, если этого можно избежать.

Весьма вероятно, что эта проблема, которую я описываю, является общей проблемой с известным именем и принятым набором решений, но я, с моей низкой степенью не-CS, не знаю, что это называется (т.е. будьте осторожны).

Спасибо за любые указатели.

+0

вы также можете посмотреть на http://lambda-the-ultimate.org/node/3521 «Ядро вилки/объединение» –

ответ

7

Насколько я понимаю ваши требования, вам необходимо ExecutorService. ExecutorService имеют

submit(Callable task) 

метод, который возвращается значение Future. Будущее - это блокирующий способ общения от рабочего к хозяину. Вы можете легко расширить этот механизм, чтобы работать асинхронно. И да, ExecutorService также поддерживает рабочую очередь, такую ​​как ThreadPoolExecutor. Поэтому в большинстве случаев вам не нужно беспокоиться о планировании. Пакет java.util.concurrent уже имеет эффективные реализации потоковой безопасности (ConcurrentLinked queue - неблокирование и блокировка LinkedBlockedQueue).

+0

Чтобы добавить к тому, что предлагает @dotsid, я хотел бы указать, что эта стандартная библиотека выполняет много, если не все, OP, и она проста в использовании, и она работает. Вы можете масштабировать до 100 или тысяч задач без особых усилий. –

+0

Спасибо всем за продуманные ответы. Я не уверен, является ли это «каноническим» ответом, но, в конце концов, после прочтения книги Гетца, то, что у меня получилось, очень похоже на этот ответ. –

4

Отъезд java.util.concurrent в библиотеке Java.

В зависимости от вашего приложения это может быть так же просто, как собрать некоторые блокирующие очереди и ThreadPoolExecutor.

Кроме того, полезной может быть книгаот Brian Goetz.

4

Во-первых, почему вы хотите держать предметы после того, как работник начал их делать?Как правило, у вас будет очередь работы, и рабочий берет элементы из этой очереди. Это также решило бы «как я могу помешать работникам получить тот же товар»?

на вопросы:

1) как есть «тема, делающая работы» общаться обратно к мастеру говоря им, что их работа завершена и что мастер теперь может удалить работы из очереди

Мастер мог слушать рабочих, используя listener/observer pattern

2) как эффективно иметь мастер гарантия эта работа только только планируется один раз. Например, допустим, эта очередь имеет миллион элементов, и она хочет сказать работнику, чтобы «пойти на эти 100 вещей». Что самое эффективное способ гарантировать, что когда он планирует работать следующему работнику, он получает «следующие 100 вещей», а не «100 вещей, которые у меня уже есть запланировано»?

См. Выше. Я бы позволил рабочим вытащить предметы из очереди.

3) выбор соответствующих данных структура для очереди. Мое мышление вот что «работа по поиску нитей делать» могла бы найти то же самое работать, чтобы сделать больше одного раза, и они бы отправить сообщение хозяину, сказав «вот работа», и мастер понять, что работа уже была запланирована и поэтому должна игнорировать сообщение. Я хочу обеспечить , что я выбираю нужную структуру данных так, чтобы этот расчет был как можно дешевле .

Есть Реализации в blocking queue поскольку Java 5

+0

Спасибо всем за ответ. Тим, к вашему первому вопросу, который является хорошим: я считаю, что мне нужно держать элементы в очереди, потому что «рабочие потоки, выходящие и находящиеся на работе», должны знать, какая работа уже запланирована. Для конкретного примера представьте себе программу, которая должна выйти и найти «старые файлы для перемещения». Темы найдут их, добавят в очередь. Но при последующих запусках, если эти файлы еще не были перемещены, потоки «finder» найдут те же файлы. Имеют смысл? Более подходящие способы решения этой проблемы? Еще раз спасибо. –

+0

Может быть, вам не нужно беспокоиться об этом. Существует одно хорошее качество асинхронных систем - идемпотентность. Система должна быть защищена от обработки двойных сообщений (говорящая в математике f (x) должна быть равна f (f (x)), поэтому состояние системы не изменяется, если одно сообщение обрабатывается дважды). Ваш пример - хороший пример идемпотенции в системе. Мы могли передавать сообщение об одном конкретном файле дважды работнику, и ничего плохого не происходит. Если файл уже перемещен, мы просто пропускаем задачу. –

+0

Вы можете определить рабочую очередь, а рядом с ней - рабочий список. Когда рабочий поток берет элемент из очереди, вы добавляете его в рабочий список. Когда рабочий закончится, вы можете удалить его из рабочего списка. Если элемент представлен как новый, вы можете проверить, не ли он уже в очереди или в списке игнорировать его. –

0

Если вы открыты к идее Spring, а затем проверить свой проект Spring Integration. Он дает вам полный набор сценариев очереди/потокового пула и позволяет сосредоточиться на бизнес-логике. Конфигурация сведена к минимуму с помощью @annotations.

btw, Goetz очень хорошо.

1

Не забудьте Jini и Javaspaces. То, что вы описываете, звучит очень похоже на классический образец производителя/потребителя, который превосходит архитектуры на основе пространства.

Производитель будет писать задания в пространстве. 1 или более потребителей будут получать задания (по транзакции) и работать над этим параллельно, а затем записывать результаты обратно. Поскольку он находится под транзакцией, если возникает проблема, задание снова становится доступным для другого потребителя.

Вы можете масштабировать это тривиально, добавляя больше потребителей. Это особенно хорошо работает, когда потребители являются отдельными виртуальными машинами и масштабируются по всей сети.

0

Это не похоже на проблему мастера-работника, а на специализированный клиент выше потока. Учитывая, что у вас много потоков очистки, а не много процессоров, может оказаться целесообразным просто сделать пропущенный пас, а затем вычислительный проход. Сохраняя рабочие элементы в наборе, ограничение уникальности удалит дубликаты. Второй проход может передать всю работу в ExecutorService для параллельного выполнения процесса.

Модель мастер-рабочего обычно предполагает, что поставщик данных выполняет всю работу и передает ее мастеру для управления. Мастер контролирует выполнение работы и имеет дело с распределенными вычислениями, тайм-аутами, сбоями, попытками и т. Д. Абстракция fork-join является рекурсивным, а не итеративным поставщиком данных. Абстракция с уменьшением масштаба - это многоступенчатый мастер-мастер, который полезен в определенных сценариях.

Хороший пример мастер-работника - это тривиально параллельные проблемы, такие как поиск простых чисел. Другая - это загрузка данных, где каждая запись является независимой (проверка, преобразование, этап). Необходимость обработки известного рабочего набора, сбоев обработки и т. Д. Делает модель мастера-работника отличной от пула потоков. Вот почему мастер должен контролировать и выталкивает рабочие единицы, тогда как threadpool позволяет работникам вытаскивать работу из общей очереди.

Смежные вопросы