2015-06-25 4 views
0

Я пытаюсь решить эту problem в kaggle с помощью искры:Читать несколько файлов из каталога, используя искру

иерархия ввода, как это:

drivers/{driver_id}/trip#.csv 
e.g., drivers/1/1.csv 
     drivers/1/2.csv 
     drivers/2/1.csv 

Я хочу прочитать родительский каталог «драйверы» и для каждой подгруппы каталога я хотел бы создать pairRDD с ключом, как (sub_directory, имя_файла) и значение как содержание файла

this Я проверил ссылку и попытался использовать

val text = sc.wholeTextFiles("drivers") 
text.collect() 

это не удалось с ошибкой:

java.lang.ArrayIndexOutOfBoundsException: 0 
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:591) 
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:283) 
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:243) 
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:884) 

, но когда я запускаю следующий код, он работает.

val text = sc.wholeTextFiles("drivers/1") 
text.collect() 

, но я не хочу, чтобы это сделать, так как здесь я должен буду читать каталог драйверов и петлевые файлы и вызвать wholeTextFiles для каждой записи.

+4

Пытались ли вы Вэл text = sc.wholeTextFiles ("drivers/*") –

+0

thanx. да, это сработало. –

ответ

1

Вместо использования

sc.textfile("path/*/**") or sc.wholeTextFiles("path/*") 

Вы можете использовать этот кусок кода. Потому что искров внутри перечисляет все возможные значения папки и подпапки, чтобы она могла стоить вам времени на больших наборах данных. Вместо этого вы можете использовать Unions для этой же цели.

передать этот объект List, который содержит адреса в следующей части кода, примечание: СБН является объектом SQLContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

Теперь вы получили окончательное унифицированного RDD т.е. ДФ

Смежные вопросы