Я только начиная с искрой & ScalaSpark: Как превратить Seq из РДА в РД
У меня есть каталог с несколькими файлами в ней я успешно загрузить их с помощью
sc.wholeTextFiles(directory)
Теперь я хочу идти на один уровень вверх. На самом деле у меня есть каталог, содержащий вспомогательные каталоги, содержащие файлы. Моя цель - получить RDD[(String,String)]
, чтобы я мог двигаться вперед, где RDD
представляет имя и содержимое файла.
Я попытался следующие:
val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))
, но я получил Seq[RDD[(String,String)]]
Как превратить этот Seq
в RDD[(String,String)]
?
Возможно, я не делаю все правильно, и я должен попробовать другой подход?
Редактировать: добавлен код
// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"
// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
// val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)
// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)
, который вы меня спасаете. Не знал, что вы можете использовать двойные подстановочные знаки, спасибо !!!! – Stephane
Я нахожу это решение невероятно медленным. Это похоже на однопоточность или что-то в этом роде. Я могу загружать одни и те же данные намного быстрее, создавая список RDD по одному для каждого каталога, хотя у меня возникают проблемы с переполнением стека при их объединении. – Brian
@Brian - Я не понимаю, почему это будет медленнее, не увидев вашу реализацию и то, что вы пытаетесь сделать. Сделать новую запись и ссылку на нее? – climbage