2016-08-04 2 views
5

Я пытаюсь внедрить потребителя kafka в scala. Я видел миллион учебников для того, как это сделать на Java, и даже некоторых (like this one), которые говорят, что это для scala, но написано на Java.Как реализовать Kafka Consumer в Scala

Кто-нибудь знает, где я могу найти пример, как написать его в Scala? Я только начал изучать Scala, поэтому, возможно, связанный пример можно использовать в Scala, даже если он написан на Java или что-то в этом роде, но я честно понятия не имею, что я делаю в данный момент. Все, что я говорю, просто связывает меня с тем, как это сделать на Java.

+0

Вы можете использовать весь код Java в Scala с очень небольшими изменениями. –

+0

Могу я просто сделать класс на Java, а затем просто импортировать его в класс, в котором я хочу его использовать? Или мне нужно будет переписать все переменные и вещи вместо scala? – annedroiid

+0

Nevermind, мой тест scala не узнает класс java. Вот класс в Java (http://pastebin.com/tnS9Amie), я просто не знаю достаточно о scala, чтобы преобразовать его. Похоже ли, что это будет возможно? – annedroiid

ответ

4

Причина, по которой вы видите большинство примеров на Java, заключается в том, что новый KafkaProducer, начинающийся с 0.8.2.2, написан на Java.

Предполагая, что вы используете SBT в системе сборки, и предполагается, что ваш работы с Кафкой 0.8.2.2 (вы можете изменить версию по мере необходимости), вам нужно:

libraryDependencies ++= { 
    Seq(
    "org.apache.kafka" %% "kafka" % "0.8.2.2", 
    "org.apache.kafka" % "kafka-clients" % "0.8.2.2", 
) 
} 

Простой пример должен вы начали:

import scala.collection.JavaConverters._ 
import org.apache.kafka.clients.consumer.KafkaConsumer 
import org.apache.kafka.common.serialization.StringDeserializer 

object KafkaExample { 
    def main(args: Array[String]): Unit = { 
    val properties = new Properties() 
    properties.put("bootstrap.servers", "localhost:9092") 
    properties.put("group.id", "consumer-tutorial") 
    properties.put("key.deserializer", classOf[StringDeserializer]) 
    properties.put("value.deserializer", classOf[StringDeserializer]) 

    val kafkaConsumer = new KafkaConsumer[String, String](properties) 
    kafkaConsumer.subscribe("firstTopic", "secondTopic") 

    while (true) { 
     val results = kafkaConsumer.poll(2000).asScala 
     for ((topic, data) <- results) { 
     // Do stuff 
     } 
    } 
} 
+0

Разве потребитель не должен разговаривать с клиентами zookeeper, а не с брокерами? –

+1

@AvihooMamka Kafka больше не требует «ZooKeeper для отслеживания смещений. Это зависит от вас, как вы это делаете. И вообще, потребитель разговаривает с брокерами для потребления. –

+0

'results' всегда' null'. Что я пропустил? – ItayB

1

вы также можете посмотреть на рабочий шаблон полностью построить на Scala здесь: https://github.com/knoldus/activator-kafka-scala-producer-consumer Это приложение содержит код, который вы хотите использовать здесь.

Надеюсь, я решил вашу проблему, спасибо!

Смежные вопросы