В проекте я использую ActiveMQ для обработки относительно большого количества сообщений. Для этой цели есть очередь, fooQueue
, которая содержит сообщения для обработки.Только один сеанс ActiveMQ обрабатывает сообщения
Два экземпляра приложения обрабатывают сообщения из этой очереди, используя Spring JMS. У меня есть DefaultMessageListenerContainer
, созданный следующим образом.
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName(QUEUE_NAME);
container.setMessageListener(myMessageListener);
container.setConcurrency(getProperty("concurrency"));
container.setSessionTransacted(true);
container.setErrorHandler(new ErrorHandler());
return container;
Глядя на AMQ веб-консоли, я могу подтвердить, что правильное число потребителей и сессий созданы (сеанс/потребитель). Тем не менее, похоже, что одна сессия выполняет большую часть работы, что приводит к тому, что приложение время от времени замораживается.
Эта сессия завершает и отменяет большинство сообщений, все остальные гораздо меньше, чем ее. Если я перезапущу один из этих двух экземпляров приложения, один сеанс в экземпляре приложения забирает работу и ведет себя одинаково.
Помимо проверки, блокируется ли myMessageListener
, есть ли что-нибудь еще, что я могу сделать?
Как определить правило организации сделки? У меня была аналогичная проблема, при которой отправка кучи сообщений блокировала чтение сообщений с использованием того же соединения, что и при отправке, оказалось, что отправка сообщения была выполнена в одной большой и длительной транзакции. – Tobb
Хороший намек. Мне нужно будет проверить это, но я уже видел несколько потоков, ожидающих совершения транзакций. Как вы решили проблему? Отдельные соединения для приема и отправки? –
Я решил, что одна транзакция на сообщение отправляется вместо одного большого. Сделки также сделали некоторые вещи JPA. – Tobb