2016-07-07 2 views
10

У меня есть требование для загрузки данных из таблицы Hive с использованием spark-SQL HiveContext и загрузки в HDFS. По умолчанию DataFrame из SQL-вывода имеет 2 раздела. Чтобы получить больше параллелизма, мне нужно больше разделов из SQL. В HiveContext нет перегруженного метода, чтобы принять число параметров разделов.Как изменить размер раздела в Spark SQL

Переразделение RDD вызывает перетасовку и приводит к увеличению времени обработки.

val result = sqlContext.sql("select * from bt_st_ent") 

Имеет выход лог:

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes) 
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes) 

Я хотел бы знать, есть ли способ увеличить размер разделов на выходе SQL.

ответ

-1

Очень распространенная и болезненная проблема. Вы должны искать ключ, который распределяет данные в единых разделах. Вы можете использовать операторы DISTRIBUTE BY и CLUSTER BY, чтобы рассказать искру, чтобы группировать строки в разделе. Это вызовет некоторые накладные расходы на самом запросе. Но это приведет к раздельным размерам разделов. Deepsense имеет очень хорошее руководство по этому вопросу.

-1

Если SQL выполняет перетасовать (например, это объединение, или какой-то группы по), вы можете установить количество разделов, установив свойство «spark.sql.shuffle.partitions»

sqlContext.setConf("spark.sql.shuffle.partitions", 64) 

Следуя указаниям Фокко, вы можете использовать случайную переменную для кластера.

val result = sqlContext.sql(""" 
    select * from (
    select *,random(64) as rand_part from bt_st_ent 
    ) cluster by rand_part""") 
3

Спарк < 2,0:

Вы можете использовать Hadoop параметры конфигурации:

  • mapred.min.split.size.
  • mapred.max.split.size

, а также размер блока HDFS контролировать размер раздела для файловой системы на основе форматов.

val minSplit: Int = ??? 
val maxSplit: Int = ??? 

sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit) 
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit) 

Спарк 2.0+:

Вы можете использовать spark.sql.files.maxPartitionBytes конфигурацию:

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit) 

В обоих случаях эти значения не могут быть в использовании конкретным источником данных API, так что вы всегда должны проверьте документацию/информацию о реализации формата, который вы используете.

+0

Это не сработало в нашем кластере для Spark 2.1.1 с использованием набора данных Luckylukee