2016-02-29 2 views
0

Я использую API-интерфейс Spark Streaming, я просто хотел лучше понять, как лучше всего разработать код.Spark Streaming Элемент DStream vs RDD

настоящее время я использую Кафка потребителей (в pyspark) от pyspark.streaming.kafka.createDirectStream

Согласно http://spark.apache.org/docs/latest/streaming-programming-guide.html

Спарк Streaming обеспечивает абстракции высокого уровня называется дискрети- поток или DStream , который представляет собой непрерывный поток данных. DStreams могут быть созданы либо из входных потоков данных из источников , таких как Kafka, Flume и Kinesis, либо путем применения высокоуровневых операций на других DStreams. Внутренне, DStream представляется как последовательностью RDD.

По существу, я хочу применить набор функций к каждому из элементов в DStream. В настоящее время я использую функцию «map» для pyspark.streaming.DStream. Согласно документации, мой подход кажется правильным. http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream

отображение (е, preservesPartitioning = False) Возвращает новый DStream, применяя функцию к каждому элементу DStream.

Должен ли я использовать карту или был бы правильным подход применять функции/преобразования к RDD (поскольку DStream использует RDD)?

foreachRDD (func) Примените функцию к каждому RDD в этом DStream.

Больше Docs: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html

ответ

1

DirectStream.map является правильным выбором здесь. После map:

stream.map(f) 

эквивалентно:

stream.transform(lambda rdd: rdd.map(f)) 

DirectStream.foreachRDD с другой стороны, является выходным действием и создает выход DStream. Функция, которую вы используете с foreachRDD, не должна возвращать ничего, как и сам метод. Это очевидно, когда взгляните на подпись Scala:

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit 
+0

Благодарим вас за отзыв! Я провел некоторое исследование и изучил документацию. По сути, я хочу сделать все аспекты преобразования/массажа через функции mapping/transform/filter. Чтобы вставить в базу данных (т. Е. Некоторые «боковые функции»), я буду использовать .foreachRDD –

+0

. Это не очень полезный подход, выходящий за рамки очень простых приложений. Любое преобразование, которое применяется внутри 'foreachRDD', теряется при выходе из его закрытия. Таким образом, это означает, что вы можете применять только «линейные» рабочие процессы. – zero323

+0

Что такое полезный подход, помимо простых приложений? Не могли бы вы подробнее остановиться на этом, не зная, что вы имели в виду. Что касается foreachRDD, то я намерен использовать это только для вставлять данные в базу данных. –

Смежные вопросы