2014-12-05 2 views
0

Я пытаюсь заставить Wildfly и ActiveMQ работать вместе с Apache Camel, позвольте мне объяснить сценарий. Каждый час партия верблюдов опросает FTP-сервер, захватывает файлы и отправляет их брокеру ActiveMQ. Брокер реализует два маршрута: numbers и big.numbers. Сообщения помещаются в очередь numbers, если они не готовы к отправке, они направляются на big.numbers. Сообщения в big.numbers удалены и переведены процессором верблюда и поставлены в очередь на numbers как готовые к отправке. Готовые к отправке сообщения в numbers отправляются в Wildfly, где MDB что-то делает. Все работает отлично, за исключением последнего шага. Когда MDB получает сообщение, генерирует исключение: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage. Вот camel.xml я использую для реализации маршрутов на моем ActiveMQ брокера:Интеграция ActiveMQ и Wildfly с Apache Camel

... 
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> 
    <package>edu.foo.amq.camel</package> 
    <dataFormats> 
     <string id="utf-8-string" charset="UTF-8"/> 
    </dataFormats> 


    <route id="toBigNumeri"> 
     <from uri="activemq:numbers.queue"/> 
     <marshal ref="utf-8-string"/> 
     <choice> 
      <when> 
       <simple> 
        ${in.header.readyToGo} != true 
       </simple> 
       <process ref="big.numbers.processor"/> 
       <to uri="activemq:big.numbers.queue"/> 
      </when> 
      <otherwise> 
       <process ref="done.processor"/> 
       <to uri="wildflycf:generatoreQueue?username=dummyusr&amp;password=dummy1234"/> 
      </otherwise> 
     </choice>   
    </route> 

    <route id="toNumeri"> 
     <from uri="activemq:big.numbers.queue"/> 
     <marshal ref="utf-8-string"/> 
     <split> 
      <tokenize token="\n"/> 
      <process ref="numbers.processor"/> 
      <setHeader headerName="readyToGo"> 
       <constant>true</constant> 
      </setHeader> 
      <to uri="activemq:numbers.queue"/> 
     </split> 
     <process ref="dump.processor"/> 
     <to uri="activemq:dump.queue"/> 
    </route> 
</camelContext> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="connectionFactory"> 
     <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 
     <property name="brokerURL" value="tcp://localhost:61616"/> 
     <property name="userName" value="${activemq.username}"/> 
     <property name="password" value="${activemq.password}"/> 
     </bean> 
    </property> 
</bean> 

<bean id="jndiTmp" class="org.springframework.jndi.JndiTemplate"> 
    <property name="environment"> 
     <props> 
      <prop key="java.naming.provider.url">http-remoting://localhost:8080</prop> 
      <prop key="java.naming.factory.initial">org.jboss.naming.remote.client.InitialContextFactory</prop> 
      <prop key="java.naming.security.principal">jhon</prop> 
      <prop key="java.naming.security.credentials">doe</prop> 
     </props> 
    </property> 
</bean> 

<bean id="wfcf" class="org.springframework.jndi.JndiObjectFactoryBean"> 
    <property name="jndiTemplate" ref="jndiTmp"/> 
    <property name="jndiName" value="jms/RemoteConnectionFactory"/> 
</bean> 

<bean id="wildflycf" class="org.apache.camel.component.jms.JmsComponent"> 
    <property name="connectionFactory" ref="wfcf"/> 
</bean> 
... 

Вот мой MDB:

import javax.ejb.ActivationConfigProperty; 
import javax.ejb.MessageDriven; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.TextMessage; 
import org.apache.log4j.Logger; 

@MessageDriven(name="GeneratoreMDB", activationConfig = { 
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), 
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), 
@ActivationConfigProperty(propertyName="destinationLookup", propertyValue=QueueHandler.jmsQueue)} 
) 
public class QueueHandler implements MessageListener { 

public static final String jmsQueue = "/export/jms/generatoreQueue"; 

private static final Logger log = Logger.getLogger(QueueHandler.class); 

    @Override 
    public void onMessage(Message arg0) { 
    try { 
     log.info(((TextMessage) arg0).getText()); 
     ... 
    } catch (JMSException e) { 
     log.error(e); 
    } 
    } 
} 

Вот исключение мой MDB бросает:

2014-12-05 09:12:04,864 ERROR [org.hornetq.ra] (Thread-35 (HornetQ-client-global-threads-1727074612)) HQ154004: Failed to deliver message: javax.ejb.EJBTransactionRolledbackException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.handleInCallerTx(CMTTxInterceptor.java:163) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:253) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.required(CMTTxInterceptor.java:342) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.processInvocation(CMTTxInterceptor.java:239) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.invocationmetrics.WaitTimeInterceptor.processInvocation(WaitTimeInterceptor.java:43) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.security.SecurityContextInterceptor.processInvocation(SecurityContextInterceptor.java:95) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.ShutDownInterceptorFactory$1.processInvocation(ShutDownInterceptorFactory.java:64) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.LoggingInterceptor.processInvocation(LoggingInterceptor.java:59) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.interceptors.AdditionalSetupInterceptor.processInvocation(AdditionalSetupInterceptor.java:55) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentDescription$5$1.processInvocation(MessageDrivenComponentDescription.java:211) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326) 
at org.wildfly.security.manager.WildFlySecurityManager.doChecked(WildFlySecurityManager.java:448) 
at org.jboss.invocation.AccessCheckingInterceptor.processInvocation(AccessCheckingInterceptor.java:61) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326) 
at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) 
at org.jboss.as.ee.component.ViewService$View.invoke(ViewService.java:185) 
at org.jboss.as.ee.component.ViewDescription$1.processInvocation(ViewDescription.java:182) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) 
at org.jboss.as.ee.component.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:73) 
at edu.foo.incassionline.generatore.jms.QueueHandler$$$view3.onMessage(Unknown Source) 
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45] 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45] 
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45] 
at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:139) 
at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) 
at edu.foo.incassionline.generatore.jms.QueueHandler$$$endpoint1.onMessage(Unknown Source) 
at org.hornetq.ra.inflow.HornetQMessageHandler.onMessage(HornetQMessageHandler.java:319) [hornetq-ra-2.4.1.Final.jar:] 
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1116) [hornetq-core-client-2.4.1.Final.jar:] 
at org.hornetq.core.client.impl.ClientConsumerImpl.access$500(ClientConsumerImpl.java:56) [hornetq-core-client-2.4.1.Final.jar:] 
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1251) [hornetq-core-client-2.4.1.Final.jar:] 
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:104) [hornetq-core-client-2.4.1.Final.jar:] 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_45] 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_45] 
at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_45] 
Caused by: java.lang.ClassCastException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage 
at edu.foo.incassionline.generatore.jms.QueueHandler.onMessage(QueueHandler.java:27) 
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45] 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45] 
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45] 
at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53) 
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407) 
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:82) 
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:93) 
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53) 
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.invocationmetrics.ExecutionTimeInterceptor.processInvocation(ExecutionTimeInterceptor.java:43) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407) 
at org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:55) 
at org.jboss.as.weld.ejb.EjbRequestScopeActivationInterceptor.processInvocation(EjbRequestScopeActivationInterceptor.java:83) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.InitialInterceptor.processInvocation(InitialInterceptor.java:21) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61) 
at org.jboss.as.ee.component.interceptors.ComponentDispatcherInterceptor.processInvocation(ComponentDispatcherInterceptor.java:53) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.component.pool.PooledInstanceInterceptor.processInvocation(PooledInstanceInterceptor.java:51) 
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309) 
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:251) 
... 49 more 

I не может понять, почему MDB получает сообщение как сообщение HornetQ, а не просто сообщение JMS.

+2

Из stacktrace он указывает, что вы отправляете сообщение в очередь в виде сообщения в виде байтов. А с другой стороны вы пытаетесь набирать текст в качестве текстового сообщения. Вам нужно использовать тот же тип сообщения с JMS –

ответ

1

При отправке сообщения JMS, Camel преобразует тело сообщения следующих типов JMS сообщений (см here):

╔══════════════════════╦═════════════════════════╗ 
║ Body Type   ║ JMS Message    ║ 
╠══════════════════════╬═════════════════════════╣ 
║ String    ║ javax.jms.TextMessage ║ 
║ org.w3c.dom.Node  ║ javax.jms.TextMessage ║ 
║ Map     ║ javax.jms.MapMessage ║ 
║ java.io.Serializable ║ javax.jms.ObjectMessage ║ 
║ byte[]    ║ javax.jms.BytesMessage ║ 
║ java.io.File   ║ javax.jms.BytesMessage ║ 
║ java.io.Reader  ║ javax.jms.BytesMessage ║ 
║ java.io.InputStream ║ javax.jms.BytesMessage ║ 
║ java.nio.ByteBuffer ║ javax.jms.BytesMessage ║ 
╚══════════════════════╩═════════════════════════╝ 

Пожалуйста, проверьте тип тела сообщения в big.numbers.processor. Существуют две возможности: 1) создать «тело типа текстового сообщения» в соответствии с правилами конвертации Camel и привести к TextMessage или 2) вместо BytesMessage.

+0

Право! Я изменить свой маршрут '<Ури = "wildflycf: generatoreQueue имя пользователя = dummyusr & пароль = dummy1234"/>' так: '<Ури =" wildflycf: generatoreQueue имя пользователя = dummyusr & пароль = dummy1234 & jmsTextMessage = true "/>' , и теперь мой MDB может обрабатывать сообщения. – Francesco

Смежные вопросы