У меня возникла проблема с JMS Connection stop() и start(). Простой Java программа, иллюстрирующая то же:JMS Соединение, отправляющее сообщения, отправленные в очередь при остановленном соединении
public class Test {
static Connection producerConn = null;
static BufferedWriter consumerLog = null;
static BufferedWriter producerLog = null;
public static final void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
producerConn = cf.createConnection();
producerLog = new BufferedWriter(new FileWriter("produced.log"));
consumerLog = new BufferedWriter(new FileWriter("consumed.log"));
new Thread(new Runnable() {
public void run() {
try {
producerConn.start();
Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("SampleQ1");
MessageProducer producer = session.createProducer(queue);
Random random = new Random();
byte[] messageBytes = new byte[1024];
for (int i = 0; i < 100; i++) {
random.nextBytes(messageBytes);
Message message = session.createObjectMessage(messageBytes);
producer.send(message);
Thread.currentThread().sleep(10);
producerLog.write(message.getJMSMessageID());
producerLog.newLine();
producerLog.flush();
}
System.out.println("Produced 100000 messages...");
producerLog.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Started producer...");
new Thread(new Runnable() {
public void run() {
int count = 0;
try {
producerConn.start();
Session session = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("SampleQ1");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new Test().new MyListener());
}
catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Started consumer...");
}
private class MyListener implements MessageListener{
private int count = 0;
public void onMessage(Message message) {
try {
message.acknowledge();
System.out.println("count is " +count++);
if(count == 5){
producerConn.stop();
System.out.println("Sleeping Now for 5 seconds. . ." +System.currentTimeMillis());
Thread.currentThread().sleep(5000);
producerConn.start();
}
System.out.println("Waking up . . ." +System.currentTimeMillis());
consumerLog.write(message.getJMSMessageID());
consumerLog.newLine();
consumerLog.flush();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
Моя идея заключается в том, чтобы имитировать остановку подключения() и начать(). Поэтому в потребительском потоке после вызова stop() я установил спать 5 секунд. Однако в то же время продюсерский поток продолжает работу по отправке сообщения в очередь.
Я ожидал, что тест просто поглотит только сообщение до того, как потребитель вызовет stop() и после того, как он снова вызовет start() после пробуждения из сна. Но вот что происходит здесь, когда потребитель просыпается, он считывает все сообщения с сервера, даже те, которые были отправлены в очередь, когда прием сообщений пользователя был остановлен.
Я делаю что-то не так здесь?
Да Шаши. Это правильно для продюсера. Это не мешает продюсеру создавать сообщения. Но моя забота была о Потребителе. Потребитель не должен принимать сообщения, отправленные в очередь, когда соединение было остановлено. Не следует ли просто отбрасывать сообщения, отправленные в очередь при соединении? – piy26
Нет, JMS-провайдер не будет отбрасывать сообщения, и это тоже неправильно. Это ваше потребительское приложение, которое должно заботиться о том, чтобы отбрасывать сообщения, которые имеют временную метку старше, чем потребительское соединение. – Shashi
Хорошо. Спасибо Шаши за ответ. Но у меня есть еще один вопрос. Когда я изменяю один и тот же пример для создания Publisher/Subscriber inplace of Producer/Consumer, я наблюдаю то же поведение. Я имею в виду даже без создания DurableSubscriber, я наблюдаю поведение, подобное Durable Subscriber. Я имею в виду, что мой абонент получает даже сообщения, которые были опубликованы, когда соединение было остановлено. – piy26