2015-12-04 3 views
1

Я использую boto3 для чтения файлов с S3, это показалось намного быстрее, чем sc.textFile(...). Эти файлы составляют от 300 МБ до 1 ГБ. Процесс идет как:PySpark бросает java.io.EOFException при чтении больших файлов с помощью boto3

data = sc.parallelize(list_of_files, numSlices=n_partitions) \ 
    .flatMap(read_from_s3_and_split_lines) 

events = data.aggregateByKey(...) 

При выполнении этого процесса, я получаю исключение:

15/12/04 10:58:00 WARN TaskSetManager: Lost task 41.3 in stage 0.0 (TID 68, 10.83.25.233): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:203) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139) 
    ... 15 more 

Много раз, только некоторые задачи аварии и работа в состоянии восстановить. Однако иногда вся работа падает после ряда этих ошибок. Я не смог найти происхождение этой проблемы и, кажется, появляется и исчезает в зависимости от количества файлов, которые я читаю, точных преобразований, которые я применяю ... Это никогда не сбой при чтении одного файла.

ответ

2

Я столкнулся с подобной проблемой, мое исследование показало, что проблема заключалась в отсутствии свободной памяти для процесса Python. Spark взял всю память, и процесс Python (где работает PySpark) сбой.

Некоторые советы:

  1. добавить некоторую память машины,
  2. unpersist Ненужные РДУ,
  3. управления памятью мудрее (добавить некоторые ограничения на использование памяти Спарк).
Смежные вопросы