1

У меня есть кусок кода, который создает DataFrame и сохраняется на S3. Ниже создается DataFrame из 1000 строк и 100 столбцов, заполненных math.Random. Я запускаю это в кластере с рабочими узлами 4 x r3.8xlarge и настраивает большое количество памяти. Я пробовал с максимальным количеством исполнителей и одним исполнителем на узел.Медленный или неполный saveAsParquetFile от EMR Spark to S3

// create some random data for performance and scalability testing 
val df = sqlContext.range(0,1000).map(x => 
      Row.fromSeq((1 to 100).map(y => math.Random))) 

df.saveAsParquetFile("s3://kirk/my_file.parquet") 

Моя проблема заключается в том, что я могу создать гораздо больше DataFrame в памяти, чем я могу сэкономить на S3.

Например, 1 миллиард строк и 1000 столбцов могут быть построены и запрошены, но 100 миллионов строк и 100 столбцов терпят неудачу, когда я пишу на S3 таким образом. Я не получаю отличные сообщения из контекста Spark, но работа провалится, потому что слишком много задач не удалось.

Есть ли какая-то конфигурация для более эффективного сохранения файла? Должен ли я настраивать Spark по-разному, чтобы saveAsParquetFile?

Это StackTrace от исполнителя:

15/09/09 18:10:26 ERROR sources.InsertIntoHadoopFsRelation: Aborting task. 
java.lang.OutOfMemoryError: Java heap space 
    at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87) 
    at parquet.column.values.dictionary.IntList.<init>(IntList.java:83) 
    at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85) 
    at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549) 
    at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88) 
    at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74) 
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68) 
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56) 
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178) 
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369) 
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108) 
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94) 
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64) 
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282) 
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252) 
    at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83) 
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229) 
    at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470) 
    at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360) 
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172) 
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) 
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
+1

Что говорит журнал ошибок исполнителя/контейнера для ошибок? Если в журнале нет ошибок, nodemanager убивает контейнер для исполнителя из-за превышения памяти? – ChristopherB

+0

@ChristopherB обновил вопрос в журнале исполнителя. Java куча из памяти. Но как мне это настроить? –

ответ

0

Я имею в виду, что вам нужно переразмечаете dataframe (Вы должны иметь по крайней мере numberOfWorkerInstances * numberOfCoresOnEachInstance число разделов), чтобы для параллельных операций записи S3 ,

+0

Я пробовал это, но это не повлияло. Блок данных уже разбит на разделы, поэтому это не должно быть фактором. –

+0

А какие исключения показывают журналы на неудачных исполнителях? –

+0

Я думаю о журналах под/home/hadoop/spark/work для каждого из отдельных исполнителей. –

Смежные вопросы