Я работаю над POC для RabbitMQ для решения M2M. У меня есть большое количество физических устройств, которые будут публиковать данные (имитируя клиентов, использующих Java-клиент, в настоящее время - в конечном итоге, над MQTT). Я хочу:rabbitmq exchange routing ключ сопоставление неожиданно
- подписки и журнал всех исходных данных в базу данных
- подписаться на суб-наборы данных по данным типа, так что я могу масштабировать решения для тех типов данных, независимо друг от друга
- публиковать новыми события путем обмена (например, принимать необработанное событие, сделать более полезным и повторно отправить его через систему)
Каждое сообщение имеет ключ маршрутизации как ключ: value.key: value.key: value.messageType: 1, а данные с устройств имеют ext ra из FROMDEVICE.MESSAGETYPE: 1.key: значение ... и т. д. Абонент, который сохраняет исходные данные с устройства, строит очередь из обмена с помощью ключа маршрутизации # .FROMDEVICE. # (случай № 1 выше). Абонент, который принимает конкретный тип сообщения и его значение, добавляет, что он строит очередь с ключом маршрутизации # .MESSAGETYPE: 1. # (пример № 2 выше) и отправляет новое сообщение на тот же обмен, удаляя FROMDEVICE из ключа маршрутизации и заменяя .MESSAGETYPE: 1 с .MESSAGETYPE: 101 (случай № 3 выше). Затем появляется независимый абонент/очередь для нового типа сообщения.
Все в порядке, за исключением моего подписчика, который должен получать только данные от устройств, также получает данные с добавленной стоимостью (MESSAGETYPE: 101), даже если маршрутизацияKey, которую он должен искать, не существует в повторно опубликованной/ценности -адресное сообщение.
- FROMDEVICE.MESSAGETYPE: 1 ->
- должен соответствовать маршрутизации ключа # .FROMDEVICE #
- должны соответствовать # .MESSAGETYPE: 1. #
- MessageType: 101
-
.
- должен соответствовать ключу маршрутизации # .MESSAGETYPE: 101. #
- НЕ должен соответствовать # .FROMDEVICE. # (Но есть)
код для подписки на данные только из устройств:
public class HandlerWriteEverythingFromDevice {
private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "fromDevice";
/**
* Writes all data from device to a data store.
*/
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] listens for messages from devices - durable!");
channel.basicQos(1);
String routingKey = "#.fromDevice.#".toUpperCase();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
System.out.println(" [*] subscribing to: " + routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL_C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false; //ack back when done
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
int msgCount = 0;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n MESSAGE: '" + message + "'");
Thread.sleep(250); //simulate some time to insert into the db.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
Код для подписки только MessageType: 1 и переиздать MessageType: 101
private final static String EXCHANGE_NAME = "logsTopicDurable";
private final static String QUEUE_NAME = "messageType1";
/**
* Handler for messageType:1
*/
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] listens for messageType:1 and submits messageType:101");
channel.basicQos(1);
String routingKey = "#.messageType:1.#".toUpperCase();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages
System.out.println(" [*] subscribing to: " + routingKey);
System.out.println(" [*] Waiting for messages. To exit press CTRL_C");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false; //ack back when done
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
int msgCount = 0;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n MESSAGE: '" + message + "'");
channel.basicPublish(EXCHANGE_NAME,
delivery.getEnvelope().
getRoutingKey().
replaceAll("messageType:1", "messageType:101").
replaceAll(".FROMDEVICE", "").
replaceAll("FROMDEVICE.", "").trim(),
true,
MessageProperties.PERSISTENT_BASIC,
message.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
Существует издатель код и код абонента для messageType: 101, но я не думаю, что они необходимы для этого обсуждения. Я задавался вопросом, может ли быть причиной публикации на канале, у которого есть связанная с ним очередь, но я попытался создать два канала (тот же объект соединения) и имел тот же результат и много уродливый код.
Вы правы, я немного смешиваю свою терминологию. Ключ маршрутизации в сообщении: «FROMDEVICE.MESSAGETYPE: 1», а ключ привязки в подписке «# .FROMDEVICE. #». Моя проблема заключается в том, что ключ маршрутизации «MESSAGETYPE: 101» каким-то образом соответствует ключу привязки «# .FROMDEVICE. #» (В этом случае я хочу взять сообщение с ключом маршрутизации «FROMDEVICE.MESSAGETYPE: 1», изменив его и повторно - опубликуйте сообщение с помощью ключа маршрутизации «MESSAGETYPE: 101» без согласования ключа привязки «# .FROMDEVICE. #»). – pherris
вы попробовали мое предложение? – robthewolf
Мне было любопытно, как будет работать этот открытый матч. Я слишком рано в своем POC, чтобы узнать, смогу ли я выполнить заказ в ключе маршрутизации. Я упростил свой пример кода, изменив ключи маршрутизации, чтобы всегда начинать с FROMDEVICE ... (если с устройства и не опубликован другим пользователем). Я также обновил ключ привязки к fromdevice. #, Чтобы отразить изменение. Результат был одинаков - повторно опубликованное сообщение все еще поставлено в очередь для абонента, который не соответствует ключу маршрутизации. Может быть, я использую тот же канал для потребления и публикации сообщений? Или дефект WSO2? – pherris