2015-09-09 2 views
1

У меня возникла проблема с JMS Connection stop() и start(). Простой Java программа, иллюстрирующая то же:JMS Соединение, отправляющее сообщения, отправленные в очередь при остановленном соединении

public class Test { 
    static Connection producerConn = null; 
    static BufferedWriter consumerLog = null; 
    static BufferedWriter producerLog = null; 

    public static final void main(String[] args) throws Exception { 
     ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)"); 
     producerConn = cf.createConnection(); 

    producerLog = new BufferedWriter(new FileWriter("produced.log")); 
    consumerLog = new BufferedWriter(new FileWriter("consumed.log")); 

    new Thread(new Runnable() { 
     public void run() { 
      try { 
       producerConn.start(); 
       Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
       Queue queue = session.createQueue("SampleQ1"); 
       MessageProducer producer = session.createProducer(queue); 
       Random random = new Random(); 
       byte[] messageBytes = new byte[1024]; 

       for (int i = 0; i < 100; i++) { 
        random.nextBytes(messageBytes); 
        Message message = session.createObjectMessage(messageBytes); 
        producer.send(message); 
        Thread.currentThread().sleep(10); 
        producerLog.write(message.getJMSMessageID()); 
        producerLog.newLine(); 
        producerLog.flush(); 
       } 
       System.out.println("Produced 100000 messages..."); 
       producerLog.close(); 
      } 
      catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    }).start(); 

    System.out.println("Started producer..."); 


    new Thread(new Runnable() { 
     public void run() { 
      int count = 0; 
      try { 
       producerConn.start(); 
       Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
       Queue queue = session.createQueue("SampleQ1"); 
       MessageConsumer consumer = session.createConsumer(queue); 
       consumer.setMessageListener(new Test().new MyListener()); 
      } 
      catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    }).start(); 

    System.out.println("Started consumer..."); 
} 

private class MyListener implements MessageListener{ 
    private int count = 0; 

    public void onMessage(Message message) { 
     try { 
      message.acknowledge(); 
      System.out.println("count is " +count++); 
      if(count == 5){ 
       producerConn.stop(); 
       System.out.println("Sleeping Now for 5 seconds. . ." +System.currentTimeMillis()); 
       Thread.currentThread().sleep(5000); 
       producerConn.start(); 
      } 
      System.out.println("Waking up . . ." +System.currentTimeMillis()); 
      consumerLog.write(message.getJMSMessageID()); 
      consumerLog.newLine(); 
      consumerLog.flush(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

}

Моя идея заключается в том, чтобы имитировать остановку подключения() и начать(). Поэтому в потребительском потоке после вызова stop() я установил спать 5 секунд. Однако в то же время продюсерский поток продолжает работу по отправке сообщения в очередь.

Я ожидал, что тест просто поглотит только сообщение до того, как потребитель вызовет stop() и после того, как он снова вызовет start() после пробуждения из сна. Но вот что происходит здесь, когда потребитель просыпается, он считывает все сообщения с сервера, даже те, которые были отправлены в очередь, когда прием сообщений пользователя был остановлен.

Я делаю что-то не так здесь?

ответ

0

Нет ничего плохого, это правильное поведение. В асинхронных почтовых серверах производитель и потребитель слабо развязаны. Производителю все равно, потребляет ли потребитель сообщения или нет. Он продолжает помещать сообщения в очередь, пока потребитель может опуститься или перестанет потреблять сообщения или активно использует сообщения.

Метод connection.stop() не влияет на производителя. Он влияет только на метод consumer, stop() приостанавливает доставку сообщений от JMS-провайдера потребителю, в то время как метод start() запускает/возобновляет доставку сообщений.

Надеюсь, это помогло.

+0

Да Шаши. Это правильно для продюсера. Это не мешает продюсеру создавать сообщения. Но моя забота была о Потребителе. Потребитель не должен принимать сообщения, отправленные в очередь, когда соединение было остановлено. Не следует ли просто отбрасывать сообщения, отправленные в очередь при соединении? – piy26

+0

Нет, JMS-провайдер не будет отбрасывать сообщения, и это тоже неправильно. Это ваше потребительское приложение, которое должно заботиться о том, чтобы отбрасывать сообщения, которые имеют временную метку старше, чем потребительское соединение. – Shashi

+0

Хорошо. Спасибо Шаши за ответ. Но у меня есть еще один вопрос. Когда я изменяю один и тот же пример для создания Publisher/Subscriber inplace of Producer/Consumer, я наблюдаю то же поведение. Я имею в виду даже без создания DurableSubscriber, я наблюдаю поведение, подобное Durable Subscriber. Я имею в виду, что мой абонент получает даже сообщения, которые были опубликованы, когда соединение было остановлено. – piy26

Смежные вопросы