2012-07-04 2 views
7

Мне нужно создать N потоков потребителей, которые обрабатывают один и тот же InputStream одновременно, например, - каким-то образом преобразуют его, вычисляют контрольную сумму или цифровую подпись и т. Д. Эти потребители не зависят друг от друга, и все они используют сторонние библиотеки, которые принимают InputStream в качестве источника данных.Параллельная обработка одиночного InputStream с независимыми потребителями

Так что я могу сделать, это - создать некоторую реализацию InputStream, который

  • читать фрагмент данных из «родительского» потока
  • разблокирования потребителей
  • ожидания до каждого потребителя читать весь кусок
  • прочитать следующий фрагмент

в то же время ищет простой, он может подниматься различные проблемы, как livelo ck, когда определенный потребитель умирает, реализует все методы InputStream, управляет fork/join самими потребителями, используя барьеры/затворы и т. д.

Один приятель сказал мне, что это половина часа, чтобы осуществить, это сделало мой вечер.

Я бы предпочел либо использовать что-то достаточно зрелым (googling не пришел с результатами, таким образом, мой google-fu не достаточно хорош?) Или не беспокоить и копировать весь поток «источника» во временный файл и использовать его в качестве источника данных. Последнее решение кажется более надежным, но может закончиться созданием гигабайтных файлов (например, при обработке потокового аудио).

+0

Можете ли вы записать данные в файл и создать N FileInputStreams? –

+0

@JonLin Как он сказал в конце вопроса, он может. –

ответ

3

Способ, которым я его вижу, должен иметь по крайней мере некоторую буферизацию, так что разные потребители могут перемещаться по потоку в разном темпе, не все, что постоянно увязывается самым медленным потребителем. Это в основном обеспечивает наихудшую производительность и очень мало выгоды от параллелизма.

Вы можете, скажем, пометить каждый кусок потребителями, которые его использовали до сих пор, а затем удалить те, которые полностью израсходованы. Возможно, это может быть достигнуто каждым потребителем, имеющим ссылку на каждый кусок, который он еще не использовал, что позволило бы GC автоматически позаботиться об используемых кусках. Производитель может хранить список WeakReference s для кусков, поэтому у него есть ручка на количестве блоков, которые еще не используются, и основывать свое регулирование на этом.

Я также думаю о наличии отдельного экземпляра InputStream в потоке, который внутренне связывается с производителем InputStream. Таким образом, у вас есть легкое решение для вашей опасности для жизни: try ... finally { is.close(); } - умирающий потребитель закрывает свой собственный входной поток. Это сообщается продюсеру.

У меня есть идеи с использованием ArrayBlockingQueue для каждого потребителя. Было бы трудно обеспечить, чтобы все потребители были надлежащим образом поданы, без того, чтобы производитель блокировал или оживлял.

+0

Я бы не сказал, что это очень мало пользы - имея 5 потребителей, работающих на 1 секунду, и один потребитель работает в течение 2 секунд, одновременный вызов даст 2 секунды, а последовательный - 7 секунд. Или я чего-то не хватает? Имея помеченные куски и буферы, я поражу потребление памяти, чего я бы хотел избежать. – jdevelop

+0

Да, то, что вы говорите, неизбежно. Однако, если у вас есть потребители, сбалансированные в среднем, но их производительность сильно варьируется, вы потеряете возможность для достижения согласия, если вы всегда будете ждать каждого потребителя, который в настоящее время отстает. Буферизация поможет там. И если вы ввели балансировку приоритетов потоков, вы могли бы реально достичь такой ситуации. –

0

Рассматривали ли вы использование потоков труб? У вашего продюсера может быть один или несколько PipedOuputStream, в котором он выбрасывает все, что он читает из файла. На другой стороне труб у вас есть разные потребительские потоки, которые читаются на соответствующем PipedInputstream (который является InputStream, который вы можете поделиться с вашими библиотеками).

Ваша нить производителя может решить, через какие данные по трубам следует послать, посредством этого, предоставление данных для обработки для чтения потребительского потока на другой стороне трубы.

Если вам нужно вернуть данные из ваших потребительских потоков, вы можете создать другой канал в обратном направлении, чтобы отправить данные обратно вам.

+1

«PipedOutputStream» блокирует производителя, как только любой потребитель отстает, голодая все остальные потребители. –

0

Вы можете попробовать реализацию Java Messaging Service (JMS), например Apache ActiveMQ.

В вашем случае вам нужно создать так называемый Тема (см. Topics vs. Queues). Тема создается производителем и публикуется для N потребителей, которые могут выполняться одновременно, причем каждый потребитель получает точно такие же данные.

Поскольку вы хотите использовать InputStream, есть глава о том, как send messages are streams.

Я полагаю, что производители и потребители, как правило, были бы отдельными процессами, вероятно, работающими на разных машинах в сети. Я думаю, вы можете настроить его для полного запуска в одном JVM. Это будет зависеть от внедрения JMS. Они также довольно известны: HornetQ by JBoss, RabbitMQ, и целая куча других.

Смежные вопросы