У вас есть несколько вариантов, как вы заявили.
Если вы конвертируете его в тему, чтобы получить тот же эффект, вам нужно будет сделать потребителей постоянными потребителями. Одна вещь, предлагаемая в очереди, - это настойчивость, если ваш потребитель не жив. Это будет зависеть от используемой системы MQ.
Если вы хотите придерживаться очередей, вы создадите очередь для каждого потребителя и диспетчера, который будет прослушивать исходную очередь.
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
Плюсы Темы
- Легче динамически добавлять новых потребителей. Все потребители получат новые сообщения без какой-либо работы.
- Вы можете создавать круговые темы, чтобы Consumer_1 получил сообщение, затем Consumer_2, затем Consumer_3
- Потребителям могут быть предложены новые сообщения, вместо того чтобы запрашивать очередь, делая их реактивными.
Против Темы
- сообщения не являются постоянными, если ваш брокер не поддерживает эту конфигурацию. Если потребитель уходит в отставку и возвращается, можно пропустить пропущенные сообщения, если не настроены постоянные потребители.
- Сложно разрешить Consumer_1 и Consumer_2 получать сообщение, но не Consumer_3.При диспетчере и очередях диспетчер не может отправить сообщение в очередь Consumer_3.
Плюсов Очереди
- сообщения являются постоянными, пока потребитель не удаляет их
- диспетчесркие можно отфильтровать потребители получают какие сообщения, не размещать сообщения в соответствующие потребителях очереди. Тем не менее это можно сделать с помощью тем через фильтры.
Против Очереди
- Дополнительные Очереди должны быть созданы для поддержки нескольких потребителей. В динамической среде это было бы неэффективно.
При разработке системы обмена сообщениями я предпочитаю темы, поскольку это дает мне максимальную мощность, но, увидев, что вы уже используете очереди, вам потребуется изменить, как ваша система работает для реализации тем вместо.
Проектирование и реализация очереди системы с несколькими потребителями
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
Источник
Имейте в виду, что есть другие вещи, которые вы должны будете заботиться о таких как обработка исключений проблема, пересоединение к соединению и очереди, если вы потеряете соединение и т. д. Это просто предназначено, чтобы дать вам представление о том, как выполнить то, что я описал.
В реальной системе я, вероятно, не выйду из первого исключения. Я бы позволил системе продолжать работать, насколько это было возможно, и регистрировать ошибки. Поскольку этот код стоит в том случае, если сообщение в одной очереди потребителей терпит неудачу, весь диспетчер остановится.
Dispatcher.java
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package stackoverflow_4615895;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
public class Dispatcher {
private static long QUEUE_WAIT_TIME = 1000;
private boolean mStop = false;
private QueueConnectionFactory mFactory;
private String mSourceQueueName;
private String[] mConsumerQueueNames;
/**
* Create a dispatcher
* @param factory
* The QueueConnectionFactory in which new connections, session, and consumers
* will be created. This is needed to ensure the connection is associated
* with the correct thread.
* @param source
*
* @param consumerQueues
*/
public Dispatcher(
QueueConnectionFactory factory,
String sourceQueue,
String[] consumerQueues) {
mFactory = factory;
mSourceQueueName = sourceQueue;
mConsumerQueueNames = consumerQueues;
}
public void start() {
Thread thread = new Thread(new Runnable() {
public void run() {
Dispatcher.this.run();
}
});
thread.setName("Queue Dispatcher");
thread.start();
}
public void stop() {
mStop = true;
}
private void run() {
QueueConnection connection = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
QueueSession session = null;
try {
// Setup connection and queues for receiving the messages
connection = mFactory.createQueueConnection();
session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue sourceQueue = session.createQueue(mSourceQueueName);
consumer = session.createConsumer(sourceQueue);
// Create a null producer allowing us to send messages
// to any queue.
producer = session.createProducer(null);
// Create the destination queues based on the consumer names we
// were given.
Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
for (int index = 0; index < mConsumerQueueNames.length; ++index) {
destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
}
connection.start();
while (!mStop) {
// Only wait QUEUE_WAIT_TIME in order to give
// the dispatcher a chance to see if it should
// quit
Message m = consumer.receive(QUEUE_WAIT_TIME);
if (m == null) {
continue;
}
// Take the message we received and put
// it in each of the consumers destination
// queues for them to process
for (Queue q : destinationQueues) {
producer.send(q, m);
}
}
} catch (JMSException ex) {
// Do wonderful things here
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
}
}
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ex) {
}
}
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
}
Main.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
new Dispatcher(
factory,
"Queue_Original",
new String[]{
"Consumer_Queue_1",
"Consumer_Queue_2",
"Consumer_Queue_3"});
dispatcher.start();
+1 хороший ответ. – skaffman
Это был отличный ответ. Я использую реализацию MOM от JBoss, которая является HornetQ. –
@Anonimo Последний раз, когда я проверил JBoss, полностью придерживается спецификации JMS. Это вызвало некоторое разочарование для меня в прошлом, потому что я динамически создаю темы, которые спецификация JMS не учитывает. Другие, такие как ActiveMQ, позволяют динамически создавать темы, и требуется только одна строка изменения кода в JBoss, чтобы обеспечить ту же функциональность. –