2017-01-16 5 views
0

Я хочу прочитать более 180 000 xmls на искро и начать процесс синтаксического анализа на каждом из них, чтобы в конце концов я получал файлы csv в виде таблиц. Эти xmls сжаты в файле bz2. зная, что у меня есть 2 версии этого bz2: , содержащий конкатенированные xmls (30mb каждый) и один xmls. Проблема заключается в том, что я получаю сообщение об ошибке ИСПОЛНИТЕЛЬ потерял при чтении одного XMLs версиичитать очень большой набор данных из xml-файлов на искру

sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf={"textinputformat.record.delimiter": '\n\n'}).map(lambda num_line: num_line[1])

и java.io.IOException: Too many bytes before newline когда Ретем с

sc.textFile(hdfs_input_path).persist(pyspark.StorageLevel.MEMORY_AND_DISK) 

при использовании следующий код для чтения сцепленный версии XML

sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf={"textinputformat.record.delimiter": '</delimiter>'}).map(lambda num_line: num_line[1]) 

он работает иногда только в том случае, когда количество файлов xml не слишком велика, как приблизительно 180 000.

Так что мой вопрос в том, как я могу достичь вышеуказанной задачи с сжатым файлом формата bz2? есть ли другой формат, который мог бы работать? (Gzip не расщепляется ...) Давайте забудем о сжатии с помощью bz2, есть ли другой способ сделать это?

Иам используя CDH 5.7.0 с 283.20 ГБ физической памяти и 42 VCores и 6 активных узлов

+0

У вас есть 180.000 xml файлов? и каждый файл составляет 30 МБ? вы можете попытаться загрузить пакетный пакет и обработать каждую партию, тогда вы можете комбинировать –

+0

@NarendraParmar входные файлы raw xml, в качестве вывода у меня есть 11 файлов .csv, которые соответствуют 11 таблицам в улье. – sdikby

ответ

0

Я решил свою собственную проблему. Поскольку у меня есть xml файлов, сжатых в bz2 файле, я изменил разделитель так, что искры отделяющую xmls после него в conf={"textinputformat.record.delimiter": '<\delimiter>'}, а затем я добавил .map() или .filter() операции для очистки отделенной xmls, т.е. удалить некоторые неиспользуемых тегов и т.д.

Но я не узнал, почему я получаю такую ​​ошибку, поскольку отладка в pyspark вообще не простая задача.

Смежные вопросы