2016-11-08 4 views
0

Я использую искровой залитый Kafk, и я пытаюсь получить тему Кафки сообщения с использованием этого подхода:Как получить тему сообщения kafka с помощью CreatDirectStream?

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, topics, (mmd: MessageAndMetadata[String, String]) => { (msg.topic, msg.message)}) 

Но Intellij не может импортировать kafka.message.MessageAndMetada. Я не смог найти этот импорт. Как получить тему? Есть ли другой способ получить его?

Целью этого является обработка сообщений из каждой темы по-другому, поэтому мне нужно знать тему для каждого сообщения.

ответ

0

Вы можете найти класс kafka.message.MessageAndMetada внутри kafka library, который предоставлен spark-streaming-kafka, поэтому, похоже, вам не хватает этой зависимости.

+0

Я использую интеграцию искры с версией kafka 0.10 и искру 1.6. Я думаю, что в этой библиотеке нет класса. Я не могу использовать ваш, потому что он использует искру 2.0 и не нашел ту же библиотеку с искру 1.6. –

Смежные вопросы