Я запускаю искрообразующую работу, которая убегает от Kafka. Я получаю сообщения в так:Spark Streaming GroupBy Части кортежа для обработки
val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic, mmd.partition, mmd.offset, mmd.message)
})
Теперь, когда я привожу данные в Я хочу, чтобы сгруппировать по теме и раздела, так все с одной и той же теме/раздела можно обрабатывать в одном пакете. Какую функцию можно использовать здесь
messageStream.foreachRDD(x => x.?
Это группа? и если это groupBy, как я группирую первые две части кортежа, в которых я есть. KafkaRDD [0] будет иметь много сообщений в нем, поэтому я хочу сгруппировать их в группы похожих сообщений, чем иметь возможность обрабатывать каждый группировка как кусок против отдельных сообщений.
Edit: Так на основе ниже корма назад, так что я бы что-то вроде этого:
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
?
}))
Является ли это сейчас в K, V, как K является (тема, раздел), значение (смещение , тема)? Мне нужна первая и вторая части кортежа, потому что это позволит мне сделать вызов API, чтобы получить инструкции о том, что делать с сообщением. То, что я не хочу делать, это индивидуально вызывать API для каждого сообщения, потому что многие из них имеют один и тот же набор команд, основанный на теме/разделе.
Edit: Понял, что речь идет сейчас, как:
K: (тема, раздел) V: CompactBuffer ((тема, раздел, Offset, Message),()) и т.д.
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
val topic = x._1_.1
val partition = x._1._2
x._2.forEach(x=> ...
}))
Отредактировано мое мнение, чтобы опираться на то, что вы сказали. – theMadKing
Значение по-прежнему является 4-кортежем. Если вам нужен только третий и четвертый элементы, вы должны сделать «mapValues» (v => (v._3, v._4)). – ryan
Мне нужна первая и вторая части кортежа, потому что это позволит мне сделать вызов API, чтобы получить инструкции о том, что делать с сообщением. Мне нужна третья часть, потому что у меня есть собственный менеджер смещения. То, что я не хочу делать, это индивидуально вызывать API для каждого сообщения, потому что многие из них имеют один и тот же набор команд, основанный на теме/разделе. – theMadKing