2017-02-15 5 views
9

Я пытаюсь создать простой SQL-запрос на события S3, используя Spark. Я загрузка ~ 30GB из JSON файлов следующим образом:SQL-запрос в Spark/scala Размер превышает Integer.MAX_VALUE

val d2 = spark.read.json("s3n://myData/2017/02/01/1234"); 
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK); 
d2.registerTempTable("d2"); 

Тогда я пытаюсь записать в файл результата моего запроса:

val users_count = sql("select count(distinct data.user_id) from d2"); 
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv"); 

Но Спарк бросает следующее исключение:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869) 
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103) 
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) 
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287) 
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) 
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) 
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672) 
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
at org.apache.spark.scheduler.Task.run(Task.scala:85) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Обратите внимание, что тот же запрос работает для меньших объемов данных. В чем проблема?

+0

скорее всего проблема с размером раздела превышает лимиты, попробуйте '.repartition (100)' и т. Д., Это должно решить его –

+0

после чтения данных попробуйте перераспределить 'val d2 = spark.read.json (" s3n: // myData/2017/02/01/1234 "). Repartition (1000)' Ссылка https://issues.apache.org/jira/browse/SPARK-1476 –

+0

В качестве дополнительной заметки вам может понадобиться использовать новую 's3a 'вместо' s3n'; см., например, http://stackoverflow.com/questions/33356041/technically-what-is-the-difference-between-s3n-s3a-and-s3 – sgvd

ответ

22

Отсутствие блока Shuffle Spark может быть больше 2 ГБ (байты Integer.MAX_VALUE), поэтому вам нужно больше/меньше разделов.

Вы должны отрегулировать параметр spark.default.parallelism и spark.sql.shuffle.partitions (по умолчанию 200), чтобы количество разделов могло вмещать ваши данные, не достигнув предела 2 ГБ (вы могли бы попытаться нацелиться на 256 МБ/раздел, чтобы 200 ГБ вы получаете 800 разделов). Тысячи разделов очень распространены, поэтому не бойтесь переделать до 1000, как было предложено.

FYI, вы можете проверить количество разделов для РДА с чем-то вроде rdd.getNumPartitions (т.е. d2.rdd.getNumPartitions)

Там есть история, чтобы отслеживать усилие решения различных ограничений 2Гба (был открыты на данный момент): https://issues.apache.org/jira/browse/SPARK-6235

См. http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 для получения дополнительной информации об этой ошибке.

+0

Спасибо за ответ! – eexxoo

+0

Спасибо за объяснение! Также просмотрите https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa, чтобы отредактировать количество разделов по умолчанию. – Raphvanns

Смежные вопросы