2017-01-04 7 views
0

Это мой код.SplitStream для динамического выходного ключа (Выбрать)

SplitStream<MonitoringEvent> splitStream = inputStream.split(new OutputSelector<MonitoringEvent>() { 

    @Override 
    public Iterable<String> select(MonitoringEvent me) { 

     List<String> ml = new ArrayList<String>();    
     ml.add(me.getEventType());        
     return ml; 
} 

У меня есть поток Мониторинг событий, поступающих в случайном порядке температура: 80, давление: 70, влажность: 80, температура: 30 ...

С выше коде, я разделения потока, eventType, т. е. temperatureStream, pressureStream.

Проблема заключается в том, если я знаю типСобытие, я могу выбрать его из splitStream как

splitStream.select('temperatureStream') 

но типСобытия является динамичной и не предопределен.

Как применить CEP для этого динамического потока. КООС будет, как, если

temperate is > 90 for past 10 minutes ... 

pressure is > 90 for past 10 minutes ... 
+0

Не идеально, но поскольку ваши типы событий являются конечными и маленькими (темп, давление, влажность ...), вы можете иметь несколько потоков, а затем выполнять специфическую обработку по этим отдельным потокам. Если eventTypes значительно возрастет, тогда да, будет сложно справиться. – Aurvoir

+0

или разделить события заранее либо у источника/производителя, либо с помощью какой-либо ключевой маршрутизации, например, в обмене сообщениями – Aurvoir

ответ

0

Поправьте меня, если я ошибаюсь, но я думаю, что это не возможно сделать динамический поиск по parallism выбора кнопки из-FLiNK в. Ваша программа преобразуется в параллельные инструкции для диспетчеров флинков, и координатор работы координирует эти действия. Без общего knowlegde о вашем абстрактном синтаксическом дереве никакой парализм не может быть применен вообще ... Возможно, вы могли бы найти какой-то общий атрибут, который все сообщения разделяют и отличаются от него.

+0

Итак, как мне это достичь? Один поток с несколькими типами событий. И поток должен быть разделен и применен CEP соответственно – madhairsilence

+0

Если вы еще не нашли решение. Вы должны указать и разобрать входящее сообщение на классы, используя DeserializationSchema в своем источнике. Здесь некоторый speudo-код, например: class mySchema extends AbstractDeserializationSchema {public LogEvent deserialize (byte [] message) throws IOException {return (Temperature) message;}}, при этом каждое входящее сообщение напечатано как класс температуры, и его можно выбрать каждое событие используя .select(). type (Temperature.class) .dosomething с CEP-Rules – bl4ckbird

Смежные вопросы