2014-11-20 2 views
15

При уменьшении количества разделов можно использовать coalesce, что отлично, потому что оно не вызывает перетасовку и, кажется, работает мгновенно (не требуется дополнительный этап задания).Spark: увеличить количество разделов, не вызывая перетасовки?

Иногда я хотел бы сделать обратное, но repartition вызывает перетасовку. Я думаю, что несколько месяцев назад я получил эту работу, используя CoalescedRDD с balanceSlack = 1.0. Так что бы это произошло, так это разделение раздела так, чтобы результирующие разделы размещались там, где все на одном узле (так малое сетевое IO).

Этот тип функциональности является автоматическим в Hadoop, один просто изменяет раздельный размер. Кажется, что он не работает таким образом в Spark, если не уменьшается количество разделов. Я думаю, что решение может состоять в том, чтобы написать пользовательский разделитель вместе с настраиваемым RDD, где мы определяем getPreferredLocations ... но я думал, что это такая простая и обычная вещь, чтобы сделать, конечно, должен быть прямой способ сделать это?

Вещи пробовали:

.set("spark.default.parallelism", partitions) на моем SparkConf, и когда в контексте чтения паркета я попытался sqlContext.sql("set spark.sql.shuffle.partitions= ..., что на 1.0.0 приводит к ошибке и на самом деле не хочу я хочу, я хочу раздел номер для изменения по всем типам заданий, а не просто перетасовки.

+0

Любое везение найти решение для этого? – nbubis

ответ

3

Смотреть это пространство

https://issues.apache.org/jira/browse/SPARK-5997

Этот вид очень простой очевидной функции в конечном итоге будет реализован - я думаю, только после того, как они закончат все ненужные функции в Dataset с.

0

Я не совсем понимаю, в чем ваша точка. Вы имеете в виду, что у вас сейчас 5 разделов, но после следующей операции вам нужны данные, распределенные до 10? Поскольку наличие 10, но все еще использование 5 не имеет большого смысла ... Процесс отправки данных в новые разделы должен произойти когда-нибудь.

При выполнении coalesce вы можете избавиться от несанкционированных разделов, например: если у вас было первоначально 100, но затем после reduceByKey вы получили 10 (как там, где всего 10 ключей), вы можете установить coalesce.

Если вы хотите, чтобы процесс шел в другую сторону, вы могли бы просто заставить своего рода перегородки:

[RDD].partitionBy(new HashPartitioner(100)) 

я не уверен, что это то, что вы ищете, но надеюсь.

+3

Каждый раздел имеет местоположение, то есть узел, предположим, что у меня есть 5 разделов и 5 узлов. Если я назову 'repartition' или ваш код, до 10 разделов, это будет перетасовывать данные, то есть данные для каждого из 5 узлов могут проходить по сети на другие узлы. Я хочу, чтобы Spark просто разбивал каждый раздел на 2 без перемещения каких-либо данных - это то, что происходит в Hadoop при настройке настроек разделения. – samthebest

+0

Я не уверен, можете ли вы это сделать. Я предполагаю, что вам понадобится какая-то функция '.forEachNode'. Но я никогда не видел ничего подобного. И я не уверен, что его можно легко реализовать. Разделитель должен возвращать один и тот же раздел для одного и того же объекта каждый раз. По умолчанию Spark использует 'HashPartitioner', которые делают ** hashCode по модулю number_of_partitions **. Если вы просто разделите данные на два новых раздела, они определенно окажутся не в своих местах. Вот почему необходимо перетасовать. Возможно, если у вас есть собственный разделитель, он может увеличить количество разделов без перетасовки по сети. – szefuf

Смежные вопросы