В основном я потребляю данные из нескольких тем кафки, используя одного пользователя Spark Streaming [Direct Approach].Как преобразовать DStream количества RDD в Single RDD
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
Сегментный интервал 30 Seconds
.
У меня есть несколько вопросов здесь.
- Будет ли DStream содержать несколько RDD вместо Single RDD, когда я вызываю foreachRDD на DStream? будет ли каждая тема создавать отдельные RDD?
- Если да, я хочу объединить все RDD в одиночный RDD, а затем обработать данные. Как мне это сделать?
- Если мое время обработки больше, чем периодический интервал, будет ли DStream содержать более одного RDD?
Я попытался подключить DSTream RDD к одиночному RDD, используя нижеследующий путь. Прежде всего, мое понимание правильное? Если DStream всегда возвращает одиночный RDD, тогда код ниже не требуется.
Пример кода:
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.
Спасибо, я прочитал ваш пост и вернуться ...:) – Shankar