Я пытаюсь разработать приложение, которое принимает четыре разных темы с сервера kafka и принимает конкретные действия с каждой темой.чтение из нескольких тем
Я создал класс, который получает DStream и имеет метод, который должен преобразовать DStream.
Например, класс обработчика:
class StreamHandler(stream:DStream[String]) {
val stream:DStream[String] = stream
def doActions():DStream[String] = {
//Do smth. to DStream
}
}
А теперь представьте, я называю doActions() из основного класса для каждого класса обработчика Я хочу, что это будет повторяться с каждым прибывающим DStream или только один раз?
val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2)
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2)
topicHandler1.doActions()
topicHandler2 .doActions()
ssc.start()
Есть ли лучший подход?
Я думаю, вы хотите сказать 'topicHandler1.doActions()' вместо этого. – maasg
Да, я ошибался, что – Godraude