2013-12-02 3 views
0

У меня есть поток, где я читаю из файла, конвертирую в CSV, а затем конвертирую каждую строку в объекты. Поэтому я получаю список. Теперь я хочу разбить это на более мелкие списки и обрабатывать каждый параллельно. Я могу использовать split() для получения отдельных записей, но все мои попытки использовать агрегат не привели к списку, а только к отдельным элементам.Как я могу разбить список на более мелкие списки в Camel?

from("file://") 
    .unmarshal(csvDataFormat) 
    .to("bean:personReader") 
    .split(body()) 
    .aggregate(???) 
    .to("bean:send") 

ответ

1

Вы говорите о составленном процессоре сообщения шаблона EIP: http://camel.apache.org/composed-message-processor.html где вы можете сделать вилку/присоединиться к обработке?

Если так смотреть на эту ссылку выше, и посмотреть примеры с разветвителя только как сплиттер EIP в Camel имеет встроенный агрегации, так что вы можете объединить все расщепленные сообщения вместе к одному сообщению снова.

0

Я закончил с использованием пользовательской совокупной стратегии, видя, как AbstractListAggregationStrategy не совсем подходит как:

class ListAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) { 
     final Object value = newExchange.getIn().getBody(); 
     if (oldExchange == null) { 
      newExchange.getIn().setBody(Lists.newArrayList(value)); 
      return newExchange; 
     } else { 
      oldExchange.getIn().getBody(List.class).add(value); 
      return oldExchange; 
     } 
    } 
} 

Тогда: из ("прямой: источник") .aggregate() .constant (true) .completionSize (1000) .aggregationStrategy (новый ListAggregationStrategy()) .to ("direct: target");

Смежные вопросы