2016-10-23 2 views
0

Я пытаюсь использовать NetMQ (3.3.3.4) и создавать шаблон pub-sub.Не удается получить шаблон pub-sub NetMQ для работы с ReceiveReady

Я хочу, чтобы хост/сервер прослушивал все входящие данные на одном порту (9000) и пересылал данные всем клиентам/абонентам на другом порту (9001).

клиенты будут затем посылать данные на 9000 и получать все сообщения, отправленные (кем бы) на 9001.

После документации я создал что-то вроде кода ниже, но я не могу заставить его работать. В основном, я считаю, потому что ReceiveReady никогда не звонил!

Как я считаю, что это должно работать:

  • client.Publish должен вызвать первую строчку в host.SubscriberSocket_ReceiveReady, чтобы разблокировать и передать данные вместе с другим сокетом
  • Когда данные будут переданы по ней должен появиться в бесконечное погонных Task в клиенте

Результаты:

  • Точки останова на // This line is never reached не достигнуты
  • Исключений нет нигде.
  • Переключение портов на хосте так, чтобы опубликовать = 9000 и подписываться = 9001 не имеет никакого эффекта
  • брандмауэр Windows выключен, так что не должно быть никакой блокировки
  • Это не имеет никакого значения, если я ставлю адрес в PublisherSocket конструктора, или если я использую _publisherSocket.Bind(address) в хост или _publisherSocket.Connect(address) в клиенте

Что я делаю неправильно?

Хост

public class MyNetMQHost { 

    private NetMQSocket _publishSocket; 
    private NetMQSocket _subscribeSocket; 
    private NetMQPoller _poller; 

    public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") { 
     Task.Factory.StartNew(() => { 
      using (_publishSocket = new PublisherSocket(publishAddress)) 
      using (_subscribeSocket = new SubscriberSocket(subscribeAddress)) 
      using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) { 
       _subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady; 
       _poller.Run(); 
      } 
     }); 
    } 

    private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) { 
     var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached 
     _publishSocket.SendMultipartBytes(data); 
    } 
} 

Client

public class MyNetMQClient { 

    private readonly NetMQSocket _publishSocket; 
    private readonly NetMQSocket _subscribeSocket; 

    public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") { 
     _publishSocket = new PublisherSocket(publishAddress); 
     _subscribeSocket = new SubscriberSocket(subscribeAddress); 

     Task.Factory.StartNew(() => { 
      while (true) { 
       byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes(); 
       int one = 1; // This line is never reached 
      } 
     }); 
    } 

    public void Publish(byte[] data) { 
     _publishSocket.SendFrame(data); 
    } 
} 

тестер

public class Tester { 
    public void MyTester() { 
     MyNetMQHost host = new MyNetMQHost(); 
     MyNetMQClient client = new MyNetMQClient(); 

     client.Publish(Encoding.Unicode.GetBytes("Hello world!"); 
    } 
} 

ответ

2

И ваш брокер и клиент никогда не называют suscr IBE. На брокерском вызове suscriber.Subscribe (""), чтобы подписаться на всех. На своем клиенте подписаться на то, что вы хотите.

В вашем брокере вы должны использовать XSubscriber и XPublisher для перемещения подозрительных объектов. Таким образом вам не нужно подписывать все. Вы можете использовать для этого класс Proxy.

+0

После прочтения документации о XSub/XPub и некоторых проб и ошибок, я думаю, что у меня это получилось! Благодаря :) – GTHvidsten