2016-10-03 2 views
0

Я работаю над приложением Spark, которое должно читать несколько каталогов (т. Е. Несколько путей) из S3 Bucket и HDFS. Я читал, что newHadoopAPI обеспечивает отличный способ чтения сжатых/проиндексированных файлов Lzo в хорошем исполнении. Но как мы читаем несколько путей/каталогов папок, есть несколько файлов Lzo и файлы индексов в RDD, используя newHadoopAPI?Несколько входных путей в newHadoopAPI для искры для чтения файлов Lzo

Структура папки похожа на разделенную таблицу улья на двух столбцах. Пример: как показано ниже. Перегородка по дате и партии

/rootDirectory/date=20161002/batch=5678/001_0.lzo /rootDirectory/date=20161002/batch=5678/001_0.lzo.index /RootDirectory/дата = 20161002/партия = 5678/002_0.lzo /rootDirectory/date=20161002/batch=5678/002_0.lzo.index /rootDirectory/date=20161002/batch=8765/001_0.lzo /RootDirectory/дата = 20161002/партии = 8765 /001_0.lzo.index /rootDirectory/date=20161002/batch=8765/002_0.lzo /rootDirectory/date=20161002/batch=8765/002_0.lzo.index

..... и прочее.

Теперь я использую приведенный ниже код для чтения данных с S3. Это относится как к файлам Lzo, так и к файлам Lzo.Index в качестве входных данных, которые разбивают мое приложение, поскольку я не хочу читать файлы .lzo.index, но только файлы .lzo, используя индекс для скорости.

val impInput = sparkSession.sparkContext.newAPIHadoopFile("s3://my-bucket/myfolder/*/*", classOf[NonSplittableTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]) 
    val impRDD = impInput.map(_._2.toString) 

Может ли кто-нибудь помочь мне разобраться, как я могу это сделать?

1). Прочитайте все (mulitple) папки под корнем для файлов Lzo с помощью newHadoopAPI, чтобы я мог использовать файл .index для моей выгоды.

2). Прочитайте данные из HDFS аналогичным образом.

+0

Попробуйте https://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%[email protected].com%3E –

+0

Спасибо @AyanGuha - Но это, похоже, не работает. В примере есть большой мясистый файл объемом 78 ГБ (.lzo), и он использует его для чтения с использованием индексов. В моем случае у меня есть несколько небольших файлов и кажется, что я не получаю никакой пользы от использования файлов Lzo из-за большого количества небольших файлов. Иерархия показана выше. Любая идея по этому вопросу? –

ответ

0

Добавление суффикса к вашему пути HDFS может помочь.

val impInput = sparkSession.sparkContext.newAPIHadoopFile("s3://my-bucket/myfolder/*/*.lzo", classOf[NonSplittableTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]) 
Смежные вопросы