2016-06-12 3 views
1

Использованием клиента QPID Java я только могу получить сообщения, доставляемые через обмен на связанную очередь, используя следующий расширенный синтаксис AMQAnyDestinationRabbitMQ сообщения с QPID 0,32 клиентом

Destination queue = new AMQAnyDestination(new AMQShortString("onms2"), 
               new AMQShortString("direct"), 
               new AMQShortString("Simon"), 
               true,   
               true,   
               new AMQShortString(""), 
               false,  
               bindvars); 

Если я пытаться использовать другую форму, которая просто определяет адрес следующим образом он не работает: -

Destination queue = new AMQAnyDestination("onms2/Simon"); 

сообщение попадает RabbitMQ нормально, но не доставляется.

Qpid 0,32 Client Rabbit MQ 3.5.7

Обмен ONMS Routing Key Simon

Я использую qpid примеры и модификации примера ListSender, как показано ниже

package org.apache.qpid.example; 

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 

import org.apache.qpid.client.AMQAnyDestination; 
import org.apache.qpid.client.AMQConnection; 

import org.apache.qpid.framing.AMQShortString; 
import org.apache.qpid.jms.ListMessage; 


public class ListSender { 

public static void main(String[] args) throws Exception 
{ 
    Connection connection = 
     new AMQConnection("amqp://simon:[email protected]/test?brokerlist='tcp://localhost:5672'"); 
               AMQShortString a1 = new AMQShortString(""); 
               AMQShortString a2 = new AMQShortString(""); 
    AMQShortString[] bindvars = new AMQShortString[]{a1,a2}; 
    boolean is_durable = true; 
    /* 
    Destination queue = new AMQAnyDestination(new AMQShortString("onms2"), 
               new AMQShortString("direct"), 
               new AMQShortString("Simon"), 
               true,   
               true,   
               new AMQShortString(""), 
               false,  
               bindvars); 
    */ 

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    Destination queue = new AMQAnyDestination("onms2/Simon"); 
    //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}"); 
    //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}"); 
    MessageProducer producer = session.createProducer(queue); 

ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage(); 
    m.setIntProperty("Id", 987654321); 
    m.setStringProperty("name", "WidgetSimon"); 
    m.setDoubleProperty("price", 0.99); 

    List<String> colors = new ArrayList<String>(); 
    colors.add("red"); 
    colors.add("green"); 
    colors.add("white"); 
    m.add(colors); 

    Map<String,Double> dimensions = new HashMap<String,Double>(); 
    dimensions.put("length",10.2); 
    dimensions.put("width",5.1); 
    dimensions.put("depth",2.0); 
    m.add(dimensions); 

    List<List<Integer>> parts = new ArrayList<List<Integer>>(); 
    parts.add(Arrays.asList(new Integer[] {1,2,5})); 
    parts.add(Arrays.asList(new Integer[] {8,2,5})); 
    m.add(parts); 

    Map<String,Object> specs = new HashMap<String,Object>(); 
    specs.put("colours", colors); 
    specs.put("dimensions", dimensions); 
    specs.put("parts", parts); 
    m.add(specs); 

    producer.send((Message)m); 
    System.out.println("Sent: " + m); 
    connection.close(); 
} 

} 

, когда он работает с использованием расширенного формата AMQAnyDestination, журналы отладки выглядят следующим образом: -

163 [main] INFO org.apache.qpid.client.AMQConnection - Connection 1 now connected from /127.0.0.1:43298 to localhost/127.0.0.1:5672 
163 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true 
163 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91 
166 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on direct://onms2/Simon/?routingkey='Simon'&exclusive='true'&autodelete='true' the selected destination syntax is BURL 
169 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1 
184 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected] 
186 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (1028176102)Method frame received: [ChannelOpenOkBody] 
189 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (1028176102)Method frame received: [BasicQosOkBodyImpl: ] 
195 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL 
206 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to direct://onms2/Simon/?routingkey='Simon'&exclusive='true'&autodelete='true' 
206 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to direct://onms2/Simon/?routingkey='Simon'&exclusive='true'&autodelete='true' 
207 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 67... 

Когда это не удается, используя короткий синтаксис журнал отладки выглядит следующим образом: -

149 [main] INFO org.apache.qpid.client.AMQConnection - Connection 1 now connected from /127.0.0.1:36940 to localhost/127.0.0.1:5672 
149 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true 
149 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91 
153 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1 
169 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected] 
170 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (472294496)Method frame received: [ChannelOpenOkBody] 
171 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (472294496)Method frame received: [BasicQosOkBodyImpl: ] 
179 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on onms2/Simon the selected destination syntax is ADDR 
182 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - supportsIsBound: false 
182 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - supportsIsBound: false 
182 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - supportsIsBound: false 
184 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (472294496)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
184 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL 
195 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to 'onms2'/'Simon'; None 
195 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to 'onms2'/'Simon'; None 
196 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 90... 
196 [main] DEBUG org.apache.qpid.framing.FieldTable - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]} 
198 [main] DEBUG org.apache.qpid.client.AMQSession - Closing session: [email protected] 
198 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession - closeSession called on protocol session for session 1 

В идеале мне нужен более короткий синтаксис для работы, так как это то, что используется другим приложением, я использую, который рассылаемые сообщения используя AMQP.

Я подозреваю, что есть что-то неправильное с синтаксисом, который я использую для определения адреса, но я не могу понять, что это такое.

Я попытался: -

AMQP: onms2/Simon АДРЕСА: onms2/Simon

Я подтвердил кролик конфигурации является правильным путем тестирования с использованием как автономный клиент Java с использованием qpid, а также с использованием как perl (используя net_amqp) и python (используя pika). Так что я не думаю, что это так.

Любое увещевание оценивается.

EDIT: - Найдено некоторые дополнительные параметры конфигурации на QPID website я пропустившие Когда я настроить адрес следующим образом, это работает! onms3/Simon; { 'Создать': 'всегда', 'узел': { 'тип': 'тема'}} Подробности

<name> [/<subject> ] ; { 
create: always | sender | receiver | never, 
delete: always | sender | receiver | never, 
assert: always | sender | receiver | never, 
mode: browse | consume, 
node: { 
type: queue | topic, 
durable: True | False, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>] 
}, 
link: { 
name: <link-name>, 
durable: True | False, 
reliability: unreliable | at-most-once | at-least-once | exactly-once, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>], 
x-subscribe: { ... <subscribe-overrides> ... } 
} 
} 

Simon

ответ

1

EDIT: - Найдено некоторые дополнительные параметры конфигурации на QPID website я пропустил Когда я настраиваю адрес следующим образом, он работает!

onms3/Simon; {'create':'always','node':{'type':'topic'} } 

Деталь

<name> [/<subject> ] ; { 
create: always | sender | receiver | never, 
delete: always | sender | receiver | never, 
assert: always | sender | receiver | never, 
mode: browse | consume, 
node: { 
type: queue | topic, 
durable: True | False, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>] 
}, 
link: { 
name: <link-name>, 
durable: True | False, 
reliability: unreliable | at-most-once | at-least-once | exactly-once, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>], 
x-subscribe: { ... <subscribe-overrides> ... } 
} 
} 
+0

Я редактировал свой вопрос с ответом. Не уверен, что вы имеете в виду - пожалуйста, уточните. –

+0

Это идея. – paisanco

+0

np :-) спасибо за руководство –

Смежные вопросы