Я пытаюсь использовать NetMQ (3.3.3.4) и создавать шаблон pub-sub.Не удается получить шаблон pub-sub NetMQ для работы с ReceiveReady
Я хочу, чтобы хост/сервер прослушивал все входящие данные на одном порту (9000) и пересылал данные всем клиентам/абонентам на другом порту (9001).
клиенты будут затем посылать данные на 9000 и получать все сообщения, отправленные (кем бы) на 9001.
После документации я создал что-то вроде кода ниже, но я не могу заставить его работать. В основном, я считаю, потому что ReceiveReady
никогда не звонил!
Как я считаю, что это должно работать:
client.Publish
должен вызвать первую строчку вhost.SubscriberSocket_ReceiveReady
, чтобы разблокировать и передать данные вместе с другим сокетом- Когда данные будут переданы по ней должен появиться в бесконечное погонных
Task
в клиенте
Результаты:
- Точки останова на
// This line is never reached
не достигнуты - Исключений нет нигде.
- Переключение портов на хосте так, чтобы опубликовать = 9000 и подписываться = 9001 не имеет никакого эффекта
- брандмауэр Windows выключен, так что не должно быть никакой блокировки
- Это не имеет никакого значения, если я ставлю адрес в
PublisherSocket
конструктора, или если я использую_publisherSocket.Bind(address)
в хост или_publisherSocket.Connect(address)
в клиенте
Что я делаю неправильно?
Хост
public class MyNetMQHost {
private NetMQSocket _publishSocket;
private NetMQSocket _subscribeSocket;
private NetMQPoller _poller;
public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
Task.Factory.StartNew(() => {
using (_publishSocket = new PublisherSocket(publishAddress))
using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
_subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
_poller.Run();
}
});
}
private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
_publishSocket.SendMultipartBytes(data);
}
}
Client
public class MyNetMQClient {
private readonly NetMQSocket _publishSocket;
private readonly NetMQSocket _subscribeSocket;
public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
_publishSocket = new PublisherSocket(publishAddress);
_subscribeSocket = new SubscriberSocket(subscribeAddress);
Task.Factory.StartNew(() => {
while (true) {
byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
int one = 1; // This line is never reached
}
});
}
public void Publish(byte[] data) {
_publishSocket.SendFrame(data);
}
}
тестер
public class Tester {
public void MyTester() {
MyNetMQHost host = new MyNetMQHost();
MyNetMQClient client = new MyNetMQClient();
client.Publish(Encoding.Unicode.GetBytes("Hello world!");
}
}
После прочтения документации о XSub/XPub и некоторых проб и ошибок, я думаю, что у меня это получилось! Благодаря :) – GTHvidsten