2013-07-04 4 views
0

С RabbitMq подписки на несколько очередей выполняются по одному соединению через каналы. Это позволяет использовать один сокет для обработки нескольких подписей. Какова аналоговая стратегия минимизации соединений сокетов для нескольких типов сообщений в Zero MQ?Как создать несколько типов сообщений через одно соединение с 0MQ?

ответ

3

Я бы сказал, что конверты сообщений PUB/SUB являются предпочтительным способом решения этой проблемы в ZeroMQ (см. Pub-Sub Message Envelopes).

Перед отправкой сообщения, издатель должен отправить свою тему в конверте:

s_sendmore (publisher, "A"); 
    s_send (publisher, <Serialized MessageA>); 
    ..... 
    s_sendmore (publisher, "B"); 
    s_send (publisher, <Serialized MessageB>); 

Suscriber имеет 3 варианта для обработки нескольких тем:

Figure 1

  • Прочитать сообщение тип из конверта, используйте десериализатор в зависимости от типа сообщения (Клиент 1 на рисунке).

    char *topic = s_recv (subscriber); 
    char *body = s_recv (subscriber); 
    //If topic == "A", the body contains serialized MessageA, otherwise MessageB 
    
  • Подписаться только на одной теме с zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ...);. В этой ситуации ZeroMQ выполняет PUB стороне фильтрации сообщений (начиная с версии 3.x), и клиент может быть уверен, что он будет получать только сообщения типа MessageA (Client 2 на рисунке)

    char *topic = s_recv (subscriber); 
    char *body = s_recv (subscriber); 
    //Ignore the topic, the body contains serialized MessageA 
    
  • Подписаться на все сообщений и создать прокси-сервер в клиентском процессе, который публикует сообщения в сокетах INPROC. Отдельные подписчики должны использовать zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ...); для подписки на сообщения одного типа. (см. Клиент 3).

    void *frontend = zsocket_new (context, ZMQ_SUB); 
    zsocket_connect (frontend, "tcp://<....>"); 
    void *backend = zsocket_new (context, ZMQ_PUB); 
    zsocket_bind (backend, "inproc://all_messages"); 
    // Start the proxy 
    zmq_proxy (frontend, backend, NULL); 
    ... 
    //On another thread 
    void *sub1 = zsocket_new (context, ZMQ_SUB); 
    zsocket_connect (sub1, "inproc://all_messages"); 
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "A", 1); 
    ... 
    void *sub2 = zsocket_new (context, ZMQ_SUB); 
    zsocket_connect (sub2, "inproc://all_messages"); 
    zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1); 
    

Вы также можете найти полезные XPUB/XSub розетки для различных трюков. См., Например, Last Value Caching

+0

Отличный ответ, спасибо. Есть ли способ добиться такого поведения с помощью запроса/ответа? –

+0

@DerekGreer Фильтрация тем доступна только для SUB-сокетов, поэтому будет работать только вариант №1. Я бы сказал, что шаблон RPC более естественен для сокетов REQ/REP. И можно использовать одну из схем сериализации для уровня протокола RPC, используя ZeroMQ в качестве транспорта. Например, посмотрите мой проект ThriftZMQ на GitHub: https://github.com/thriftzmq/thriftzmq-java – Wildfire