С RabbitMq подписки на несколько очередей выполняются по одному соединению через каналы. Это позволяет использовать один сокет для обработки нескольких подписей. Какова аналоговая стратегия минимизации соединений сокетов для нескольких типов сообщений в Zero MQ?Как создать несколько типов сообщений через одно соединение с 0MQ?
ответ
Я бы сказал, что конверты сообщений PUB/SUB являются предпочтительным способом решения этой проблемы в ZeroMQ (см. Pub-Sub Message Envelopes).
Перед отправкой сообщения, издатель должен отправить свою тему в конверте:
s_sendmore (publisher, "A");
s_send (publisher, <Serialized MessageA>);
.....
s_sendmore (publisher, "B");
s_send (publisher, <Serialized MessageB>);
Suscriber имеет 3 варианта для обработки нескольких тем:
Прочитать сообщение тип из конверта, используйте десериализатор в зависимости от типа сообщения (Клиент 1 на рисунке).
char *topic = s_recv (subscriber); char *body = s_recv (subscriber); //If topic == "A", the body contains serialized MessageA, otherwise MessageB
Подписаться только на одной теме с
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ...);
. В этой ситуации ZeroMQ выполняет PUB стороне фильтрации сообщений (начиная с версии 3.x), и клиент может быть уверен, что он будет получать только сообщения типа MessageA (Client 2 на рисунке)char *topic = s_recv (subscriber); char *body = s_recv (subscriber); //Ignore the topic, the body contains serialized MessageA
Подписаться на все сообщений и создать прокси-сервер в клиентском процессе, который публикует сообщения в сокетах
INPROC
. Отдельные подписчики должны использоватьzmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ...);
для подписки на сообщения одного типа. (см. Клиент 3).void *frontend = zsocket_new (context, ZMQ_SUB); zsocket_connect (frontend, "tcp://<....>"); void *backend = zsocket_new (context, ZMQ_PUB); zsocket_bind (backend, "inproc://all_messages"); // Start the proxy zmq_proxy (frontend, backend, NULL); ... //On another thread void *sub1 = zsocket_new (context, ZMQ_SUB); zsocket_connect (sub1, "inproc://all_messages"); zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "A", 1); ... void *sub2 = zsocket_new (context, ZMQ_SUB); zsocket_connect (sub2, "inproc://all_messages"); zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "B", 1);
Вы также можете найти полезные XPUB/XSub розетки для различных трюков. См., Например, Last Value Caching
Отличный ответ, спасибо. Есть ли способ добиться такого поведения с помощью запроса/ответа? –
@DerekGreer Фильтрация тем доступна только для SUB-сокетов, поэтому будет работать только вариант №1. Я бы сказал, что шаблон RPC более естественен для сокетов REQ/REP. И можно использовать одну из схем сериализации для уровня протокола RPC, используя ZeroMQ в качестве транспорта. Например, посмотрите мой проект ThriftZMQ на GitHub: https://github.com/thriftzmq/thriftzmq-java – Wildfire