У меня есть приложение для создания динамического маршрута. Согласно конструкции потока, я создаю маршрут верблюда. Маршрут может содержать многоадресную рассылку, фильтр, агрегат, процессор и т.д. После проектирования потока через пользовательский интерфейс, мой маршрут был создан, как это:Получение точного ответа от комбинированных маршрутов
from("seda:start").routeId("idx")
.multicast()
.to("direct:a", "direct:b", "direct:c")
.parallelProcessing()
.end();
from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");
from("direct:merge")
.aggregate(new MyAggregationStrategy()).constant(true).completionSize(3)
.to("mock:end");
У меня есть API, чтобы дать результат этого маршрута для пользователей. Когда я выполняю этот маршрут с InOut MEP, ответ «C», но издеваться: конец удовлетворен «ABC»:
MockEndpoint mock = getMockEndpoint("mock:end");
mock.expectedBodiesReceived("ABC"); //works as expected
String reply = template.requestBody("seda:start", "", String.class);
assertEquals("ABC", reply); //it returns 'C', but I expect 'ABC'
assertMockEndpointsSatisfied();
Как я могу изменить код, чтобы получить агрегированный результат с синхронного вызова? Вот код:
public class ResponseTest extends CamelTestSupport {
@Test
public void testAsyncInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:end");
mock.expectedBodiesReceived("ABC"); //works as expected
String reply = template.requestBody("seda:start", "", String.class);
assertEquals("ABC", reply); //it returns 'C', but I expect 'ABC'
assertMockEndpointsSatisfied();
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("seda:start").routeId("idx")
.multicast()
.to("direct:a", "direct:b", "direct:c")
.parallelProcessing()
.end();
from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge");
from("direct:merge")
.aggregate(new MyAggregationStrategy()).constant(true).completionSize(3)
.to("mock:end");
}
};
}
class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
// this is the first time so no existing aggregated exchange
return newExchange;
}
// append the new word to the existing
String body = newExchange.getIn().getBody(String.class);
String existing = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(existing + body);
return oldExchange;
}
}
}
EDIT
Multicasting сообщение 5 различных конечных точек не означает, что все сообщения будут объединены позже. Поэтому я не могу использовать агрегированную стратегию в определении многоадресной рассылки. Некоторые из них могут использоваться для другого вида работы. Определение потока может выглядеть следующим образом: После сообщения многоадресной передачи конечным точкам 'a', 'b', 'c', 'd', 'e', 'a' и 'b' могут быть объединены вместе ('direct: merge1'), 'c' и 'd', объединенные вместе ('direct: merge2') и 'e' могут использоваться для другой вещи. И последний агрегатор будет агрегировать «direct: merge1» и «direct: merge2» в «direct: merge3». Все эти конечные точки создаются динамически («direct: a», «direct: b», «direct: c», «direct: d», «direct: e», «direct: merge1», «direct: merge2», прямой: merge3'). Этот сценарий будет создан так:
from("seda:start").routeId("idx")
.multicast()
.to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e")
.parallelProcessing()
.end();
from("direct:a").transform(constant("A")).delay(1000).to("direct:merge1");
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge1");
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge2");
from("direct:d").transform(constant("D")).delay(1000).to("direct:merge2");
from("direct:e").transform(constant("E")).delay(1000).to("mock:anywhere");
from("direct:merge1").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3");
from("direct:merge2").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3");
from("direct:merge3").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("mock:end");
Когда я отправить сообщение Седа: старт, я ожидаю ABDC, но я получил «E». Есть ли способ получить окончательное агрегированное сообщение («ABDC»)? Вот метод испытания:
@Test
public void testAsyncInOut() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:end");
mock.expectedBodiesReceived("ABDC"); //works as expected
String reply = template.requestBody("seda:start", "", String.class);
assertEquals("ABDC", reply); //it returns 'E' because of default multicast behavior, but I expect 'ABDC'
assertMockEndpointsSatisfied();
}
Спасибо за ваш ответ, но я думаю, что я не мог четко объяснить свою проблему. Я не могу использовать агрегированную стратегию в определении многоадресной рассылки. Из-за того, что некоторые сообщения не могут быть включены в агрегированное сообщение в соответствии с дизайном потока, как я объяснил в своем EDIT – audtou
, я думаю, что вы можете вложить вызовы на многоадресную рассылку (новый MyAggregationStrategy()) для получения желаемого результата. –