2014-12-31 2 views
4

Я только начиная с искрой & ScalaSpark: Как превратить Seq из РДА в РД

У меня есть каталог с несколькими файлами в ней я успешно загрузить их с помощью

sc.wholeTextFiles(directory) 

Теперь я хочу идти на один уровень вверх. На самом деле у меня есть каталог, содержащий вспомогательные каталоги, содержащие файлы. Моя цель - получить RDD[(String,String)], чтобы я мог двигаться вперед, где RDD представляет имя и содержимое файла.

Я попытался следующие:

val listOfFolders = getListOfSubDirectories(rootFolder) 
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory)) 

, но я получил Seq[RDD[(String,String)]] Как превратить этот Seq в RDD[(String,String)]?

Возможно, я не делаю все правильно, и я должен попробовать другой подход?

Редактировать: добавлен код

// HADOOP VERSION 
val rootFolderHDFS = "hdfs://****/" 
val hdfsURI = "hdfs://****/**/" 

// returns a list of folders (currently about 800) 
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS) 
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory)) 
// RDD[(String,String)] 
// val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1) 
val init = sc.parallelize(Array[(String, String)]()) 
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1) 

// returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError 
println(inputHDFS2.count) 

ответ

2

Вместо того чтобы загружать каждый каталог в отдельный RDD, можете ли вы просто использовать пустую wildcard для загрузки всех каталогов в один RDD?

Учитывая следующее дерево каталогов ...

$ tree test/spark/so 
test/spark/so 
├── a 
│   ├── text1.txt 
│   └── text2.txt 
└── b 
    ├── text1.txt 
    └── text2.txt 

Создания RDD с шаблоном для каталога.

scala> val rdd = sc.wholeTextFiles("test/spark/so/*/*") 
rdd: org.apache.spark.rdd.RDD[(String, String)] = test/spark/so/*/ WholeTextFileRDD[16] at wholeTextFiles at <console>:37 

Count is 4, как и следовало ожидать.

scala> rdd.count 
res9: Long = 4 

scala> rdd.collect 
res10: Array[(String, String)] = 
Array((test/spark/so/a/text1.txt,a1 
a2 
a3), (test/spark/so/a/text2.txt,a3 
a4 
a5), (test/spark/so/b/text1.txt,b1 
b2 
b3), (test/spark/so/b/text2.txt,b3 
b4 
b5)) 
+0

, который вы меня спасаете. Не знал, что вы можете использовать двойные подстановочные знаки, спасибо !!!! – Stephane

+0

Я нахожу это решение невероятно медленным. Это похоже на однопоточность или что-то в этом роде. Я могу загружать одни и те же данные намного быстрее, создавая список RDD по одному для каждого каталога, хотя у меня возникают проблемы с переполнением стека при их объединении. – Brian

+0

@Brian - Я не понимаю, почему это будет медленнее, не увидев вашу реализацию и то, что вы пытаетесь сделать. Сделать новую запись и ссылку на нее? – climbage

3

Вы можете уменьшить на Seq, как это (конкатенации RDD сек с ++):

val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right) 

Несколько более подробно, почему мы можем применить reduce здесь:

  • ++ ассоциативно - это не имеет значения, вы rdda ++ (rddb ++ rddc) или (rdda ++ rddb) ++ rddc
  • предполагается Seq непусто (иначе fold бы лучший выбор, это потребует пустой RDD[(String, String)] в качестве исходного аккумулятора).

В зависимости от конкретного типа Seq, вы можете получить StackOverflow, так что будьте осторожны и тестом с большей коллекцией, хотя для стандартной библиотеки я думаю, что это безопасно.

+0

привет! Я использовал союз непосредственно перед вашим ответом и получил ошибку StackOverflow ... теперь я использую ++, и я все еще получаю один ... что случилось? org.apache.spark.SparkException: работа прерывается из-за срыва этапа: сериализация задачи не выполнена: java.lang.StackOverflowError – Stephane

+0

Хм. В этом случае попробуйте 'reduceRight' /' foldRight'. Это может избежать этого. (Какую версию Scala вы используете?) –

+0

Использую 2.10.4. Что касается foldRight, как бы вы могли написать эту функцию? (мне нужно начать с элемента, но я не знаю, как создать пустойRDD) – Stephane

1

Вы должны использовать union при условии искровым контексте

val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i))) 
val rdd_union: RDD[Int] = sc.union(rdds) 
Смежные вопросы