2013-09-05 3 views
5

У меня возникли некоторые проблемы, связанные с повторным использованием XAConnection и XASession для нескольких сотрудников в приложении JBoss. Мне удалось упростить проблему до одного метода. Он должен иметь возможность как Продукт и Потребитель сообщение с использованием того же соединения и сеанса. В настоящее время мое приложение имеет много очередей и рабочих, где каждый работник в настоящее время инициирует и запускает каждое собственное соединение и сеанс вместо того, чтобы делиться им. Разве это не возможно?HornetQ: как повторно использовать XAConnection и XASession

Вот мой пример кода:

import org.apache.log4j.Logger; 
import javax.annotation.PostConstruct; 
import javax.annotation.PreDestroy; 
import javax.ejb.Singleton; 
import javax.ejb.Startup; 
import javax.jms.*; 
import javax.jms.Queue; 
import javax.naming.InitialContext; 

@Singleton 
@Startup 
public class QueueTest { 

    private Logger logger = Logger.getLogger(QueueTest.class); 

    @PostConstruct 
    public void startup() { 
     try { 
      String queue = "queue/Queue1"; 
      String message = "test"; 

      //setting up connection 
      InitialContext iniCtx = new InitialContext(); 
      XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); 
      XAConnection connection = qcf.createXAConnection(); 
      connection.start(); 
      logger.debug("creating connection at " + new java.util.Date()); 

      //setting up session 
      XASession session = connection.createXASession(); 
      logger.debug("creating session at " + new java.util.Date()); 

      //find the queue 
      Object queueObj = iniCtx.lookup(queue); 
      Queue jmsQueue = (javax.jms.Queue)queueObj; 

      //adding message to queue 
      javax.jms.MessageProducer producer = session.createProducer(jmsQueue); 
      javax.jms.TextMessage textMessage = session.createTextMessage(message); 
      producer.send(textMessage); 
      producer.close(); 
      logger.debug("Message added to queue"); 

      //receiving message from queue 
      javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); 
      javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); 

      if (messageReceived==null) 
       throw new Exception("No message reveived"); 

      logger.debug("Got message:"+messageReceived.getText()); 
      consumer.close(); 
     } 
     catch(Exception e) { 
      logger.debug("Error: " + e.getMessage(), e); 
     } 
    } 

    @PreDestroy 
    public void shutdown() { 

    } 
} 

Это приводит к этому выводу:

11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013 
11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013 
11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue 
11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived 

Как вы можете видеть, ни одно сообщение не принимается Потребителя. Зачем?

EDIT 1:

package dk.energimidt.uapi.zigbee.services; 

import org.apache.log4j.Logger; 

import javax.ejb.Stateless; 
import javax.ejb.TransactionAttribute; 
import javax.ejb.TransactionAttributeType; 
import javax.jms.Queue; 
import javax.jms.XAConnection; 
import javax.jms.XAConnectionFactory; 
import javax.jms.XASession; 
import javax.naming.InitialContext; 

@TransactionAttribute(TransactionAttributeType.REQUIRED) 
@Stateless 
public class QueueTestWorkerBean implements QueueTestWorker { 

    private Logger logger = Logger.getLogger(QueueTestWorkerBean.class); 

    public void run() { 
     try { 
      String queue = "queue/Queue1"; 
      String message = "test"; 

      //setting up connection 
      InitialContext iniCtx = new InitialContext(); 
      XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA"); 
      XAConnection connection = qcf.createXAConnection(); 
      connection.start(); 
      logger.debug("creating connection at " + new java.util.Date()); 

      //setting up session 
      XASession session = connection.createXASession(); 
      logger.debug("creating session at " + new java.util.Date()); 

      //find the queue 
      Object queueObj = iniCtx.lookup(queue); 
      Queue jmsQueue = (javax.jms.Queue)queueObj; 

      //adding message to queue 
      javax.jms.MessageProducer producer = session.createProducer(jmsQueue); 
      javax.jms.TextMessage textMessage = session.createTextMessage(message); 
      producer.send(textMessage); 
      producer.close(); 
      session.commit(); 
      logger.debug("Message added to queue"); 

      //receiving message from queue 
      javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue); 
      javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000); 

      if (messageReceived==null) 
       throw new Exception("No message reveived"); 

      logger.debug("Got message:"+messageReceived.getText()); 
      consumer.close(); 

      connection.close(); 
     } 
     catch(Exception e) { 
      logger.debug("Error: " + e.getMessage(), e); 
     } 
    } 
} 

Теперь я получаю исключение на Session.Commit():

10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013 
10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013 
10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection 
    at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386) 
    at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:] 

ответ

2

Я вижу несколько (на самом деле 2) взлетов микса там:

я - вы используете XA Session, но вы не объявляя никаких границ транзакций ... которые, как правило, делается на Сеансные и МБР. Я не уверен, что вы можете сделать это на этом безгражданстве.

Если вы не используете декларативную транзакцию, вам придется заручиться XID вручную.

ii - jmsXA - это завод по умолчанию для подключения адаптера ресурсов. У него уже есть бассейн. Поэтому всякий раз, когда вы создаете новый сеанс, вы выходите из пула. Когда вы его закрываете, вы возвращаете его в бассейн.

Вы можете использовать обычную фабрику соединений. Только в InVMConnectionFactory (или независимо от того, что вы определили на своем автономном уровне, за пределами PooledConnectionFactories, предполагая, что вы находитесь на JBoss ... и затем просто используете обычную JMS.

Даже обычная фабрика соединений может использоваться с XA, но на этом случае вам нужно будет убедиться, что вы завербовать его с помощью API в Transaction Manager напрямую.

, если использовать обычную фабрику соединений, вы можете просто держать его подключен до тех пор, как вы хотите.

Пожалуйста, дайте мне знать, как это происходит, и я помогу вам. Я знаю, что вы начали щедрость.но я бы ответил на это бесплатно :)

Я не смог найти ни одного примера в учебнике EJB об использовании транзакций с Singleton.

Я бы порекомендовал вам использовать его через Statless или Stateful Session Bean, а затем применил @TransactionAttribute к Bean.

Java-ЭЭ +6 Учебник имеет хорошую информацию о нем:

http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html

уведомления о том, что сообщение не будет доступно пока вы не зафиксируете. Поэтому, если вы отправляете сообщение в транзакции, вы не сможете получить его в той же транзакции.

На вашем примере edit1 вы отправляете сообщение и потребляете его в той же транзакции. Это не сработает, так как сначала вам необходимо выполнить метод создания, прежде чем вы сможете его использовать. Вам понадобится две транзакции в этом случае, поэтому Edit1 будет нарушен.

Также: убедитесь, что вы закрываете соединение в конце. Поскольку вы используете JmsXA (или фабрику объединенного подключения), вы будете автоматически выполнять опрос на сервере приложений.

+0

Спасибо за ваш ответ. Я постараюсь разобраться в этом. Не могли бы вы дать мне небольшой пример решения, описанного в части 1? – dhrm

+0

У меня нет примера для использования в одном классе. вы должны искать аннотации транзакций на EE6 –

+0

У меня есть сообщение, отредактированное с дополнительной информацией –

1

Перед тем, как на самом деле может получать сообщения, вы должны предании объект сеанса. После утверждения manufacturer.send вам нужно добавить session.commit.

Кроме того, я бы рекомендовал закрыть производителя в конце.

Другая вещь, которая выглядит неправильно, заключается в том, что вы создаете потребителя после уничтожения производителя.

+0

Спасибо за ваш ответ. Добавление фиксации в строке после файла manufacturer.send приводит к соединению * XA: javax.jms.TransactionInProgressException *. – dhrm

+0

Щедрость была поставлена ​​на неправильный ответ. Прости. – dhrm

+0

Я отменил вашу награду, @ Денис. Вы можете повторно предложить его и вознаградить по желанию. Это дает * both * ответы на другой выстрел. – Shog9

Смежные вопросы