2016-01-22 4 views
0

Я использую встроенный брокер ActiveMQ. Моя цель - найти способ обнаружения, когда внешний производитель в очереди потерял свое соединение.Как получить уведомление о прекращении соединения производителя?

Я начинаю брокера, как это:

BrokerService broker = new BrokerService(); 
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port); 
setLastMessagesPersistent(broker); 
broker.start(); 

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); 
connection = factory.createConnection(); 
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

connection.start(); 

И после этого я попытался добавить TransportListener:

((ActiveMQConnection) connection).addTransportListener(new TransportListener() { 
    public void transportResumed() { 
     System.out.println("resumed"); 
    } 
    public void transportInterupted() { 
     System.out.println("interrupted"); 
    } 
    public void onException(IOException arg0) { 
     System.out.println("ioexception: " + arg0); 
    } 
    public void onCommand(Object arg0) { 
     System.out.println("command: " + arg0); 
    } 
}); 

Я также зарегистрировать потребителя и ProducerListener вроде этого:

Destination dest = session.createQueue(queuename); 
MessageConsumer consumer = session.createConsumer(dest); 

ProducerEventSource source = new ProducerEventSource(connection, dest); 
System.out.println("Setting Producer Listener"); 
source.setProducerListener(prodevent -> { 
    System.out.println("producer status: " + prodevent.isStarted()); 
}); 
// Gets called from inside the broker's Thread and somehow causes deadlocks if I don't invoke this from the outside 
new Thread(() -> { 
    try { 
     consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, gameEventManager, playerID)); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
}).start(); 

К сожалению, ни TransportListener, ни ProducerListener не дают мне y, когда я принудительно покидаю другое приложение, которое ранее было добавлено как производитель (Alt + F4). Брокер, конечно замечает, хотя:

WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset 
WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset 

Но я не найти способ, чтобы получить обратный вызов на этих событий в Java. Я также попробовал установить в брокере обычай IOExceptionHandler и добавить к соединению ExceptionListener. Они также никогда не звонят.

ответ

1

Вы можете использовать консультативные темы ActiveMQ.Advisory.Connection или даже ActiveMQ.Advisory.Producer.Queue ActiveMQ.Advisory.Producer.Topic, они предоставляют статистику по соединениям производителей, проверьте эту ссылку http://activemq.apache.org/advisory-message.html

+0

Спасибо, что решил это для меня. Прослушивание консультативного мероприятия с datastructure типа 'RemoveInfo' было именно тем, что мне было нужно. – Felk

0

возможный путь для анализа лог-выход для подключения сброса

WARN | Транспортное соединение с: tcp: //127.0.0.1: 58988 не удалось: java.net.SocketException: Сброс соединения WARN | Транспорт Подключение к: TCP: //127.0.0.1: 58986 неуспешно: сброс соединения

Поскольку сокет реализован в ActiveMQ вы бы добавить ExceptionListener там написать свой собственный: java.net.SocketException процедура обработки исключений ...

Смежные вопросы