2016-10-27 2 views
2

У меня есть программа Spark, написанная на Scala, которая считывает CSV-файл из HDFS, вычисляет новый столбец и сохраняет его как файл паркета. Я запускаю программу в кластере YARN. Но каждый раз, когда я пытаюсь запустить его, исполнители терпят неудачу в какой-то момент с этой ошибкой.Исправлена ​​ошибка в режиме YARN

Не могли бы вы помочь мне найти причину этой ошибки?

Вход с по исполнителю

16/10/27 15:58:10 WARN storage.BlockManager: Putting block rdd_12_225 failed due to an exception 
16/10/27 15:58:10 WARN storage.BlockManager: Block rdd_12_225 could not be removed as it was not found on disk or in memory 
16/10/27 15:58:10 ERROR executor.Executor: Exception in task 225.0 in stage 4.0 (TID 465) 
java.io.IOException: Stream is corrupted 
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211) 
    at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readInt(DataInputStream.java:387) 
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.readSize(UnsafeRowSerializer.scala:113) 
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.<init>(UnsafeRowSerializer.scala:120) 
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3.asKeyValueIterator(UnsafeRowSerializer.scala:110) 
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:66) 
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:62) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:118) 
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:110) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 15385 of input buffer 
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39) 
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205) 
    ... 41 more 

EDIT:

Существует код, используемый

var df = spark.read.option("header", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").csv(hdfsFileURLIn).repartition(nPartitions) 
df.printSchema() 
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.MEMORY_AND_DISK) 
df.repartition(nPartitions, $"ipix").write.mode("overwrite").option("spark.hadoop.dfs.replication", 1).parquet(hdfsFileURLOut) 

функция a2p пользователь только с двумя двойными и возвращает другой двойной

Мне нужно сказать, что это хорошо работало с относительно небольшим CSV (~ 1Go), но th это ошибка случается каждый раз с более крупными (~ 15Go)

EDIT 2: После предложения я отключил передел, и я использовал StorageLevel.DISK_ONLY

С этим я не получаю Собирает блок RDD _ ** *** не удалось из-за исключения, но есть еще исключение связано с LZ4 (поток поврежден):

16/10/28 07:53:00 ERROR util.Utils: Aborting task 
java.io.IOException: Stream is corrupted 
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211) 
    at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:353) 
    at java.io.DataInputStream.read(DataInputStream.java:149) 
    at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899) 
    at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733) 
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) 
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) 
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:444) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) 
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:254) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 12966 of input buffer 
    at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39) 
    at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205) 
    ... 25 more 

EDIT 3: мне удалось запустить его без каких-либо ошибок, путем удаления и второго передела (один это перераспределение с использованием столбца ipix). Я буду смотреть дальше в документации по этому методу

EDIT 4: Это странно, иногда некоторые исполнители неудачу с ошибкой сегментации:

# 
# A fatal error has been detected by the Java Runtime Environment: 
# 
# SIGSEGV (0xb) at pc=0x00007f48d8a47f2c, pid=3501, tid=0x00007f48cc60c700 
# 
# JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 1.8.0_102-b14) 
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode linux-amd64 compressed oops) 
# Problematic frame: 
# J 4713 C2 org.apache.spark.unsafe.types.UTF8String.hashCode()I (18 bytes) @ 0x00007f48d8a47f2c [0x00007f48d8a47e60+0xcc] 
# 
# Core dump written. Default location: /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/core or core.3501 
# 
# An error report file with more information is saved as: 
# /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/hs_err_pid3501.log 
# 
# If you would like to submit a bug report, please visit: 
# http://bugreport.java.com/bugreport/crash.jsp 
# 

Я проверил память и все мои исполнители всегда есть много свободной памяти (по крайней мере 6Go)

EDIT 4: Итак, я тестировал несколько файлов, и выполнение всегда было успешным, но иногда некоторые исполнители терпят неудачу (с ошибкой выше) и снова запускаются с помощью YARN

+0

Добавьте свой код, чтобы узнать больше .. – Shankar

+0

@Shankar сделано. –

+1

Вы пробовали без переделки? просто догадка .. – Shankar

ответ

0

Какую версию lz4-java вы используете? Это может быть связано с проблемой, которая была исправлена ​​в версии 1.1.2 - см. Это bug report

Кроме того, мне любопытно узнать о вашей функции a2p. В идеале в идеале нужно использовать два объекта Column, а не только Doubles (если вы не зарегистрировали его как UDF).

+0

Да, я зарегистрировал его с помощью udf. EDIT: как узнать версию LZ4? –

+0

Вы должны быть в состоянии найти его, например, в файле МАНИФЕСТ, или в зависимостях вашего проекта. – ShirishT

+0

У меня есть 1.3.0 –

0

Ran в ту же проблему.

Симптомы выглядят точно так же, как это problem: SPARK-18105.

По состоянию на 1/29/17 еще не зафиксировано.

Смежные вопросы