2015-05-20 3 views
5

Как определить имя темы из сообщения в kafka.Получить тему из сообщения kafka

String[] topics = { "test", "test1", "test2" }; 
    for (String t : topics) { 
     topicMap.put(t, new Integer(3)); 
    } 

SparkConf conf = new SparkConf().setAppName("KafkaReceiver") 
      .set("spark.streaming.receiver.writeAheadLog.enable", "false") 
      .setMaster("local[4]") 
      .set("spark.cassandra.connection.host", "localhost"); 
    ; 
    final JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(
      1000)); 

    /* Receive Kafka streaming inputs */ 
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils 
      .createStream(jssc, "localhost:2181", "test-group", 
        topicMap); 

    JavaDStream<MessageAndMetadata> data = 
      messages.map(new Function<Tuple2<String, String>, MessageAndMetadata>() 
      { 

       public MessageAndMetadata call(Tuple2<String, String> message) 
       { 
        System.out.println("message ="+message._2); 
        return null; 
       } 
      } 

     ); 

Я могу получить сообщение от производителя кафки. Но поскольку потребитель теперь потребляет три темы, необходимо определить название темы.

+0

Я очень заинтересован в ответе на это. Вы нашли способ? –

+0

@Arun: Вы нашли решение? Если да, не могли бы вы поделиться им? Благодаря! – jithinpt

ответ

0

К сожалению, это не так просто, поскольку KafkaReceiver и ReliableKafkaReceiver в исходном коде Spark хранят MessageAndMetadata.key и сообщение.

Есть два открытых билетов, связанные с этим вопросом в JIRA Спарк в:

, которые были открыты на некоторое время.

Грязный копировать/вставить/изменить исходный код Спарк, чтобы решить вашу проблему:

package org.apache.spark.streaming.kafka 

import java.lang.{Integer => JInt} 
import java.util.{Map => JMap, Properties} 

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} 
import kafka.serializer.{Decoder, StringDecoder} 
import kafka.utils.VerifiableProperties 
import org.apache.spark.Logging 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.streaming.util.WriteAheadLogUtils 
import org.apache.spark.util.ThreadUtils 
import scala.collection.JavaConverters._ 
import scala.collection.Map 
import scala.reflect._ 

object MoreKafkaUtils { 

    def createStream(
    jssc: JavaStreamingContext, 
    zkQuorum: String, 
    groupId: String, 
    topics: JMap[String, JInt], 
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
): JavaReceiverInputDStream[(String, String, String)] = { 
    val kafkaParams = Map[String, String](
     "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, 
     "zookeeper.connection.timeout.ms" -> "10000") 
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf) 
    new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel) 
    } 

} 

private[streaming] 
class KafkaInputDStreamWithTopic[ 
    K: ClassTag, 
    V: ClassTag, 
    U <: Decoder[_] : ClassTag, 
    T <: Decoder[_] : ClassTag](
    @transient ssc_ : StreamingContext, 
    kafkaParams: Map[String, String], 
    topics: Map[String, Int], 
    useReliableReceiver: Boolean, 
    storageLevel: StorageLevel 
) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging { 

    def getReceiver(): Receiver[(K, V, String)] = { 
    if (!useReliableReceiver) { 
     new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel) 
    } else { 
     new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel) 
    } 
    } 
} 

private[streaming] 
class KafkaReceiverWithTopic[ 
    K: ClassTag, 
    V: ClassTag, 
    U <: Decoder[_] : ClassTag, 
    T <: Decoder[_] : ClassTag](
    kafkaParams: Map[String, String], 
    topics: Map[String, Int], 
    storageLevel: StorageLevel 
) extends Receiver[(K, V, String)](storageLevel) with Logging { 

    // Connection to Kafka 
    var consumerConnector: ConsumerConnector = null 

    def onStop() { 
    if (consumerConnector != null) { 
     consumerConnector.shutdown() 
     consumerConnector = null 
    } 
    } 

    def onStart() { 

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) 

    // Kafka connection properties 
    val props = new Properties() 
    kafkaParams.foreach(param => props.put(param._1, param._2)) 

    val zkConnect = kafkaParams("zookeeper.connect") 
    // Create the connection to the cluster 
    logInfo("Connecting to Zookeeper: " + zkConnect) 
    val consumerConfig = new ConsumerConfig(props) 
    consumerConnector = Consumer.create(consumerConfig) 
    logInfo("Connected to " + zkConnect) 

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
     .newInstance(consumerConfig.props) 
     .asInstanceOf[Decoder[K]] 
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
     .newInstance(consumerConfig.props) 
     .asInstanceOf[Decoder[V]] 

    // Create threads for each topic/message Stream we are listening 
    val topicMessageStreams = consumerConnector.createMessageStreams(
     topics, keyDecoder, valueDecoder) 

    val executorPool = 
     ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") 
    try { 
     // Start the messages handler for each partition 
     topicMessageStreams.values.foreach { streams => 
     streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } 
     } 
    } finally { 
     executorPool.shutdown() // Just causes threads to terminate after work is done 
    } 
    } 

    // Handles Kafka messages 
    private class MessageHandler(stream: KafkaStream[K, V]) 
    extends Runnable { 
    def run() { 
     logInfo("Starting MessageHandler.") 
     try { 
     val streamIterator = stream.iterator() 
     while (streamIterator.hasNext()) { 
      val msgAndMetadata = streamIterator.next() 
      store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic)) 
     } 
     } catch { 
     case e: Throwable => reportError("Error handling message; exiting", e) 
     } 
    } 
    } 

} 
+0

Вы также можете использовать Экспериментальный KafkaUtils.createDirectStream, который принимает в качестве параметра сообщениеHandler: JFunction [MessageAndMetadata [K, V], R]. –

1

Как искрового 1.5.0, official documentation рекомендует не используя-приемник/прямой подход, начиная с последних релизов, который имеет закончил экспериментальный в последние 1.5.0. Этот новый Direct API позволяет вам легко получать сообщения и метаданные, помимо других хороших вещей.

+0

Я использую прямой подход и не понимаю, как получить метаданные сообщений. Можете ли вы уточнить? –

+0

@BrandonBradley, см. Последний фрагмент кода в официальной документации по указанной выше ссылке. В принципе, вам нужно отдать RDD в HasOffsetRanges, как только вы его получите. –

Смежные вопросы