2017-01-03 5 views
0

Предположим, у меня есть поток данныхкатегоризировать или GroupBy данных Datastream и процесс с КЭП отдельно

x:1, y:2 , z:3 , x:7 , y:-1, z:0, z:3 , z:2, y:3 ,x: 2 ,y:6 

Как я положил x,y,z в их собственном ведре и применить мое правило КЭП на нем.

x:1, x:7,x: 2 
y:2, y:-1, y:3 , y:6 
z:3, z:0 , z:3, z:2 

Или иначе. Как разделить поток на эти категории (один поток для каждого x, y, z). Я бы получил 3 субпотока, которые имеют собственную обработку CEP.

Задача здесь: x, y, z не заданы заранее. Поэтому я не могу создавать потоки и назначать их с помощью оператора if или switch.

Edit:. картина будет походить,»если значение х находится в диапазоне от 0 - 8 в течение последних 10 минут

ответ

0

Это делается с помощью„манипуляция“поток по атрибуту категории

Если у вас есть DataStream[(String, Int)] это выглядит следующим образом:.

val yourStream: DataStream[(String, Int)] = ??? 
val yourPattern: Pattern = ??? 

// key by String attribute 
val keyedStream = yourStream.keyBy(_._1) 
// apply pattern on keyed stream 
val patternStream: PatternStream = CEP.pattern(keyedStream, yourPattern) 

картина будет оцениваться для каждого отдельного значения атрибута шпоночного

+0

Отредактированный мой вопрос. Как я могу заставить его работать для моего требования. Я должен был бы увидеть значение x в течение 10 минут и принять решение. – madhairsilence

Смежные вопросы