2016-04-11 4 views
4

Я работаю над темой «Использование кеша с Apache Camel и как обрабатывать большой файл».Cache Streaming Обработать большой файл

Целью является обработка большого файла с помощью верблюда без загрузки файла в память, потому что это огромный файл за 5 GO.

Мы обнаружили несколько дорожек, первый трек должен использовать компонент сплиттера, чтобы мы могли читать файл, например, по строкам или по блокам, однако, если мы используем разделитель, мы не можем читать снова файл с самого начала, функциональная необходимость состоит в том, чтобы читать часть файла, даже когда раскол завершен.

Таким образом, мы должны использовать систему кэширования, чтобы поместить блоки в кеш для их повторного использования.

Таким образом, мы полагали, что для использования класса CachedOutputStream необходимо записать на диск часть файла после разделителя, этот класс также обеспечивает возможность шифрования данных на диске.

Пример ниже:

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" streamCache="true"> 

    <streamCaching id="myCacheConfig" spoolDirectory="target/cachedir" spoolThreshold="16"/> 

    <route id="SPLIT-FLOW" streamCache="true"> 
     <from uri="file:src/data/forSplitCaching\SimpleRecord?noop=true"/> 
     <split streaming="true"> 
      <tokenize token="\n"/> 
      <to uri="direct:PROCESS-BUSINESS"/> 
     </split> 
    </route> 

    <route id="PROCESS-BUSINESS" streamCache="true"> 
     <from uri="direct:PROCESS-BUSINESS"/> 
     <bean ref="ProcessBusiness" method="dealRecord"/> 
     <choice> 
      <when> 
       <simple>${in.header.CamelSplitComplete} == "true"</simple> 
       <to uri="direct:STREAM-CACHING"/> 
      </when> 
     </choice> 
    </route> 

    <route id="STREAM-CACHING"> 
     <from uri="direct:STREAM-CACHING"/> 
     <bean ref="ProcessStreamCaching" method="usingStream"/> 
     <setHeader headerName="CamelFileName"> 
      <simple>${header.CamelFileName}.${header.CamelSplitIndex}</simple> 
     </setHeader> 
     <to uri="file:src/out"/> 
    </route> 

</camelContext> 

Метод dealRecord помещает каждую строку расщепляется в кэш:

public void dealRecord(Exchange exchange) throws Exception { 

    String body; 
    File file; 
    String[] files; 
    boolean isSplitComplete; 

    body = (String) exchange.getIn().getBody(); 
    isSplitComplete = (boolean) exchange.getProperties().get("CamelSplitComplete"); 

    CachedOutputStream cos = new CachedOutputStream(exchange, false); 
    cos.write(body.getBytes("UTF-8")); 

    file = new File("target/cachedir"); 
    files = file.list(); 
    for (String nameTmpfile : files) { 
     LOG.info("Genered File [" + nameTmpfile + "]"); 
    } 

    lstCache.add(cos); 

    if(isSplitComplete){ 
     exchange.getIn().setHeader("Cached",lstCache); 
    } 
} 

Метод usingStream, может использовать каждый кэш существующего в заголовке

public byte[] usingStream(Exchange exchange) throws InputStreamException { 

    final ArrayList<CachedOutputStream> lstcache; 
    byte[] bytesMessage; 
    StringBuilder messageCompleteOut = new StringBuilder(); 
    InputStream is = null; 

    lstcache = (ArrayList<CachedOutputStream>) exchange.getIn().getHeader("Cached"); 
    for (CachedOutputStream oneCache : lstcache) { 
     try { 
     is = oneCache.getWrappedInputStream(); 
     String messageInputstream = toString(is); 
     LOG.info("Message of Cache ["+ messageInputstream +"]"); 
     messageCompleteOut.append(messageInputstream); 
     messageCompleteOut.append(System.lineSeparator()); 
     } catch (IOException e) { 
     LOG.error(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL); 
     throw new InputStreamException(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL,e); 
     } 
     // On ferme le flux 
     IOHelper.close(is); 
    } 
    bytesMessage = messageCompleteOut.toString().getBytes(Charset.forName("UTF-8")); 
    return bytesMessage; 
} 

Разве это решение выглядит нормально? или, может быть, есть лучший способ?

thxs

ответ

0

GenericFileMessage (реализация сообщений используется компонент файла) не загружает содержимое файла в unles памяти необходимо. Поэтому на самом деле вам нужно только убедиться, что вы не получаете доступ к телу, чтобы заставить его его преобразовать. Вы также можете написать собственное сообщение (наследующее от GenericFileMessage) и предотвратить такое преобразование или вернуть что-то другое (какой-то «дайджест»).

Процессоры по пути могут получить местоположение файла в файловой системе (из заголовков сообщений) и открыть его напрямую, возможно, заменяя сообщение файла каким-либо другим сообщением.

+0

Этот класс GenericFileMessage позволяет вам управлять сообщениями, по умолчанию файл загружается в память, любые методы, доступные для указания установки в конкретном кеше. – Kikou

+0

См. Этот ответ: http://stackoverflow.com/a/9389465/2956532 Содержимое файла не загружается в память. Верблюд фактически передает экземпляр WrappedFile в теле сообщения. –