2016-09-01 1 views
0

Я запускаю искрообразующую работу, которая убегает от Kafka. Я получаю сообщения в так:Spark Streaming GroupBy Части кортежа для обработки

val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => { 
    (mmd.topic, mmd.partition, mmd.offset, mmd.message) 
}) 

Теперь, когда я привожу данные в Я хочу, чтобы сгруппировать по теме и раздела, так все с одной и той же теме/раздела можно обрабатывать в одном пакете. Какую функцию можно использовать здесь

messageStream.foreachRDD(x => x.? 

Это группа? и если это groupBy, как я группирую первые две части кортежа, в которых я есть. KafkaRDD [0] будет иметь много сообщений в нем, поэтому я хочу сгруппировать их в группы похожих сообщений, чем иметь возможность обрабатывать каждый группировка как кусок против отдельных сообщений.

Edit: Так на основе ниже корма назад, так что я бы что-то вроде этого:

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { 
     ? 
    })) 

Является ли это сейчас в K, V, как K является (тема, раздел), значение (смещение , тема)? Мне нужна первая и вторая части кортежа, потому что это позволит мне сделать вызов API, чтобы получить инструкции о том, что делать с сообщением. То, что я не хочу делать, это индивидуально вызывать API для каждого сообщения, потому что многие из них имеют один и тот же набор команд, основанный на теме/разделе.

Edit: Понял, что речь идет сейчас, как:

K: (тема, раздел) V: CompactBuffer ((тема, раздел, Offset, Message),()) и т.д.

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { 
      val topic = x._1_.1 
      val partition = x._1._2 
      x._2.forEach(x=> ... 
     })) 

ответ

1

groupBy первых двух частей в кортеже, вы можете попробовать следующее:

messageStream groupBy (x => (x._1, x._2)) 
+0

Отредактировано мое мнение, чтобы опираться на то, что вы сказали. – theMadKing

+0

Значение по-прежнему является 4-кортежем. Если вам нужен только третий и четвертый элементы, вы должны сделать «mapValues» (v => (v._3, v._4)). – ryan

+0

Мне нужна первая и вторая части кортежа, потому что это позволит мне сделать вызов API, чтобы получить инструкции о том, что делать с сообщением. Мне нужна третья часть, потому что у меня есть собственный менеджер смещения. То, что я не хочу делать, это индивидуально вызывать API для каждого сообщения, потому что многие из них имеют один и тот же набор команд, основанный на теме/разделе. – theMadKing

Смежные вопросы