2016-03-24 3 views
2

У меня есть работа по искрообразованию. Я хочу применить фильтр к моему входному RDD.Spark Streaming - Фильтр динамически

Я хочу получать критерии фильтра динамически каждый раз от Hbase во время каждой партии искрообразования.

Как это достичь?

Я могу создать объект соединения с использованием разделов карты один раз.

Но с искровым фильтром, как я могу достичь того же?

+0

В зависимости от ваших критериев фильтрации вы можете достичь этого с помощью 'join'. Вам нужно будет дать более полный пример того, что вы пытаетесь сделать, но если левая сторона соединения - ваш Spark Stream, правая сторона будет набором критериев. Если ни один из критериев не применяется, соединение не приводит к строкам - оно их фильтрует. –

ответ

0

Я думаю, правильный подход пишет функцию фильтра собственного (псевдокод):

DStream<Integer> intDstream= someIntegerIntoDStream; 
intDstream.foreachPartition{ 
    create HBase connection here if you need it for a batch 
    while(arg0.hasNext()){ //here you have an iterator 
      Integer current = arg0.next(); 
      create HBase connection here if you need it for each element 
      //Here is your filter function: 
      if(current meets your condition) 
       arg0.remove(); 

Так что же происходит, что вы работаете на ваш исполнитель и вручную выбирать каждый элемент, применяя условие к нему и удалению, если он соответствует вашим критериям.