У меня возникли некоторые проблемы, связанные с повторным использованием XAConnection и XASession для нескольких сотрудников в приложении JBoss. Мне удалось упростить проблему до одного метода. Он должен иметь возможность как Продукт и Потребитель сообщение с использованием того же соединения и сеанса. В настоящее время мое приложение имеет много очередей и рабочих, где каждый работник в настоящее время инициирует и запускает каждое собственное соединение и сеанс вместо того, чтобы делиться им. Разве это не возможно?HornetQ: как повторно использовать XAConnection и XASession
Вот мой пример кода:
import org.apache.log4j.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.InitialContext;
@Singleton
@Startup
public class QueueTest {
private Logger logger = Logger.getLogger(QueueTest.class);
@PostConstruct
public void startup() {
try {
String queue = "queue/Queue1";
String message = "test";
//setting up connection
InitialContext iniCtx = new InitialContext();
XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
XAConnection connection = qcf.createXAConnection();
connection.start();
logger.debug("creating connection at " + new java.util.Date());
//setting up session
XASession session = connection.createXASession();
logger.debug("creating session at " + new java.util.Date());
//find the queue
Object queueObj = iniCtx.lookup(queue);
Queue jmsQueue = (javax.jms.Queue)queueObj;
//adding message to queue
javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
javax.jms.TextMessage textMessage = session.createTextMessage(message);
producer.send(textMessage);
producer.close();
logger.debug("Message added to queue");
//receiving message from queue
javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);
if (messageReceived==null)
throw new Exception("No message reveived");
logger.debug("Got message:"+messageReceived.getText());
consumer.close();
}
catch(Exception e) {
logger.debug("Error: " + e.getMessage(), e);
}
}
@PreDestroy
public void shutdown() {
}
}
Это приводит к этому выводу:
11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013
11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013
11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue
11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived
Как вы можете видеть, ни одно сообщение не принимается Потребителя. Зачем?
EDIT 1:
package dk.energimidt.uapi.zigbee.services;
import org.apache.log4j.Logger;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Queue;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.InitialContext;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
@Stateless
public class QueueTestWorkerBean implements QueueTestWorker {
private Logger logger = Logger.getLogger(QueueTestWorkerBean.class);
public void run() {
try {
String queue = "queue/Queue1";
String message = "test";
//setting up connection
InitialContext iniCtx = new InitialContext();
XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
XAConnection connection = qcf.createXAConnection();
connection.start();
logger.debug("creating connection at " + new java.util.Date());
//setting up session
XASession session = connection.createXASession();
logger.debug("creating session at " + new java.util.Date());
//find the queue
Object queueObj = iniCtx.lookup(queue);
Queue jmsQueue = (javax.jms.Queue)queueObj;
//adding message to queue
javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
javax.jms.TextMessage textMessage = session.createTextMessage(message);
producer.send(textMessage);
producer.close();
session.commit();
logger.debug("Message added to queue");
//receiving message from queue
javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);
if (messageReceived==null)
throw new Exception("No message reveived");
logger.debug("Got message:"+messageReceived.getText());
consumer.close();
connection.close();
}
catch(Exception e) {
logger.debug("Error: " + e.getMessage(), e);
}
}
}
Теперь я получаю исключение на Session.Commit():
10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013
10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013
10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection
at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386)
at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:]
Спасибо за ваш ответ. Я постараюсь разобраться в этом. Не могли бы вы дать мне небольшой пример решения, описанного в части 1? – dhrm
У меня нет примера для использования в одном классе. вы должны искать аннотации транзакций на EE6 –
У меня есть сообщение, отредактированное с дополнительной информацией –