2017-02-03 3 views
2

У меня есть простой пример журнала:Apache верблюд: агрегат по классификатором из журнала

2017-02-02 09:58:12,764 - INFO - PRC0XK - logged in 
2017-02-02 09:58:13,766 - INFO - L3J5WW - logged in 
2017-02-02 09:58:14,005 - INFO - 0NKCVZ - call s2 
2017-02-02 09:58:14,767 - INFO - P0QIOW - logged in 
2017-02-02 09:58:15,729 - INFO - E0MVFZ - call s2 
2017-02-02 09:58:16,257 - INFO - L3J5WW - call s2 
2017-02-02 09:58:17,750 - INFO - PRC0XK - call s2 
2017-02-02 09:58:21,908 - INFO - P0QIOW - call s2 
2017-02-02 09:58:30,479 - INFO - PRC0XK - get answer from s2 
2017-02-02 09:58:30,479 - INFO - PRC0XK - logged out 

Это сформированный полями как . Я хочу использовать его в качестве ввода и формирования действий один за другим на USERID. Позже я хочу добавить еще один файл журнала, сформированный таким же образом, который также имеет простой модифицированный USERID, и собрать все действия через два журнала вместе на USERID. Я пытаюсь использовать стратегию агрегации, но у меня есть некоторые, которых я не ожидаю. Мой верблюд маршрут:

<route id="fileeater"> 
<description> 
    this route will eat log file and try to put guid through lot of log entry by some identifier 
</description> 
<from uri="file://data/in?charset=utf-8"/> 
<split streaming="true"> 
    <tokenize token="\n"/> 
    <to uri="log:gotlogline"/> 
    <aggregate strategyRef="SimpleAggregationStrategy" completionSize="4"> 
     <correlationExpression> 
      <constant>true</constant> 
     </correlationExpression> 
     <log logName="LOGEater" message="this is logeater part"/> 
     <to uri="file://data/out"/> 
    </aggregate> 
</split> 

где SimpleAggregationStrategy является:

import org.apache.camel.Exchange; 
import org.apache.camel.processor.aggregate.AggregationStrategy; 

public class SimpleAggregationStrategy implements AggregationStrategy{ 

@Override 
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 

    if(oldExchange == null) { 
     return newExchange; 
    } 

    String oldBody = oldExchange.getIn().getBody(String.class); 
    String newBody = newExchange.getIn().getBody(String.class); 
    String body= oldBody; 
    if (oldBody.split(" - ")[2].equalsIgnoreCase(newBody.split(" - ")[2])){ 
     body = oldBody + "\n" + newBody; 
    } 

    oldExchange.getIn().setBody(body); 

    return oldExchange; 
} 

} 

Итак, я ожидаю записи в журнал и сгруппированы по USERID:

... 
2017-02-02 09:59:45,599 - INFO - NU7444 - logged in 
2017-02-02 09:59:51,229 - INFO - NU7444 - call s2 
2017-02-02 10:00:09,818 - INFO - NU7444 - get answer from s2 
2017-02-02 10:00:09,818 - INFO - NU7444 - logged out 
... 

Но у меня есть только две линии в outfile:

2017-02-02 10:00:09,818 - INFO - NU7444 - get answer from s2 
2017-02-02 10:00:09,818 - INFO - NU7444 - logged out 

Мои мысли о correlationExpression агрегации:

  1. можно использовать часть строки журнала (сплит («-») [2], как USERID), чтобы связать его вместе агрегацией?

  2. Я прочитал http://www.catify.com/2012/07/09/parsing-large-files-with-apache-camel/ и обнаружил, что агрегация по заголовку быстрее, чем простая агрегация. Итак, могу ли я использовать часть строки после расщепления в качестве заголовка, а затем собрать ее в агрегации по заголовку? Должен ли я использовать Процессор, чтобы получить часть строки (USERID) и поместить его в заголовок для него?

+0

хорошо, я добавить процессор, который заполнить заголовок для каждой строки: 'общественного класса UserIDProcessor реализует процессор { \t общественных недействительный процесс (Exchange обмен) бросает исключение { \t \t входа String = exchange.getIn() getBody (String. .класс); \t if (input.split ("-") .length> 2) { \t \t exchange.getIn(). SetHeader ("LOGLEVEL", input.split ("-") [1]); \t \t замена.getIn().setHeader ("USERID", input.split ("-") [2]); \t} \t замена.getIn(). КомплектBody (вход); \t} 'и сделал агрегацию по заголовку. Но у меня все еще есть выход только для одного ПОЛЬЗОВАТЕЛЯ, любые предложения? – smartydoc

ответ

0

Ну, ребята. Кажется, я нашел решение после игры с верблюдом. это об использовании процесса, который может установить заголовок для каждой записи журнала, как я упоминаю в комментарии:

public class UserIDProcessor implements Processor{ 
    public void process(Exchange exchange) throws Exception { 
     String input = exchange.getIn().getBody(String.class); 
     if (input.split(" - ").length > 2){ 
      exchange.getIn().setHeader("LOGLEVEL", input.split(" - ")[1]); 
      exchange.getIn().setHeader("USERID", input.split(" - ")[2]); 
     } 
     exchange.getIn().setBody(input); 
    } 
} 

Затем я агрегировать сообщения по заголовку, используя простой aggrstrategy:

public class SimpleAggregationStrategy implements AggregationStrategy{ 
    @Override 
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     if(oldExchange == null) { 
      return newExchange; 
     } 
     String oldBody = oldExchange.getIn().getBody(String.class); 
     String newBody = newExchange.getIn().getBody(String.class); 
     String body= oldBody + "\r\n" + newBody; 
     oldExchange.getIn().setBody(body); 
     return oldExchange; 
    } 
} 

И использовать довольно простой маршрут (можно добавить флаг тайм-аут и размер завершения в части базы агрегации маршрутов от ваших потребностей):

<route id="fileeater"> 
    <description> 
     this route will eat log file and try to put guid through lot of log entry by some identifier 
    </description> 
    <from uri="file://data/in?charset=utf-8&amp;delete=false&amp;readLock=idempotent-changed&amp;readLockCheckInterval=5000"/> 
    <split streaming="true"> 
     <tokenize token="\n"/> 
     <process ref="UIDProcessor"/> 
     <aggregate strategyRef="SimpleAggregationStrategy" completionSize="4"> 
      <correlationExpression> 
       <simple>header.USERID</simple> 
      </correlationExpression> 
      <to uri="log:gotlogline"/> 
      <to uri="file://data/out?fileExist=append"/> 
     </aggregate> 
    </split> 
</route> 

Кроме того, для увеличения скорости парсинга у ou может добавить флаг parallelProcessing="true", чтобы разделить и получить сверхбыстрый результат.

Смежные вопросы