2014-11-21 3 views
2

Мы используем apache Camel для маршрутизации и извлечения из файлов.Camel - данные из двух источников

У меня возникла ситуация, когда мне нужно получить данные из файла в общей папке и данные из базы данных. Мне нужно объединить данные только тогда, когда прибыли данные с обеих сторон. Если обе стороны не получили, то мой процесс объединения данных должен ждать, пока обе стороны не будут присутствовать.

Возможно ли это? Как я могу это достичь? Любой пример кода?

ответ

0

Что-то должно инициировать процесс - либо файл, либо базу данных, поэтому выберите один.

Затем вы можете использовать enricher pattern для заполнения другого источника (когда данные готовы). Для объединения данных используется стратегия агрегации. Обычно вы пишете стратегию агрегации в java.

Ссылка содержит примеры того, как обогащать и объединять данные. Вы можете узнать, как обращаться с базой данных и файлами в документации Camel.

+0

Используя стратегию агрегирования, скажем, файл пришел и процесс верблюда ног, но строка базы данных нет. В этом случае я хочу подождать и начать опрос до тех пор, пока не появятся строки базы данных. Это возможно? Извините, если я задаю глупый вопрос, несмотря на то, что вы указали ссылку выше. Я хочу, чтобы обе вещи происходили в контексте одного верблюда. – nav

0

Я использую это для почтового файла с обработкой журнала вместе. Я привел пример, надеюсь, что это поможет вам.

//Archived 
    from("direct:" + EnvironmentSetup.ARCHIVED) 
     .routeId(ROUTES.ARCHIVED.name()) 
     .setHeader(HEADER_ZIP_AGG_ID, header(Exchange.FILE_NAME)) 
     .setHeader(HEADER_AFTER_ZIP_DEST).constant(getArchiveUri()) 
     .setHeader(HEADER_STATUS).constant(STATUS.SUCCESS) 
     .pipeline() 
      .to("direct:" + EnvironmentSetup.ARCHIVED_ZIP) 
     .end() 
     .pipeline() 
      .setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".report")) 
      .setBody(header(ProcessManager.PROCESS_LOG).convertToString()) 
      .to("direct:" + EnvironmentSetup.ARCHIVED_ZIP) 
     .end() 
    .end(); 

    from(
     "direct:" + EnvironmentSetup.DECRYPT_FAILED_ZIP, 
     "direct:"+EnvironmentSetup.PROCESS_FAILED_ZIP, 
     "direct:"+EnvironmentSetup.ARCHIVED_ZIP 
    ) 
     .routeId("ZIP") 
      .aggregate(header(HEADER_ZIP_AGG_ID), new CopiedGroupedExchangeAggregationStrategy()) 
      .completionSize(2) 
      .marshal(zipFileDataFormat) 
       .multicast() 
       .pipeline() 
        .setHeader(Exchange.FILE_NAME, simple(String.format(
         "${in.header.%s}/${in.header.%s}", HEADER_EMAIL, Exchange.FILE_NAME))) //header(HEADER_EMAIL). header(Exchange.FILE_NAME)) 
        //.dynamicRouter(header(HEADER_AFTER_ZIP_DEST)) 
        .to("direct:dynamic") 

       .end() 
       .pipeline() 
        .marshal(encryption) 
        .setHeader(Exchange.FILE_NAME, simple(String.format(
         "${in.header.%s}/${in.header.%s}.gpg", HEADER_EMAIL, Exchange.FILE_NAME))) 
        //.setHeader(Exchange.FILE_NAME, header(Exchange.FILE_NAME).append(".gpg")) 
        .to("direct:"+EnvironmentSetup.SEND_BACK) 
       .end() 
      .end() //end aggregate 
     .end(); 

CopiedGroupedExchangeAggregationStrategy.java

public class CopiedGroupedExchangeAggregationStrategy extends 
                 AbstractListAggregationStrategy<Exchange> { 

    @Override 
    public boolean isStoreAsBodyOnCompletion() { 
     // keep the list as a property to be compatible with old behavior 
     return true; 
    } 

    @Override 
    public Exchange getValue(Exchange exchange) { 
     return exchange.copy(); 
    } 

} 
Смежные вопросы