настоящее время у меня следующий верблюд маршрут:Camel AggregationStrategy продуцирующие NULL работоспособных сообщения
<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring">
<propertyPlaceholder id="envProps" location="classpath:myapp.properties" />
<route id="my-camel-route">
<from uri="{{start.uri}}"/>
<setHeader headerName="id">
<constant>1</constant>
</setHeader>
<to uri="bean:preProcessor?method=process" />
<aggregate strategyRef="myAggregationStrategy" completionSize="1">
<correlationExpression>
<simple>${header.id} == 1</simple>
</correlationExpression>
<to uri="bean:postProcessor?method=process" />
</aggregate>
<to uri="bean:mailer?method=process" />
</route>
</camelContext>
<bean id="myAggregationStrategy" class="com.me.myapp.MyAggregationStrategy" />
<bean id="postProcessor" class="com.me.myapp.PostProcessor" />
<bean id="mailer" class="com.me.myapp.Mailer" />
сейчас, я на самом деле не агрегирующего ничего значимого (completionSize=1
), я действительно только тестирование AggregationStrategy
аут. Вот моя стратегия:
public class MyAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) {
AppPayload payload = null;
if(aggregatingExchange == null)
payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor...
else
payload = (AppPayload)incomingExchange.getIn().getBody();
payload.setCargo((Order)incomingExchange.getIn().getBody());
if(aggregatingExchange == null) {
incomingExchange.getIn().setBody(payload);
return incomingExchange;
}
else
return aggregatingExchange;
}
}
А также мой postProcessor
боб:
public class PostProcessor implement Processor {
@Override
public void process(Exchange exchange) {
try {
System.out.println("In PostProcessor...");
AppPayload payload = (AppPayload)exchange.getIn().getBody();
System.out.println("\t...payload acquired...");
if(payload == null)
System.out.println("Payload is NULL.");
} catch(Throwable throwable) {
System.out.println(ExceptionUtils.getFullStackTrace(throwable));
}
}
}
Когда я запускаю этот код, я вижу сообщение от моего preProcessor
боба, что inidcate он правильно выполняет лог. И я также вижу, что MyAggregationStrategy
правильно «агрегирует» сообщение, а затем пропускает его до postProcessor
после получения 1-го сообщения (опять же, потому что completionSize=1
). Тем не менее, я получаю следующий результат в postProcessor
:
In PostProcessor...
...payload acquired...
Payload is NULL.
Может кто-нибудь понять, почему payload
будет NULL? Не должно быть инициализировано внутри MyAggregationStrategy
?!? Я рад опубликовать больше кода, но я считаю, что это связано с неправильным использованием API AggregationStrategy
.
Nailed это - спасибо! – AdjustingForInflation