2016-08-15 3 views
3

(Это с Спарк 2.0 работает на небольшой три машины Amazon EMR кластера)Spark. ~ 100 миллионов строк. Размер превышает Integer.MAX_VALUE?

У меня есть работа PySpark, который загружает некоторые большие текстовые файлы в Спарк RDD, делает подсчет(), которая успешно возвращает 158,598,155.

Затем задание анализирует каждую строку в экземпляре pyspark.sql.Row, создает DataFrame и выполняет другое подсчет. Этот второй счетчик() в DataFrame вызывает исключение в внутреннем коде Spark Size exceeds Integer.MAX_VALUE. Это работает с меньшими объемами данных. Может кто-нибудь объяснить, почему/как это произойдет?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 1.0 failed 4 times, most recent failure: Lost task 22.3 in stage 1.0 (TID 77, ip-172-31-97-24.us-west-2.compute.internal): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) 
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) 
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) 
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:604) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

код PySpark:

raw_rdd = spark_context.textFile(full_source_path) 

# DEBUG: This call to count() is expensive 
# This count succeeds and returns 158,598,155 
logger.info("raw_rdd count = %d", raw_rdd.count()) 
logger.info("completed getting raw_rdd count!!!!!!!") 

row_rdd = raw_rdd.map(row_parse_function).filter(bool) 
data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType) 

data_frame.cache() 
# This will trigger the Spark internal error 
logger.info("row count = %d", data_frame.count()) 
+0

Каков второй ожидаемый результат 'counts()'? – gsamaras

+0

Просьба поделиться фрагментом, в котором произошла ошибка. – javadba

+0

@gsamaras, в основном то же, что и первый счет. – clay

ответ

0

ошибка исходит не от data_frame.count() сам, а потому, что разбор строк с помощью row_parse_function дает некоторые целые числа, которые не помещаются в указанный целочисленного типа в MySchemaStructType.

Попробуйте увеличить число целых типов в вашей схеме до pyspark.sql.types.LongType() или, альтернативно, искривьте типы, опуская схему (это, однако, может замедлить оценку).

+0

Теперь 'row_parse_function' определенно проверял наличие связанных значений. Исключение происходит в 'FileChannelImpl.map', что не имеет смысла с ошибкой синтаксического анализа вне диапазона. – clay

+0

@clay Не могли бы вы написать 'row_parse_function' и' MySchemaStructType'? – antonislav