2016-09-19 2 views
0

Iam создание приложения с использованием activeMQ, где у меня есть производитель и потребитель. В потребительском iam с помощью MessageListener для асинхронного прослушивания сообщений от производителя, который выполняется с помощью метода, называемого onMessage (Message message). Но перед употреблением сообщений я хочу выполнить проверку состояния и затем использовать сообщения. Я не хочу использовать синхронное потребление сообщения, потому что это будет против моего дизайна.Как приостановить и возобновить асинхронное потребление сообщений JMS

void initialize() throws JMSException { 
      this.connection = this.connectionFactory.createConnection(); 
      this.connection.start(); 
      final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      final Destination destination = session.createQueue("testQ"); 
      this.consumer = session.createConsumer(destination); 
      this.consumer.setMessageListener(this); 
} 

Проверить состояние здесь как обнаружение подключения к Интернету и т.д.

public void onMessage(final Message message) { 
     Preconditions.checkNotNull(message); 

     if (!(message instanceof TextMessage)) { 
      _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName()); 
      throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled"); 
     } 

     try { 
      final String messageType = message.getStringProperty("messageType"); 
      Preconditions.checkNotNull(messageType); 
      _LOG.info("The MessageType is {}", messageType); 

      final String msg = ((TextMessage) message).getText(); 
      Preconditions.checkNotNull(msg); 
      _LOG.debug(msg); 

      process(messageType, msg); 
     } catch (final JMSException e) { 
      _LOG.error("We could not read the message", e); 
     } 
    } 

Любой пример кода, было бы здорово.

ответ

1

Непонятно, почему вы хотите делать проверки, потому что, если есть какие-либо проблемы с подключением, ваше приложение будет уведомлено о такой ошибке. Вы можете настроить ExceptionListener для JMS Connection, который будет вызван, если возникнет проблема с подключением.

OnMessage не будет вызываться, если есть проблема с подключением.

Также я хотел бы нажать метод метода connection.start() после настройки прослушивателя сообщений для пользователя. Вызов connection.start() указывает поставщику сообщений, что приложение готово для приема сообщений.

Существует подключение.стоп() для приостановки/остановки доставки сообщения. Вы можете повторно отправить connection.start(), чтобы возобновить доставку сообщений.


Обновление на основе вашего ответа

Я предлагаю вам использовать сеанс с клиентом режим признать.

final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE) 

Тогда в onMessage методы, если нет подключения к Интернету, не признает сообщение. Сообщение будет доставлено снова, если оно не истекло.

+0

Благодарность за reply.Right Я опишу свой сценарий. Возьмите сообщение, которое я потребляю из очереди activemq, я хочу отправить его в веб-службу и перед отправкой в ​​веб-службу iam, проверяющей наличие интернет-соединения или нет. Если интернет-соединение отсутствует, я хочу пауза, потребляющая сообщения из MessageListener, который в конечном итоге истекает в activemq из-за истечения срока действия (который соответствует моему дизайну), и если интернет-соединение вверх, я отправляю сообщения в webservice. Можете мне помочь, отредактировав мой код onMessage, который я разместил выше. –

+0

общественного недействительными OnMessage (последнее сообщение Message) { если (isNetavailable() == ложь) \t \t { \t \t \t попробовать { \t \t \t \t this.connection.stop(); \t \t \t} задвижка (JMSException е) { \t \t \t \t e.printStackTrace(); \t \t \t} \t \t} \t \t еще если (isNetavailable() == TRUE) \t \t { \t \t \t попробовать { \t \t \t \t this.connection.start(); \t \t \t} задвижка (JMSException е1) { \t \t \t \t e1.printStackTrace(); \t \t \t} \t \t} } Мой вопрос здесь после того, как я призываю connection.stop() в OnMessage, если нет подключения к Интернету, то будет ли OnMessage быть вызван в следующий раз, чтобы выполнить connection.start() –

+0

БЩЕСТВЕННЫЕ void onMessage (окончательное сообщение сообщения) { if (isReachable == false) {return;} else –

1

В асинхронном обмене сообщениями вы всегда можете проверить состояние перед обработкой сообщения. Как только сообщение JMS будет получено в вашем коде прослушивателя (т. Е. Метод OnMessage), вы должны дать дальнейшее направление. Я решил такую ​​проблему, когда хотел проверить сообщение JMS с моим содержимым db.

проверить подключение к Интернету жив, делая URLConnection к вебсервис URL-адрес что-то вроде ниже:

public static boolean isReachable(String targetUrl) throws IOException 
{ 
    try { 
    HttpURLConnection httpUrlConnection = (HttpURLConnection) new URL(
     targetUrl).openConnection(); 
    httpUrlConnection.setRequestMethod("HEAD"); 


    int responseCode = httpUrlConnection.getResponseCode(); 

    return responseCode == HttpURLConnection.HTTP_OK; 
} catch (UnknownHostException noInternetConnection) 
    { 
    return false; 
    } 
} 

, а затем в вашем методе OnMessage вызвать метод, как

public void onMessage(final Message message) { 
    if(isReachable){ 
    Preconditions.checkNotNull(message); 

    if (!(message instanceof TextMessage)) { 
     _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName()); 
     throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled"); 
    } 

    try { 
     final String messageType = message.getStringProperty("messageType"); 
     Preconditions.checkNotNull(messageType); 
     _LOG.info("The MessageType is {}", messageType); 

     final String msg = ((TextMessage) message).getText(); 
     Preconditions.checkNotNull(msg); 
     _LOG.debug(msg); 

     process(messageType, msg); 
    } catch (final JMSException e) { 
     _LOG.error("We could not read the message", e); 
    } 
    } 
} 
+0

Запрос об AUTOACKNOWLEDGE.Iam с использованием AUTOACKNOWLEDGE &, как вы видите в Onmessage, есть процесс метода (messageType, msg). Если процесс() выдает исключение, будет активирована функция activeMQ для повторного отправки одного и того же сообщения? В моем случае предположим, что iam отправка сообщения процессу() и нет Интернета, что сообщение отправляется в webservice внутри process() 7 раз с разницей в 1 секунду, если я получаю сообщение об ошибке ConnectionRefused Exception или 404 и т. д. ie process (messageType, msg) вызывается 7 раз. Но если я получаю процесс 200OK (messageType, msg), вызывается один раз. Почему это и что мне нужно сделать, чтобы избежать этого. –

+0

Является ли activeMQ ожидающим автоматического подтверждения от OnMessage, которое не получается из-за исключений в процессе (messageType, msg), где процесс возвращает Retrofit.client.Response.И есть причина, по которой он пытается отправить сообщения 7 раз. –

Смежные вопросы