0

настоящее время у меня следующий верблюд маршрут:Camel AggregationStrategy продуцирующие NULL работоспособных сообщения

<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring"> 
    <propertyPlaceholder id="envProps" location="classpath:myapp.properties" /> 
    <route id="my-camel-route"> 
     <from uri="{{start.uri}}"/> 

     <setHeader headerName="id"> 
      <constant>1</constant> 
     </setHeader> 

     <to uri="bean:preProcessor?method=process" /> 

     <aggregate strategyRef="myAggregationStrategy" completionSize="1"> 
      <correlationExpression> 
       <simple>${header.id} == 1</simple> 
      </correlationExpression> 
      <to uri="bean:postProcessor?method=process" /> 
     </aggregate> 

     <to uri="bean:mailer?method=process" /> 
    </route> 
</camelContext> 

<bean id="myAggregationStrategy" class="com.me.myapp.MyAggregationStrategy" /> 
<bean id="postProcessor" class="com.me.myapp.PostProcessor" /> 
<bean id="mailer" class="com.me.myapp.Mailer" /> 

сейчас, я на самом деле не агрегирующего ничего значимого (completionSize=1), я действительно только тестирование AggregationStrategy аут. Вот моя стратегия:

public class MyAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) { 
     AppPayload payload = null; 

     if(aggregatingExchange == null) 
      payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor... 
     else 
      payload = (AppPayload)incomingExchange.getIn().getBody(); 

     payload.setCargo((Order)incomingExchange.getIn().getBody()); 

     if(aggregatingExchange == null) { 
      incomingExchange.getIn().setBody(payload); 
      return incomingExchange; 
     } 
     else 
      return aggregatingExchange; 
    } 
} 

А также мой postProcessor боб:

public class PostProcessor implement Processor { 
    @Override 
    public void process(Exchange exchange) { 
     try { 
      System.out.println("In PostProcessor..."); 
      AppPayload payload = (AppPayload)exchange.getIn().getBody(); 
      System.out.println("\t...payload acquired..."); 

      if(payload == null) 
       System.out.println("Payload is NULL."); 
     } catch(Throwable throwable) { 
      System.out.println(ExceptionUtils.getFullStackTrace(throwable)); 
     } 
    } 
} 

Когда я запускаю этот код, я вижу сообщение от моего preProcessor боба, что inidcate он правильно выполняет лог. И я также вижу, что MyAggregationStrategy правильно «агрегирует» сообщение, а затем пропускает его до postProcessor после получения 1-го сообщения (опять же, потому что completionSize=1). Тем не менее, я получаю следующий результат в postProcessor:

In PostProcessor... 
    ...payload acquired... 
Payload is NULL. 

Может кто-нибудь понять, почему payload будет NULL? Не должно быть инициализировано внутри MyAggregationStrategy?!? Я рад опубликовать больше кода, но я считаю, что это связано с неправильным использованием API AggregationStrategy.

ответ

1

Я считаю, что вы путаетесь с aggregatingExchange и incomingExchange. Вы можете попробовать:

public class MyAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) { 
     AppPayload payload = null; 

     if(aggregatingExchange == null) { 
     payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor... 
     } else { 
      payload = (AppPayload)aggregatingExchange.getIn().getBody(); 
     } 

     payload.setCargo((Order)incomingExchange.getIn().getBody()); 

     if(aggregatingExchange == null) { 
      incomingExchange.getIn().setBody(payload); 
      return incomingExchange; 
     } else { 
      return aggregatingExchange; 
     } 
    } 
} 
+0

Nailed это - спасибо! – AdjustingForInflation

0

Добавление к тому, что уже упоминалось в @hveiga. У меня была аналогичная проблема, которую я разрешил, добавив заголовок в мои сообщения. Однако в вашем случае я вижу, что вы не используете сплиттер и у вас уже есть заголовок. Таким образом, часть информации, полученной от Clauss Ibssen, заключалась в том, что первый обмен временем будет пустым, и нам нужно проверить нулевой объект.

Смотрите это для более подробного объяснения - Apache Camel - Split and aggregate - Old Exchange is always null

Track полное объяснение здесь - http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-td5746365.html

Смежные вопросы