2015-07-15 4 views
1

Я в настоящее время изучаю Scala & пытался создать SimpleConsumer для извлечения сообщений из раздела Kafka.Создайте простого пользователя Kafka с помощью Scala

Потребитель должен иметь возможность обрабатывать следующие задачи:

  1. Следите за смещениями.
  2. Укажите, какой брокер является ведущим брокером для темы и раздела
  3. Должен быть в состоянии справиться с изменениями в брокерах.

Я смог найти очень хорошую документацию для создания этого потребителя на Java (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).

Есть ли у кого-нибудь образец кода Scala для создания этого простого потребителя или если вы можете направить мне какую-то документацию, которая укажет мне в правильном направлении, это будет очень полезно.

+0

Что произойдет, если в случае, если ваш потребитель идет вниз? Вы где-то упорствуете Смещение? – sparkr

+0

@sparkr Немного поздно, но у вас есть несколько вариантов, если ваш потребитель опустится вниз, использовал хранилище kafka или вы можете использовать внешнее хранилище, например Zookeeper или Hbase. Очень хорошая статья об этом находится здесь: https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark -streaming/ – dbustosp

ответ

5

Вот пример кода простого потребителя Kafka, написанного на Scala. Он работал после нескольких проб и ошибок.

package com.Kafka.Consumer 

import kafka.api.FetchRequest 
import kafka.api.FetchRequestBuilder 
import kafka.api.PartitionOffsetRequestInfo 
import kafka.common.ErrorMapping 
import kafka.common.TopicAndPartition 
import kafka.javaapi._ 
import kafka.javaapi.consumer.SimpleConsumer 
import kafka.message.MessageAndOffset 
import java.nio.ByteBuffer 
import java.util.ArrayList 
import java.util.Collections 
import java.util.HashMap 
import java.util.List 
import java.util.Map 
import SimpleExample._ 

//remove if not needed 
import scala.collection.JavaConversions._ 

object SimpleExample { 

    def main(args: Array[String]) { 
    val example = new SimpleExample() 
    val maxReads = java.lang.Integer.parseInt(args(0)) 
    val topic = args(1) 
    val partition = java.lang.Integer.parseInt(args(2)) 
    val seeds = new ArrayList[String]() 
    seeds.add(args(3)) 
    val port = java.lang.Integer.parseInt(args(4)) 
    try { 
     example.run(maxReads, topic, partition, seeds, port) 
    } catch { 
     case e: Exception => { 
     println("Oops:" + e) 
     e.printStackTrace() 
     } 
    } 
    } 

    def getLastOffset(consumer: SimpleConsumer, 
     topic: String, 
     partition: Int, 
     whichTime: Long, 
     clientName: String): Long = { 
    val topicAndPartition = new TopicAndPartition(topic, partition) 
    val requestInfo = new HashMap[TopicAndPartition, PartitionOffsetRequestInfo]() 
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)) 
    val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName) 
    val response = consumer.getOffsetsBefore(request) 
    if (response.hasError) { 
     println("Error fetching data Offset Data the Broker. Reason: " + 
     response.errorCode(topic, partition)) 
     return 0 
    } 
    val offsets = response.offsets(topic, partition) 
    offsets(0) 
    } 
} 

class SimpleExample { 

    private var m_replicaBrokers: List[String] = new ArrayList[String]() 

    def run(a_maxReads: Int, 
     a_topic: String, 
     a_partition: Int, 
     a_seedBrokers: List[String], 
     a_port: Int) { 
    val metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition) 
    if (metadata == null) { 
     println("Can't find metadata for Topic and Partition. Exiting") 
     return 
    } 
    if (metadata.leader == null) { 
     println("Can't find Leader for Topic and Partition. Exiting") 
     return 
    } 
    var leadBroker = metadata.leader.host 
    val clientName = "Client_" + a_topic + "_" + a_partition 
    var consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName) 
    var readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime, clientName) 
    var numErrors = 0 
    //while (a_maxReads > 0) { 
     if (consumer == null) { 
     consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName) 
     } 
     val req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 
     100000) 
     .build() 
     val fetchResponse = consumer.fetch(req) 
     if (fetchResponse.hasError) { 
     numErrors += 1 
     val code = fetchResponse.errorCode(a_topic, a_partition) 
     println("Error fetching data from the Broker:" + leadBroker + 
      " Reason: " + 
      code) 
     if (numErrors > 5) //break 
     if (code == ErrorMapping.OffsetOutOfRangeCode) { 
      readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime, clientName) 
      //continue 
     } 
     consumer.close() 
     consumer = null 
     leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port) 
     //continue 
     } 
     numErrors = 0 
     var numRead = 0 
     for (messageAndOffset <- fetchResponse.messageSet(a_topic, a_partition)) { 
     val currentOffset = messageAndOffset.offset 
     if (currentOffset < readOffset) { 
      println("Found an old offset: " + currentOffset + " Expecting: " + 
      readOffset) 
      //continue 
     } 
     readOffset = messageAndOffset.nextOffset 
     val payload = messageAndOffset.message.payload 
     val bytes = Array.ofDim[Byte](payload.limit()) 
     payload.get(bytes) 
     println(String.valueOf(messageAndOffset.offset) + ": " + new String(bytes, "UTF-8")) 
     numRead += 1 
     // a_maxReads -= 1 
     } 
     if (numRead == 0) { 
     try { 
      Thread.sleep(1000) 
     } catch { 
      case ie: InterruptedException => 
     } 
     } 
    //} 
    if (consumer != null) consumer.close() 
    } 

    private def findNewLeader(a_oldLeader: String, 
     a_topic: String, 
     a_partition: Int, 
     a_port: Int): String = { 
    for (i <- 0 until 3) { 
     var goToSleep = false 
     val metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition) 
     if (metadata == null) { 
     goToSleep = true 
     } else if (metadata.leader == null) { 
     goToSleep = true 
     } else if (a_oldLeader.equalsIgnoreCase(metadata.leader.host) && i == 0) { 
     goToSleep = true 
     } else { 
     return metadata.leader.host 
     } 
     if (goToSleep) { 
     try { 
      Thread.sleep(1000) 
     } catch { 
      case ie: InterruptedException => 
     } 
     } 
    } 
    println("Unable to find new leader after Broker failure. Exiting") 
    throw new Exception("Unable to find new leader after Broker failure. Exiting") 
    } 

    private def findLeader(a_seedBrokers: List[String], 
     a_port: Int, 
     a_topic: String, 
     a_partition: Int): PartitionMetadata = { 
    var returnMetaData: PartitionMetadata = null 

    for (seed <- a_seedBrokers) { 
     var consumer: SimpleConsumer = null 
     try { 
     consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup") 
     val topics = Collections.singletonList(a_topic) 
     val req = new TopicMetadataRequest(topics) 
     val resp = consumer.send(req) 
     val metaData = resp.topicsMetadata 
     for (item <- metaData; part <- item.partitionsMetadata){ 
      if (part.partitionId == a_partition) { 
      returnMetaData = part 
     //break 
     } 
     } 
     } catch { 
     case e: Exception => println("Error communicating with Broker [" + seed + "] to find Leader for [" + 
      a_topic + 
      ", " + 
      a_partition + 
      "] Reason: " + 
      e) 
     } finally { 
     if (consumer != null) consumer.close() 
     } 
    } 
    if (returnMetaData != null) { 
     m_replicaBrokers.clear() 
     for (replica <- returnMetaData.replicas) { 
     m_replicaBrokers.add(replica.host) 
     } 
    } 
    returnMetaData 
    } 
} 
+0

может обеспечить время выполнения выборки аргументы основному методу – Aamir

+0

В примере ожидаются следующие параметры: 1. Максимальное количество сообщений для чтения (поэтому мы не зацикливаемся навсегда) 2. Тема для чтения от 3. Перечень для чтения из 4. Один брокер для использование для поиска метаданных 5. Порт брокеры слушать на –

+8

Это не Scala, а скорее Java в маскировке! – sparkr

0

Я построил простого потребителя и производителя кафки, используя scala.

потребитель:

package com.kafka 

import java.util.concurrent._ 
import java.util.{Collections, Properties} 

import com.sun.javafx.util.Logging 
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} 

import scala.collection.JavaConversions._ 

class Consumer(val brokers: String, 
       val groupId: String, 
       val topic: String) extends Logging { 

    val props = createConsumerConfig(brokers, groupId) 
    val consumer = new KafkaConsumer[String, String](props) 
    var executor: ExecutorService = null 

    def shutdown() = { 
    if (consumer != null) 
     consumer.close() 
    if (executor != null) 
     executor.shutdown() 
    } 

    def createConsumerConfig(brokers: String, groupId: String): Properties = { 
    val props = new Properties() 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") 
    props 
    } 

    def run() = { 
    consumer.subscribe(Collections.singletonList(this.topic)) 

    Executors.newSingleThreadExecutor.execute(new Runnable { 
     override def run(): Unit = { 
     while (true) { 
      val records = consumer.poll(1000) 

      for (record <- records) { 
      System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()) 
      } 
     } 
     } 
    }) 
    } 
} 

object Consumer extends App{ 
    val newArgs = Array("localhost:9092", "2","test") 
    val example = new Consumer(newArgs(0), newArgs(1), newArgs(2)) 
    example.run() 
} 

производитель:

package com.kafka 

import java.util.{Date, Properties} 

import org.apache.kafka.clients.producer.KafkaProducer 
import org.apache.kafka.clients.producer.ProducerRecord 

object Producer extends App{ 
    val newArgs = Array("20","test","localhost:9092") 
    val events = newArgs(0).toInt 
    val topic = newArgs(1) 
    val brokers = newArgs(2) 
    val props = new Properties() 
    props.put("bootstrap.servers", brokers) 
    props.put("client.id", "producer") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 


    val producer = new KafkaProducer[String, String](props) 
    val t = System.currentTimeMillis() 
    for (nEvents <- Range(0, events)) { 
    val key = "messageKey " + nEvents.toString 
    val msg = "test message" 
    val data = new ProducerRecord[String, String](topic, key, msg) 

    //async 
    //producer.send(data, (m,e) => {}) 
    //sync 
    producer.send(data) 
    } 

    System.out.println("sent per second: " + events * 1000/(System.currentTimeMillis() - t)) 
    producer.close() 
} 
+0

какая версия кафки вы используете? (SBT/Maven) – ItayB

Смежные вопросы