1

Я хочу реализовать множество шаблонов издателя/подписчика с использованием Java и в настоящее время исчерпывает идеи.Параллельная реализация шаблона издателя/подписчика

Существует 1 подписчик издателя и N, издатель публикует объекты, тогда каждый абонент должен обрабатывать каждый из объектов один раз и только один раз в правильном порядке. Издатель и каждый подписчик работают в своем потоке.

В моей первоначальной реализации каждый абонент имеет свою собственную блокирующую очередь, а издатель помещает объекты в каждую из абонентских очередей. Это работает нормально, но издатель будет заблокирован, если какая-либо из очереди абонента будет заполнена. Это приводит к снижению производительности, поскольку каждый абонент занимает различное время при обработке объекта.

Затем в другой реализации издатель удерживает объект в своей очереди. Наряду с объектом с ним связан счетчик AtomicInteger с количеством подписчиков. Затем каждый подписчик заглядывает в очередь и уменьшает счетчик и удаляет его из очереди, когда счетчик достигает нуля.

Таким образом, издатель свободен от блокировки, но теперь подписчику придется ждать друг друга, чтобы обработать объект, удалив объект из очереди, до того, как следующий объект может быть заархивирован.

Есть ли лучший способ сделать это? Я предполагаю, что это должна быть довольно распространенная картина.

+0

мне интересно, если Акку с асинхронным сообщениями объект, может помочь в этом случае? – Wudong

ответ

0

Реализация вашего «большого количества очередей» - это путь. Я не думаю, что вам обязательно нужно иметь дело с одной полной очередью, блокирующей производителя, потому что общее время до завершения не будет затронуто. Предположим, у вас есть три потребителя, два из них потребляют со скоростью 1 в секунду, а третий потребляет со скоростью 1 на пять секунд, между тем производитель производит со скоростью 1 раз в две секунды. В конце концов третья очередь будет заполнена, и поэтому производитель заблокирует ее, а также прекратит помещать элементы в первую и вторую очереди. Есть способы обойти это, но они не собираются менять тот факт, что третьим потребителем будет всегда быть узким местом. Если вы производят/потребляете 100 предметов, то это займет не менее 500 секунд из-за третьего потребителя (5 секунд, умноженного на 100 пунктов), и это будет так, даже если первый и второй потребители закончат через 200 секунд (потому что вы сделали что-то умное, чтобы продюсер мог продолжать заполнять свои очереди даже после того, как третья очередь заполнена), или если они заканчиваются через 500 секунд (потому что продюсер заблокирован в третьей очереди).

+0

Хотя это зависит от прецедента - возможно, подпункт 3 разрешает молча сдавать или отбрасывать предметы, если он отстает. Возможно, важным показателем является среднее время между публикацией и потреблением. Но я согласен с тем, что это, по-видимому, лучший подход с чисто пропускной точки зрения. – selig

+0

спасибо за ответ. Однако моя проблема заключается в том, что весь потребитель в конечном итоге будет обрабатывать все объекты за аналогичный период времени. однако каждый потребитель занимал разное время при обработке одного объекта. Не должно быть ни одного потребителя, блокирующего все остальные. – Wudong

+0

@Wudong В этом случае вам нужно будет использовать блокирующие очереди с неограниченной пропускной способностью, то есть «LinkedBlockingQueues» - продлить продление продлить невозможно, если одна из ваших очередей заполнена. (Вы можете хранить лишние предметы где-то в другом месте, но это просто более сложная версия использования блокирующих очередей с неограниченной пропускной способностью.) –

0

Definately

каждый абонент имеет свою собственную очередь блокирования, и издатель поставил объекты в каждый из queue.` абонента

это путь. вы можете использовать поточный подход, чтобы поместить его в очередь ... так что если одна очередь будет полной, издатель не будет ждать.

например.

s1 s2 s3 являются подписчиками, а addToQueue - это метод в каждом подписчике, который добавляет к коррумпирующей очереди. addQueue Метод, который ждет, пока очередь не будет пуста .. поэтому вызов addQueue будет блокирующим вызовом ideally synchronised code ...

Тогда в издателю вы можете сделать что-то подобное ниже код

Примечания: код не может быть в рабочем состоянии, как это .. но должны дать вам идею.

List<subscriber> slist;// Assume its initialised 
public void publish(final String message){ 

    for (final subscriber s: slist){ 


      Thread t=new Thread(new Runnable(){ 
      public void run(){ 
       s.addToQueue(message); 
      } 
      }); 

     t.start(); 
    } 

} 
+0

Это интересная идея, но будет ли она выполнена? каждый поток для отправки сообщения кажется слишком большим. – Wudong

+0

@Wudong Я никогда не пробовал ... вы можете попробовать и сообщить нам. но я думаю, что он должен сделать трюк. Также обратите внимание: потоки завершатся, как только сообщение добавляется в очередь ... так же, как очередь не заполнена, не будет много ожидающих потоков .... –

0

Существует один издатель и подписчики N, издатель публиковать объекты затем каждый абонент должен обрабатывать каждый из объектов один раз и только один раз в правильном порядке. Издатель и каждый подписчик работают в своем потоке.

Я бы изменил эту архитектуру. Сначала я рассматривал очередь на подписчика, но мне не нравится этот механизм. Например, если первый абонент занимает больше времени для запуска, все задания будут попадать в очередь, и вы будете выполнять только один поток работы.

Поскольку вы должны запускать подписчиков по порядку, у меня будет пул потоков, которые запускают каждое сообщение через всех подписчиков. Призывы к абонентам должны быть реентерабельными, что может быть невозможно.

Таким образом, вы бы иметь пул из 10 нитей (скажем так) и каждый из них dequeues из очереди издателя, и делает что-то вроде следующего:

public void run() { 
    while (!shutdown && !Thread.currentThread().isInterrupted()) { 
     Article article = publisherQueue.take(); 
     for (Subscriber subscriber : subscriberList) { 
      subscriber.process(article); 
     } 
    } 
} 
+0

Как я могу гарантировать порядок, который каждый абонент обрабатывает «статью» в этой реализации? Если каждое изделие обрабатывается потоком, очень возможно, что они обрабатываются в другом порядке, который они производятся. – Wudong

+0

Ничего себе. Я не понял с вашего поста, что пришлось пройти через всех подписчиков в том же порядке. Действительно ли порядок подписчиков имеет значение или это только окончательный порядок статей? – Gray

+0

это просто порядок «статей», порядок подписчиков не имеет значения. – Wudong

Смежные вопросы