2011-01-06 2 views
20

У меня есть клиент JMS, который создает сообщения и отправляет очередь JMS своему уникальному потребителю.JMS - Переход от одного к нескольким потребителям

Что я хочу, это больше, чем один потребитель, получающий эти сообщения. Первое, что приходит мне на ум, - превратить очередь в тему, так что нынешние и новые пользователи могут подписаться и получить одно и то же сообщение, доставленное всем из них.

Это, очевидно, будет связано с изменением кода текущих клиентов как на стороне производителя, так и на стороне потребителей.

Я хотел бы также посмотреть другие варианты, такие как создание второй очереди, так что мне не нужно изменять существующего пользователя. Я считаю, что есть преимущества в этом подходе, например (поправьте меня, если я ошибаюсь), балансируя нагрузку между двумя разными очередями, а не одной, что может оказать положительное влияние на производительность.

Я хотел бы получить информацию об этих вариантах и ​​минусах/плюсах, которые вы можете увидеть. Любая обратная связь высоко ценится.

ответ

45

У вас есть несколько вариантов, как вы заявили.

Если вы конвертируете его в тему, чтобы получить тот же эффект, вам нужно будет сделать потребителей постоянными потребителями. Одна вещь, предлагаемая в очереди, - это настойчивость, если ваш потребитель не жив. Это будет зависеть от используемой системы MQ.

Если вы хотите придерживаться очередей, вы создадите очередь для каждого потребителя и диспетчера, который будет прослушивать исходную очередь.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

Плюсы Темы

  • Легче динамически добавлять новых потребителей. Все потребители получат новые сообщения без какой-либо работы.
  • Вы можете создавать круговые темы, чтобы Consumer_1 получил сообщение, затем Consumer_2, затем Consumer_3
  • Потребителям могут быть предложены новые сообщения, вместо того чтобы запрашивать очередь, делая их реактивными.

Против Темы

  • сообщения не являются постоянными, если ваш брокер не поддерживает эту конфигурацию. Если потребитель уходит в отставку и возвращается, можно пропустить пропущенные сообщения, если не настроены постоянные потребители.
  • Сложно разрешить Consumer_1 и Consumer_2 получать сообщение, но не Consumer_3.При диспетчере и очередях диспетчер не может отправить сообщение в очередь Consumer_3.

Плюсов Очереди

  • сообщения являются постоянными, пока потребитель не удаляет их
  • диспетчесркие можно отфильтровать потребители получают какие сообщения, не размещать сообщения в соответствующие потребителях очереди. Тем не менее это можно сделать с помощью тем через фильтры.

Против Очереди

  • Дополнительные Очереди должны быть созданы для поддержки нескольких потребителей. В динамической среде это было бы неэффективно.

При разработке системы обмена сообщениями я предпочитаю темы, поскольку это дает мне максимальную мощность, но, увидев, что вы уже используете очереди, вам потребуется изменить, как ваша система работает для реализации тем вместо.

Проектирование и реализация очереди системы с несколькими потребителями

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1 
             -> Queue_Consumer_2 <- Consumer_2 
             -> Queue_Consumer_3 <- Consumer_3 

Источник

Имейте в виду, что есть другие вещи, которые вы должны будете заботиться о таких как обработка исключений проблема, пересоединение к соединению и очереди, если вы потеряете соединение и т. д. Это просто предназначено, чтобы дать вам представление о том, как выполнить то, что я описал.

В реальной системе я, вероятно, не выйду из первого исключения. Я бы позволил системе продолжать работать, насколько это было возможно, и регистрировать ошибки. Поскольку этот код стоит в том случае, если сообщение в одной очереди потребителей терпит неудачу, весь диспетчер остановится.

Dispatcher.java

/* 
* To change this template, choose Tools | Templates 
* and open the template in the editor. 
*/ 
package stackoverflow_4615895; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Queue; 
import javax.jms.QueueConnection; 
import javax.jms.QueueConnectionFactory; 
import javax.jms.QueueSession; 
import javax.jms.Session; 

public class Dispatcher { 

    private static long QUEUE_WAIT_TIME = 1000; 
    private boolean mStop = false; 
    private QueueConnectionFactory mFactory; 
    private String mSourceQueueName; 
    private String[] mConsumerQueueNames; 

    /** 
    * Create a dispatcher 
    * @param factory 
    *  The QueueConnectionFactory in which new connections, session, and consumers 
    *  will be created. This is needed to ensure the connection is associated 
    *  with the correct thread. 
    * @param source 
    * 
    * @param consumerQueues 
    */ 
    public Dispatcher(
     QueueConnectionFactory factory, 
     String sourceQueue, 
     String[] consumerQueues) { 

     mFactory = factory; 
     mSourceQueueName = sourceQueue; 
     mConsumerQueueNames = consumerQueues; 
    } 

    public void start() { 
     Thread thread = new Thread(new Runnable() { 

      public void run() { 
       Dispatcher.this.run(); 
      } 
     }); 
     thread.setName("Queue Dispatcher"); 
     thread.start(); 
    } 

    public void stop() { 
     mStop = true; 
    } 

    private void run() { 

     QueueConnection connection = null; 
     MessageProducer producer = null; 
     MessageConsumer consumer = null; 
     QueueSession session = null; 
     try { 
      // Setup connection and queues for receiving the messages 
      connection = mFactory.createQueueConnection(); 
      session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
      Queue sourceQueue = session.createQueue(mSourceQueueName); 
      consumer = session.createConsumer(sourceQueue); 

      // Create a null producer allowing us to send messages 
      // to any queue. 
      producer = session.createProducer(null); 

      // Create the destination queues based on the consumer names we 
      // were given. 
      Queue[] destinationQueues = new Queue[mConsumerQueueNames.length]; 
      for (int index = 0; index < mConsumerQueueNames.length; ++index) { 
       destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]); 
      } 

      connection.start(); 

      while (!mStop) { 

       // Only wait QUEUE_WAIT_TIME in order to give 
       // the dispatcher a chance to see if it should 
       // quit 
       Message m = consumer.receive(QUEUE_WAIT_TIME); 
       if (m == null) { 
        continue; 
       } 

       // Take the message we received and put 
       // it in each of the consumers destination 
       // queues for them to process 
       for (Queue q : destinationQueues) { 
        producer.send(q, m); 
       } 
      } 

     } catch (JMSException ex) { 
      // Do wonderful things here 
     } finally { 
      if (producer != null) { 
       try { 
        producer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (consumer != null) { 
       try { 
        consumer.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (session != null) { 
       try { 
        session.close(); 
       } catch (JMSException ex) { 
       } 
      } 
      if (connection != null) { 
       try { 
        connection.close(); 
       } catch (JMSException ex) { 
       } 
      } 
     } 
    } 
} 

Main.java

QueueConnectionFactory factory = ...; 

    Dispatcher dispatcher = 
      new Dispatcher(
      factory, 
      "Queue_Original", 
      new String[]{ 
       "Consumer_Queue_1", 
       "Consumer_Queue_2", 
       "Consumer_Queue_3"}); 
    dispatcher.start(); 
+0

+1 хороший ответ. – skaffman

+0

Это был отличный ответ. Я использую реализацию MOM от JBoss, которая является HornetQ. –

+0

@Anonimo Последний раз, когда я проверил JBoss, полностью придерживается спецификации JMS. Это вызвало некоторое разочарование для меня в прошлом, потому что я динамически создаю темы, которые спецификация JMS не учитывает. Другие, такие как ActiveMQ, позволяют динамически создавать темы, и требуется только одна строка изменения кода в JBoss, чтобы обеспечить ту же функциональность. –

4

Возможно, вам не придется изменять код; это зависит от того, как вы его написали.

Например, если ваш код отправляет сообщения с использованием MessageProducer, а не QueueSender, то он будет работать как для тем, так и для очередей. Аналогично, если вы использовали MessageConsumer, а не QueueReceiver.

По сути, это хорошая практика в приложениях JMS использовать неспецифические интерфейсы для взаимодействия с системой JMS, такие как MessageProducer, MessageConsumer, Destination и т.д. Если это так, то это «просто» вопрос конфигурации.

+0

Это было бы хорошим вариантом. К сожалению, мы используем определенные интерфейсы, такие как QueueSender. Это определенно то, что я буду иметь в виду, если мы реорганизуем. –

Смежные вопросы