1

Я подсчитываю значения в каждом окне и нахожу верхние значения и хочу сохранить только 10 самых популярных значений каждого окна для hdfs, а не всех значений.Сохранение частичной искры Окно DStream для HDFS

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2) 
    val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4)) 
    val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap) 
    ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))}  


    //sortedCounts.foreachRDD(rdd =>println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n"))) 
    sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) 

Я могу распечатать 10 лучших, как указано выше (прокомментировано).

Я также попытался

sortedCounts.foreachRDD{ rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))} 

, но я получаю следующее сообщение об ошибке. Мой массив не сериализуемый

15/01/05 17:12:23 ОШИБКИ actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext java.io.NotSerializableException: org.apache.spark. streaming.StreamingContext

ответ

0

Вы можете попробовать это?

sortedCounts.foreachRDD(rdd => rdd.filterWith(ind => ind)((v, ind) => ind <= 10).saveAsTextFile(...)) 

Примечание: Я не проверял сниппет ...

0

Ваша первая версия должна работать. Просто объявите @transient ssc = ..., где создается Streaming Context.

Вторая версия не работает b/c StreamingContext не может быть сериализована в закрытии.