2015-05-17 2 views
1

У меня есть приложение для создания динамического маршрута. Согласно конструкции потока, я создаю маршрут верблюда. Маршрут может содержать многоадресную рассылку, фильтр, агрегат, процессор и т.д. После проектирования потока через пользовательский интерфейс, мой маршрут был создан, как это:Получение точного ответа от комбинированных маршрутов

from("seda:start").routeId("idx") 
    .multicast() 
     .to("direct:a", "direct:b", "direct:c") 
     .parallelProcessing() 
    .end(); 

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge"); 
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge"); 
from("direct:c").transform(constant("C")).delay(3000).to("direct:merge"); 

from("direct:merge") 
    .aggregate(new MyAggregationStrategy()).constant(true).completionSize(3) 
    .to("mock:end"); 

У меня есть API, чтобы дать результат этого маршрута для пользователей. Когда я выполняю этот маршрут с InOut MEP, ответ «C», но издеваться: конец удовлетворен «ABC»:

MockEndpoint mock = getMockEndpoint("mock:end"); 
    mock.expectedBodiesReceived("ABC"); //works as expected 

    String reply = template.requestBody("seda:start", "", String.class); 

    assertEquals("ABC", reply); //it returns 'C', but I expect 'ABC' 

    assertMockEndpointsSatisfied(); 

Как я могу изменить код, чтобы получить агрегированный результат с синхронного вызова? Вот код:

public class ResponseTest extends CamelTestSupport { 

@Test 
public void testAsyncInOut() throws Exception { 
    MockEndpoint mock = getMockEndpoint("mock:end"); 
    mock.expectedBodiesReceived("ABC"); //works as expected 

    String reply = template.requestBody("seda:start", "", String.class); 

    assertEquals("ABC", reply); //it returns 'C', but I expect 'ABC' 

    assertMockEndpointsSatisfied(); 
} 

@Override 
protected RouteBuilder createRouteBuilder() throws Exception { 
    return new RouteBuilder() { 
     @Override 
     public void configure() throws Exception { 
      from("seda:start").routeId("idx") 
       .multicast() 
        .to("direct:a", "direct:b", "direct:c") 
        .parallelProcessing() 
       .end(); 

      from("direct:a").transform(constant("A")).delay(1000).to("direct:merge"); 
      from("direct:b").transform(constant("B")).delay(2000).to("direct:merge"); 
      from("direct:c").transform(constant("C")).delay(3000).to("direct:merge"); 

      from("direct:merge") 
       .aggregate(new MyAggregationStrategy()).constant(true).completionSize(3) 
       .to("mock:end"); 
     } 
    }; 
} 

class MyAggregationStrategy implements AggregationStrategy { 

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     if (oldExchange == null) { 
      // this is the first time so no existing aggregated exchange 
      return newExchange; 
     } 

     // append the new word to the existing 
     String body = newExchange.getIn().getBody(String.class); 
     String existing = oldExchange.getIn().getBody(String.class); 

     oldExchange.getIn().setBody(existing + body); 
     return oldExchange; 
    } 
} 
} 

EDIT

Multicasting сообщение 5 различных конечных точек не означает, что все сообщения будут объединены позже. Поэтому я не могу использовать агрегированную стратегию в определении многоадресной рассылки. Некоторые из них могут использоваться для другого вида работы. Определение потока может выглядеть следующим образом: После сообщения многоадресной передачи конечным точкам 'a', 'b', 'c', 'd', 'e', ​​'a' и 'b' могут быть объединены вместе ('direct: merge1'), 'c' и 'd', объединенные вместе ('direct: merge2') и 'e' могут использоваться для другой вещи. И последний агрегатор будет агрегировать «direct: merge1» и «direct: merge2» в «direct: merge3». Все эти конечные точки создаются динамически («direct: a», «direct: b», «direct: c», «direct: d», «direct: e», «direct: merge1», «direct: merge2», прямой: merge3'). Этот сценарий будет создан так:

from("seda:start").routeId("idx") 
    .multicast() 
     .to("direct:a", "direct:b", "direct:c", "direct:d", "direct:e") 
     .parallelProcessing() 
    .end(); 

from("direct:a").transform(constant("A")).delay(1000).to("direct:merge1"); 
from("direct:b").transform(constant("B")).delay(2000).to("direct:merge1"); 

from("direct:c").transform(constant("C")).delay(3000).to("direct:merge2"); 
from("direct:d").transform(constant("D")).delay(1000).to("direct:merge2"); 

from("direct:e").transform(constant("E")).delay(1000).to("mock:anywhere"); 

from("direct:merge1").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3"); 
from("direct:merge2").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("direct:merge3"); 

from("direct:merge3").aggregate(new MyAggregationStrategy()).constant(true).completionSize(2).to("mock:end"); 

Когда я отправить сообщение Седа: старт, я ожидаю ABDC, но я получил «E». Есть ли способ получить окончательное агрегированное сообщение («ABDC»)? Вот метод испытания:

@Test 
public void testAsyncInOut() throws Exception { 
    MockEndpoint mock = getMockEndpoint("mock:end"); 
    mock.expectedBodiesReceived("ABDC"); //works as expected 

    String reply = template.requestBody("seda:start", "", String.class); 

    assertEquals("ABDC", reply); //it returns 'E' because of default multicast behavior, but I expect 'ABDC' 

    assertMockEndpointsSatisfied(); 
} 

ответ

1

От multicastdocumentation:

По умолчанию Camel будет использовать последний ответ в качестве исходящего сообщения.

Если вы хотите агрегировать результаты многоадресной передачи в одно сообщение, вы указываете это в определении многоадресной рассылки.

@Override 
public void configure() throws Exception { 
    from("seda:start").routeId("idx") 
    .multicast(new MyAggregationStrategy()) //Put the Aggregation Strategy here! 
     .to("direct:a", "direct:b", "direct:c") 
     .parallelProcessing() 
    .end(); 

    from("direct:a").transform(constant("A")).delay(1000).to("direct:merge");      
    from("direct:b").transform(constant("B")).delay(2000).to("direct:merge"); 
    from("direct:c").transform(constant("C")).delay(3000).to("direct:merge"); 

    from("direct:merge") 
    .to("mock:end"); 
} 

Обратите внимание, что ваша фиктивная конечная точка теперь будет называться 3 раза, так как агрегация не произойдет дольше. Вам нужно будет соответствующим образом изменить ваш тест.

+0

Спасибо за ваш ответ, но я думаю, что я не мог четко объяснить свою проблему. Я не могу использовать агрегированную стратегию в определении многоадресной рассылки. Из-за того, что некоторые сообщения не могут быть включены в агрегированное сообщение в соответствии с дизайном потока, как я объяснил в своем EDIT – audtou

+0

, я думаю, что вы можете вложить вызовы на многоадресную рассылку (новый MyAggregationStrategy()) для получения желаемого результата. –

Смежные вопросы