2016-01-29 4 views
0

Кто-нибудь, пожалуйста, помогите мне, как создать DStream из существующего RDD. Мой код:Spark Streaming из существующего RDD

JavaSparkContext ctx = new JavaSparkContext(conf); 
JavaRDD<String> rddd = ctx.parallelize(arraylist); 

Теперь мне нужно использовать эти rddd в качестве входных данных для JavaStreamingContext.

+2

Что именно вы пытаетесь достичь? Необычно создавать поток из RDD, потому что RDD - это конечный набор данных, а потоковая передача обычно подразумевает непрерывную обработку данных. –

+0

Если вы хотите протестировать вещи и иметь DStream, на каждой итерации возвращается тот же RDD, вы можете использовать http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.streaming. dstream.ConstantInputDStream, но вы не сказали точно, что вы пытаетесь сделать –

+0

Здравствуйте @AlexLarikov и @ Roberto Congiu .. Спасибо за ваш быстрый ответ. Мое требование к проекту заключается в том, что каждые 15 mints новые xml-файлы помещаются в сервер aws s3, а затем эти файлы анализируются и сохраняются в каком-то arraylist. теперь я должен хранить arraylist в базу данных cassandra. В настоящее время мой код находится в искровом ядре, но проблема в том, что я должен запускать код каждые 15 минут, чтобы сохранить arraylist в db ... Вот почему я ищу Spark Streaming – user4342532

ответ

2

Попробуйте queueStream API.
Очередь RDD как потока, каждый RDD, помещенный в очередь, будет обрабатываться как пакет данных в DStream и обрабатываться как поток.

public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue, 
           boolean oneAtATime, 
           scala.reflect.ClassTag<T> evidence$15) 

Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue. 
NOTE: Arbitrary RDDs can be added to queueStream, there is no way to recover data of those RDDs, so queueStream doesn't support checkpointing. 
+0

обратите внимание, что queueStream не поддерживает контрольную точку –

Смежные вопросы