2015-02-12 3 views
0

Я пытаюсь разработать приложение, которое принимает четыре разных темы с сервера kafka и принимает конкретные действия с каждой темой.чтение из нескольких тем

Я создал класс, который получает DStream и имеет метод, который должен преобразовать DStream.

Например, класс обработчика:

class StreamHandler(stream:DStream[String]) { 
    val stream:DStream[String] = stream 

    def doActions():DStream[String] = { 
    //Do smth. to DStream 
    } 
} 

А теперь представьте, я называю doActions() из основного класса для каждого класса обработчика Я хочу, что это будет повторяться с каждым прибывающим DStream или только один раз?

val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2) 
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2) 

topicHandler1.doActions() 
topicHandler2 .doActions() 

ssc.start() 

Есть ли лучший подход?

+0

Я думаю, вы хотите сказать 'topicHandler1.doActions()' вместо этого. – maasg

+0

Да, я ошибался, что – Godraude

ответ

0

Преобразования, объявленные в StreamHandler, будут применены к каждой партии DStream. Текущий код довольно неполный, чтобы дать вам определенный ответ. В конвейере преобразования DStream вам понадобится action that materializes the DStream, иначе ничего не произойдет.

Что касается подхода, функция, которая принимает DStream и применяет преобразования к нему было бы достаточно и легко проверить:

val pipeline:DStream[Data] =>() = dstream => 
    dstream.map(...).filter(...).print() 

Как она стоит, это не выглядит как строительство класса покупает много ,

+0

Спасибо. Я сократил фрагменты кода, чтобы сделать его более ясным. – Godraude

Смежные вопросы