2015-09-10 2 views
0

Что было бы лучшим способом использовать пакеты сообщений «тема-ed» из RabbitMQ параллельно и в порядке.Потребляйте сообщения RabbitMQ с порядком и контекстом

У нас есть сервер, который обрабатывает данные для многих клиентов. Каждый раз, когда обрабатываются данные клиента, в RMQ отправляется куча сообщений. С другой стороны, у нас есть процесс, который использует данные и хранит их в базе данных.

Процесс потребления медленный, и мы хотим распараллелить его и сделать его масштабируемым. Проблема в том, что данные для одного клиента не могут одновременно обрабатываться двумя потребителями. Производитель работает время от времени и может добавлять сообщения в очередь даже для клиента, у которого уже есть сообщения в очереди.

Одним из предложений было создание новой таблицы БД, которая будет указана для каждого клиента, если данные обрабатываются. Потребитель будет запрашивать сообщения только для клиентов, которые не обрабатываются другими потребителями, и регистрируется в БД для этого клиента.

Я не хочу использовать это решение, потому что он требует подключения к базе данных и он содержит состояние выполнения в базе данных.

Я надеялся найти решение, которое может быть обработано в рамках нашего кода потребителя/производителя и RMQ.

Было высказано предположение о том, что сообщения, написанные в RMQ по «темам» клиента, и потребители читают одну «тему». Сообщение будет добавлено в отдельную очередь (или «тему») для каждого сообщения пакета или клиента. Потребитель будет получать сообщение «клиентов» и использовать его данные для выбора «темы» из основной очереди.

Проблема заключается в том, что производитель хочет добавить новые данные в основную очередь для клиента, который уже имеет данные в основной очереди, которая в настоящее время обрабатывается.

Как мы можем синхронизировать потребление продукции над RMQ?

+0

Вы пытались использовать транзакции? У меня нет непосредственного опыта с ними, но кажется, что вы ищете (до тех пор, пока сообщения происходят из той же самой сессии производителя) – SJuan76

ответ

0

Я думаю, вы могли бы взглянуть на учебные пособия по rabbitmq.

http://www.rabbitmq.com/tutorials/tutorial-five-java.html

Пример кода:

import com.rabbitmq.client.*; 

import java.io.IOException; 

public class ReceiveLogsTopic { 
    private static final String EXCHANGE_NAME = "topic_logs"; 

    public static void main(String[] argv) throws Exception { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
    String queueName = channel.queueDeclare().getQueue(); 

    if (argv.length < 1) { 
     System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); 
     System.exit(1); 
    } 

    for (String bindingKey : argv) { 
     channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 
    } 

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

    Consumer consumer = new DefaultConsumer(channel) { 
     @Override 
     public void handleDelivery(String consumerTag, Envelope envelope, 
           AMQP.BasicProperties properties, byte[] body) throws IOException { 
     String message = new String(body, "UTF-8"); 
     System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); 
     } 
    }; 
    channel.basicConsume(queueName, true, consumer); 
    } 
} 

Метод для выполнения кода и сохранения в файле:

java -cp $CP ReceiveLogsTopic "#" > logfile.log 

Я надеюсь, что это помогает и дать вам идею.

На самом деле лучшим способом является использование БД, но если вы не в порядке, это означает, что вы можете попробовать, имея сообщения в файле и отслеживать и повторно использовать его.

То есть вы можете сохранять данные в файле во время выполнения и отслеживать его по мере необходимости во время выполнения.

Примечание: Я приложил пример кода, приведенный в учебнике, потому что любой может отслеживать детали, даже если связь будет изменяться в будущем.

Смежные вопросы