2016-09-09 2 views
1

Привет, Я пытаюсь читать твиты из Twitter с помощью Apache Spark Streaming и пытаться преобразовать в DataFrame. У меня есть подход, который я применил ниже. Тем не менее, я не могу понять правильный подход. Некоторые указатели будут приветствоваться.Преобразование DStream в кадр данных

Как видите, преобразование в DF внутри foreach не дает мне ни одного DF из tweetStream. У меня, вероятно, неправильный подход, поскольку я новичок в этом. Как я к этому подхожу?

val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth).filter(status=>status.getLang=="en") 
     .map(status=>gson.toJson(status)) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 
    tweetStream.foreachRDD({status=>val DF = status.toDF()}) 
+0

Я думал об использовании DF.merge() внутри цикла, чтобы получить весь DF, вычисленный внутри foreachRDD {} – Ayon

ответ

0

Я не пробовал, но, возможно, что-то вроде это работает:

var df_tweets:DataFrame = null 

    dstream_tweets.foreachRDD { 
    rrd => if (df_tweets != null) { 
     df_tweets = df_tweets.unionAll(rdd.toDF) // combine previous dataframe 
    } else { 
     df_tweets = rdd.toDF() // create new dataframe 
     } 
    } 
Смежные вопросы