2013-03-19 2 views
0

Я работаю над POC для RabbitMQ для решения M2M. У меня есть большое количество физических устройств, которые будут публиковать данные (имитируя клиентов, использующих Java-клиент, в настоящее время - в конечном итоге, над MQTT). Я хочу:rabbitmq exchange routing ключ сопоставление неожиданно

  1. подписки и журнал всех исходных данных в базу данных
  2. подписаться на суб-наборы данных по данным типа, так что я могу масштабировать решения для тех типов данных, независимо друг от друга
  3. публиковать новыми события путем обмена (например, принимать необработанное событие, сделать более полезным и повторно отправить его через систему)

Каждое сообщение имеет ключ маршрутизации как ключ: value.key: value.key: value.messageType: 1, а данные с устройств имеют ext ra из FROMDEVICE.MESSAGETYPE: 1.key: значение ... и т. д. Абонент, который сохраняет исходные данные с устройства, строит очередь из обмена с помощью ключа маршрутизации # .FROMDEVICE. # (случай № 1 выше). Абонент, который принимает конкретный тип сообщения и его значение, добавляет, что он строит очередь с ключом маршрутизации # .MESSAGETYPE: 1. # (пример № 2 выше) и отправляет новое сообщение на тот же обмен, удаляя FROMDEVICE из ключа маршрутизации и заменяя .MESSAGETYPE: 1 с .MESSAGETYPE: 101 (случай № 3 выше). Затем появляется независимый абонент/очередь для нового типа сообщения.

Все в порядке, за исключением моего подписчика, который должен получать только данные от устройств, также получает данные с добавленной стоимостью (MESSAGETYPE: 101), даже если маршрутизацияKey, которую он должен искать, не существует в повторно опубликованной/ценности -адресное сообщение.

  • FROMDEVICE.MESSAGETYPE: 1 ->
    • должен соответствовать маршрутизации ключа # .FROMDEVICE #
    • должны соответствовать # .MESSAGETYPE: 1. #
  • MessageType: 101
      .
    • должен соответствовать ключу маршрутизации # .MESSAGETYPE: 101. #
    • НЕ должен соответствовать # .FROMDEVICE. # (Но есть)

код для подписки на данные только из устройств:

public class HandlerWriteEverythingFromDevice { 

private final static String EXCHANGE_NAME = "logsTopicDurable"; 
private final static String QUEUE_NAME = "fromDevice"; 
/** 
* Writes all data from device to a data store. 
*/ 
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("192.168.56.101"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 

    System.out.println(" [*] listens for messages from devices - durable!"); 

    channel.basicQos(1); 

    String routingKey = "#.fromDevice.#".toUpperCase(); 

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages 
    System.out.println(" [*] subscribing to: " + routingKey); 

    System.out.println(" [*] Waiting for messages. To exit press CTRL_C"); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    boolean autoAck = false; //ack back when done 
    channel.basicConsume(QUEUE_NAME, autoAck, consumer); 
    int msgCount = 0; 
    while (true) { 
     QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
     String message = new String(delivery.getBody()); 

     System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n  MESSAGE: '" + message + "'"); 
     Thread.sleep(250); //simulate some time to insert into the db. 
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
    } 
} 
} 

Код для подписки только MessageType: 1 и переиздать MessageType: 101

private final static String EXCHANGE_NAME = "logsTopicDurable"; 
private final static String QUEUE_NAME = "messageType1"; 
/** 
* Handler for messageType:1 
*/ 
public static void main(String[] args) throws java.io.IOException, java.lang.InterruptedException { 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("192.168.56.101"); 
    Connection connection = factory.newConnection(); 
    Channel channel = connection.createChannel(); 

    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 

    System.out.println(" [*] listens for messageType:1 and submits messageType:101"); 

    channel.basicQos(1); 

    String routingKey = "#.messageType:1.#".toUpperCase(); 

    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); //bind to all selected messages 
    System.out.println(" [*] subscribing to: " + routingKey); 

    System.out.println(" [*] Waiting for messages. To exit press CTRL_C"); 

    QueueingConsumer consumer = new QueueingConsumer(channel); 
    boolean autoAck = false; //ack back when done 
    channel.basicConsume(QUEUE_NAME, autoAck, consumer); 
    int msgCount = 0; 
    while (true) { 
     QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
     String message = new String(delivery.getBody()); 

     System.out.println(" [x] Message Count: " + ++msgCount + " ROUTINGKEY: '" + delivery.getEnvelope().getRoutingKey() + "\n  MESSAGE: '" + message + "'"); 

     channel.basicPublish(EXCHANGE_NAME, 
       delivery.getEnvelope(). 
         getRoutingKey(). 
         replaceAll("messageType:1", "messageType:101"). 
         replaceAll(".FROMDEVICE", ""). 
         replaceAll("FROMDEVICE.", "").trim(), 
       true, 
       MessageProperties.PERSISTENT_BASIC, 
       message.getBytes()); 

     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
    } 
} 

Существует издатель код и код абонента для messageType: 101, но я не думаю, что они необходимы для этого обсуждения. Я задавался вопросом, может ли быть причиной публикации на канале, у которого есть связанная с ним очередь, но я попытался создать два канала (тот же объект соединения) и имел тот же результат и много уродливый код.

ответ

1

Я бы предположил, что вы немного либерализируете с помощью ваших ключей привязки. Чтобы сделать вещи более понятными, вы должны использовать термин «ключ привязки» и ключ маршрутизации по-разному. Ключ маршрутизации - это то, что отправляется производителем. Ключ привязки - это то, что вы используете для привязки очереди к обмену темами.

Как я не могу быть уверен, который вы имеете в виду, когда вы говорите

«должен соответствовать маршрутизации клавишу # .MESSAGETYPE: 101. #»

вы отправив сообщение с маршрутизацией ключ #.MESSAGETYPE:101.#, потому что это было бы плохой идеей. Я полагаю, что нет, но если вы этого не сделаете!

Предположим, что это ваш ключ привязки. Я не уверен, поскольку я не проводил никаких испытаний этого специально, но # до и после, может быть, вызвал некоторые проблемы. Вы должны подумать о спецификации своих ключей маршрутизации. Некоторые форматы, которым они должны соответствовать. Он может быть расширяемым, но не полностью бесплатным. Таким образом, вы можете иметь гораздо более конкретные ключи привязки, используя * вместо #, который даст немного больше контроля.

+0

Вы правы, я немного смешиваю свою терминологию. Ключ маршрутизации в сообщении: «FROMDEVICE.MESSAGETYPE: 1», а ключ привязки в подписке «# .FROMDEVICE. #». Моя проблема заключается в том, что ключ маршрутизации «MESSAGETYPE: 101» каким-то образом соответствует ключу привязки «# .FROMDEVICE. #» (В этом случае я хочу взять сообщение с ключом маршрутизации «FROMDEVICE.MESSAGETYPE: 1», изменив его и повторно - опубликуйте сообщение с помощью ключа маршрутизации «MESSAGETYPE: 101» без согласования ключа привязки «# .FROMDEVICE. #»). – pherris

+0

вы попробовали мое предложение? – robthewolf

+0

Мне было любопытно, как будет работать этот открытый матч. Я слишком рано в своем POC, чтобы узнать, смогу ли я выполнить заказ в ключе маршрутизации. Я упростил свой пример кода, изменив ключи маршрутизации, чтобы всегда начинать с FROMDEVICE ... (если с устройства и не опубликован другим пользователем). Я также обновил ключ привязки к fromdevice. #, Чтобы отразить изменение. Результат был одинаков - повторно опубликованное сообщение все еще поставлено в очередь для абонента, который не соответствует ключу маршрутизации. Может быть, я использую тот же канал для потребления и публикации сообщений? Или дефект WSO2? – pherris