2016-06-13 2 views
1

Как преобразовать коллекцию, возвращенную после вызова take(5), другому RDD, поэтому я могу сохранить первые 5 записей в выходном файле?Spark: scala - как преобразовать коллекцию из RDD в другой RDD

Если я использую saveAsTextfile, я не могу использовать take и saveAsTextFile вместе (вот почему вы видите эту строку, прокомментированную ниже). Он хранит все записи из RDD в отсортированном порядке, поэтому первые 5 рецензий относятся к 5 странам, но я хочу сохранить только первые 5 записей - возможно ли конвертировать коллекции [take (5)] в RDD?

val Strips = txtFileLines.map(_.split(",")) 
         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt))) 
         .sortBy(x => x.split(",")(1).trim().toInt, ascending=false) 
         .take(5) 
         //.saveAsTextFile("output\\country\\byStripsBar") 

Решение: sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")

ответ

2
val rowsArray: Array[Row] = rdd.take(5) 
val slicedRdd = sparkContext.parallelize(rowsArray, 1) 

slicedRdd.savesTextFile("specify path here") 
1

Если вы абсолютно не нужны saveAsTextFile форматирование, я бы просто напечатать take(5) вывод в файл с помощью простого ввода-вывода (как File).

В противном случае, здесь многословны RDD единственное решение:

scala> val rdd = sc.parallelize(5 to 1 by -1 map{x => (x, x*x)}) 
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at <console>:27 

scala> rdd.collect 
res1: Array[(Int, Int)] = Array((5,25), (4,16), (3,9), (2,4), (1,1)) 

scala> val top2 = rdd.sortBy(_._1).zipWithIndex.collect{case x if (x._2 < 2) => x._1} 
top2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at collect at <console>:29 

scala> top2.collect 
res2: Array[(Int, Int)] = Array((1,1), (2,4)) 
Смежные вопросы