2013-11-22 3 views
3

У меня есть сервер (разъем ROUTER), который связывает и позволяет одному клиенту (разъему DEALER) подключиться к нему. Затем сервер начинает отправлять данные.ZeroMq Router молчает сообщения

В идеале, я хотел бы знать, когда маршрутизатор достигает настройки hwm и начинает отбрасывать сообщения. Я установил ZMQ_ROUTER_MANDATORY в 1 на маршрутизаторе, но это тоже не помогает. Маршрутизатор продолжает сообщать, что сообщения отправляются, хотя я намеренно не запускал клиент (isAlive = false, поэтому на другом конце нет ничего, что могло бы вытащить эти сообщения).

Я делаю что-то неправильно или настройка HWM просто ненадежна в гнезде ROUTER?

Я использую jeromq версии 0.3.1 с JDK 1.6.0_32 на Windows 7 64-битной

Благодаря

public final class SenderSocket implements Runnable{ 

    private final int total; 
    private final int sentHwm; 
    private final String address; 
    private final Socket sendSocket; 
    private final ExecutorService executor; 

    private final static String NAME  = SenderSocket.class.getSimpleName(); 
    private final static Logger LOGGER  = LoggerFactory.getLogger(NAME); 


    public SenderSocket(ZContext context, String address, int sentHwm, int total){ 
     this.address  = address; 
     this.total   = total; 
     this.sentHwm  = sentHwm; 
     this.sendSocket  = context.createSocket(ZMQ.ROUTER); 
     this.executor  = Executors.newSingleThreadExecutor(); 
    } 


    public void init(){ 

     sendSocket.setSndHWM(sentHwm); 
     sendSocket.setRouterMandatory(true); 
     sendSocket.bind(address); 

     executor.execute(this); 
     LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address); 

    } 



    @Override 
    public void run(){   

     for(int i =0; i <total; i++){  

      try{ 

       String item  = new StringBuilder(8).append(i).toString(); 
       boolean result = sendSocket.send(item); 

       LOGGER.info("SENT>> [{}] [{}]", result, item); 

      }catch(ZMQException zmqEx){ 

       int errorCode = zmqEx.getErrorCode(); 

       if(ZError.EHOSTUNREACH == errorCode){ 
        LOGGER.warn("Attempted to send message to but dealer is DOWN!"); 
       } 

       if(ZMQ.Error.ETERM.getCode() == errorCode){ 
        LOGGER.error("Received error code [{}], terminating."); 
        stop(); 
       } 

       LOGGER.error("ZMQException while sending message.", zmqEx); 

      }catch(Exception ex){ 
       LOGGER.error("Exception while sending message.", ex); 
      } 

     } 

     stop(); 

    } 


    public void stop(){ 
     sendSocket.setLinger(0); 
    } 


} 

// КЛИЕНТ

public class ReceiverSocket implements Runnable{ 

     private final int hwm; 
     private final String address; 
     private final Socket recvSocket; 
     private final ExecutorService executor; 

     private volatile boolean isAlive; 

     private final static String NAME  = ReceiverSocket.class.getSimpleName(); 
     private final static Logger LOGGER  = LoggerFactory.getLogger(NAME); 


     public ReceiverSocket(ZContext context, String address, int hwm){ 
      this.address  = address; 
      this.hwm   = hwm; 
      this.recvSocket  = context.createSocket(ZMQ.DEALER); 
      this.executor  = Executors.newSingleThreadExecutor(); 
     } 


     public void init(){ 

      this.isAlive = false; 

      recvSocket.setRcvHWM(hwm); 
      recvSocket.connect(address); 
      executor.execute(this); 

      LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address); 

     } 



     @Override 
     public void run(){   

      Poller poller  = new Poller(1); 
      poller.register(recvSocket, Poller.POLLIN); 

      while( isAlive){  

       try{ 

        int pollCount  = poller.poll(); 

        if(pollCount == NEGATIVE_ONE){ 
         LOGGER.warn("ERROR! Was the thread interrupted?", pollCount); 
         isAlive = false; 
         return; 
        } 

        if(poller.pollin(ZERO)){ 
         String data = recvSocket.recvStr(); 
         LOGGER.info("RECVD >> {} {}", data, NEWLINE); 
        } 

       }catch(Exception e){ 
        LOGGER.error("Exception while receving message.", e); 
       } 

      } 

     } 


     public void stop(){ 
      recvSocket.setLinger(0); 
      LOGGER.info("{} Stopped!", NAME); 
     } 
} 

// ОСНОВНАЯ

public static void main(String[ ] args) throws InterruptedException{ 

     int recvHwm   = 5; 
     int sentHwm   = 5; 
     int totalSent  = 5000; 
     String address  = "tcp://*:20000"; 
     ZContext context  = new ZContext(1); 

     ReceiverSocket recvr = new ReceiverSocket(context, address, recvHwm); 
     SenderSocket sender = new SenderSocket(context, address, sentHwm, totalSent); 

     recvr.init(); 
     Thread.sleep(1000); 

     sender.init(); 

    } 

ответ

3

Router mandat ory и знак высокой воды не имеют ничего общего друг с другом.

Я поставил ZMQ_ROUTER_MANDATORY 1 на маршрутизаторе но не помогает. Маршрутизатор продолжает сообщать о том, что сообщения посылаются даже если я deliberatly не запустить клиент

маршрутизатор не будет поднимать исключение, даже если узлы не подключены к нему, если Вы не адресовать сообщение для конкретного ID клиента.

//#1 no exception raised here, message dropped silently 
rtr.setRouterMandatory(true) 
rtr.bind("tcp://*:20000") 
rtr.send("omg!") 

//#2 exception raised here 
rtr.setRouterMandatory(true) 
rtr.bind("tcp://*:20000") 
rtr.sendMore("client1") 
rtr.sendMore("") 
rtr.send("omg!") 

Пример кода # 2 бросает исключение, потому что вы рассказываете маршрутизатор для отправки «OMG» на равных с идентичностью client1. Сокеты маршрутизатора отслеживают все соединения, назначая случайный идентификатор каждому подключенному одноранговому узлу. Если маршрутизатор не имеет соединения для client1, или, если client1 ранее отключен, маршрутизатор будет создавать исключение в обоих случаях.

можно присвоить идентификатор на клиенте, чтобы переопределить маршрутизаторы рандомизации идентичности:

client.setIdentity("client1".getBytes()) 
client.connect("tcp://*:20000") 

Приведенный выше код останавливает гнездо маршрутизатора выбрасывать исключение в образце # 2

я предлагаю читать вверх на this, он объясняет адресацию и обертывание сообщений; понимание того, как это работает, имеет важное значение для использования розеточных сокетов.

+0

Благодаря Raffian, после установления личности (не могу поверить, что я уже забыл) я получить исключение, если одноранговый узел не подключен. Есть ли способ узнать, удаляет ли ROUTER сообщения, если достигнута его передача HWM?(при условии, что DEALER подключен, но не тянет сообщения вообще). – CaptainHastings

+0

Функциональность для обнаружения, когда ROUTER превышает HWM, не существует, см. Это: http://grokbase.com/t/zeromq/zeromq-dev/119t6vb1yq/logging-hwm-events – raffian

+0

Еще раз спасибо. Я только что обнаружил в 0.3.1, что ROUTER будет фактически блокироваться, если setRouterMandatory (true) установлен и hwm достигнут. Без настройки он работает как задокументированный и отбрасывает сообщения. – CaptainHastings

0

в питона, это сделать маршрутизатор рейз zmq.Again на таких сценариев:

self.context = zmq.Context() 
    self.socket = self.context.socket(zmq.ROUTER) 

    # make sure router doesn't just drop unroutable message 
    self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) 

    # make sure when there is unroutable message to raise exception after timeout 
    timeout_sec = 2.5 
    timeout_milisec = int(timeout_sec * 1000) 
    self.socket.setsockopt(zmq.SNDTIMEO, timeout_milisec) 

см http://grokbase.com/t/zeromq/zeromq-dev/12aje3ya9t/zmq-router-mandatory-was-zmq-router-behavior

Смежные вопросы