У меня есть сервер (разъем ROUTER), который связывает и позволяет одному клиенту (разъему DEALER) подключиться к нему. Затем сервер начинает отправлять данные.ZeroMq Router молчает сообщения
В идеале, я хотел бы знать, когда маршрутизатор достигает настройки hwm и начинает отбрасывать сообщения. Я установил ZMQ_ROUTER_MANDATORY в 1 на маршрутизаторе, но это тоже не помогает. Маршрутизатор продолжает сообщать, что сообщения отправляются, хотя я намеренно не запускал клиент (isAlive = false, поэтому на другом конце нет ничего, что могло бы вытащить эти сообщения).
Я делаю что-то неправильно или настройка HWM просто ненадежна в гнезде ROUTER?
Я использую jeromq версии 0.3.1 с JDK 1.6.0_32 на Windows 7 64-битной
Благодаря
public final class SenderSocket implements Runnable{
private final int total;
private final int sentHwm;
private final String address;
private final Socket sendSocket;
private final ExecutorService executor;
private final static String NAME = SenderSocket.class.getSimpleName();
private final static Logger LOGGER = LoggerFactory.getLogger(NAME);
public SenderSocket(ZContext context, String address, int sentHwm, int total){
this.address = address;
this.total = total;
this.sentHwm = sentHwm;
this.sendSocket = context.createSocket(ZMQ.ROUTER);
this.executor = Executors.newSingleThreadExecutor();
}
public void init(){
sendSocket.setSndHWM(sentHwm);
sendSocket.setRouterMandatory(true);
sendSocket.bind(address);
executor.execute(this);
LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address);
}
@Override
public void run(){
for(int i =0; i <total; i++){
try{
String item = new StringBuilder(8).append(i).toString();
boolean result = sendSocket.send(item);
LOGGER.info("SENT>> [{}] [{}]", result, item);
}catch(ZMQException zmqEx){
int errorCode = zmqEx.getErrorCode();
if(ZError.EHOSTUNREACH == errorCode){
LOGGER.warn("Attempted to send message to but dealer is DOWN!");
}
if(ZMQ.Error.ETERM.getCode() == errorCode){
LOGGER.error("Received error code [{}], terminating.");
stop();
}
LOGGER.error("ZMQException while sending message.", zmqEx);
}catch(Exception ex){
LOGGER.error("Exception while sending message.", ex);
}
}
stop();
}
public void stop(){
sendSocket.setLinger(0);
}
}
// КЛИЕНТ
public class ReceiverSocket implements Runnable{
private final int hwm;
private final String address;
private final Socket recvSocket;
private final ExecutorService executor;
private volatile boolean isAlive;
private final static String NAME = ReceiverSocket.class.getSimpleName();
private final static Logger LOGGER = LoggerFactory.getLogger(NAME);
public ReceiverSocket(ZContext context, String address, int hwm){
this.address = address;
this.hwm = hwm;
this.recvSocket = context.createSocket(ZMQ.DEALER);
this.executor = Executors.newSingleThreadExecutor();
}
public void init(){
this.isAlive = false;
recvSocket.setRcvHWM(hwm);
recvSocket.connect(address);
executor.execute(this);
LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address);
}
@Override
public void run(){
Poller poller = new Poller(1);
poller.register(recvSocket, Poller.POLLIN);
while( isAlive){
try{
int pollCount = poller.poll();
if(pollCount == NEGATIVE_ONE){
LOGGER.warn("ERROR! Was the thread interrupted?", pollCount);
isAlive = false;
return;
}
if(poller.pollin(ZERO)){
String data = recvSocket.recvStr();
LOGGER.info("RECVD >> {} {}", data, NEWLINE);
}
}catch(Exception e){
LOGGER.error("Exception while receving message.", e);
}
}
}
public void stop(){
recvSocket.setLinger(0);
LOGGER.info("{} Stopped!", NAME);
}
}
// ОСНОВНАЯ
public static void main(String[ ] args) throws InterruptedException{
int recvHwm = 5;
int sentHwm = 5;
int totalSent = 5000;
String address = "tcp://*:20000";
ZContext context = new ZContext(1);
ReceiverSocket recvr = new ReceiverSocket(context, address, recvHwm);
SenderSocket sender = new SenderSocket(context, address, sentHwm, totalSent);
recvr.init();
Thread.sleep(1000);
sender.init();
}
Благодаря Raffian, после установления личности (не могу поверить, что я уже забыл) я получить исключение, если одноранговый узел не подключен. Есть ли способ узнать, удаляет ли ROUTER сообщения, если достигнута его передача HWM?(при условии, что DEALER подключен, но не тянет сообщения вообще). – CaptainHastings
Функциональность для обнаружения, когда ROUTER превышает HWM, не существует, см. Это: http://grokbase.com/t/zeromq/zeromq-dev/119t6vb1yq/logging-hwm-events – raffian
Еще раз спасибо. Я только что обнаружил в 0.3.1, что ROUTER будет фактически блокироваться, если setRouterMandatory (true) установлен и hwm достигнут. Без настройки он работает как задокументированный и отбрасывает сообщения. – CaptainHastings