2016-02-04 3 views
11

В документации Spark говорится, что метод RDD reduce требует ассоциативной и коммутативной двоичной функции.Spark: разница семантики между сокращением и сокращениемByKey

Однако метод reduceByKey ТОЛЬКО требует ассоциативной двоичной функции.

sc.textFile("file4kB", 4) 

Я провел несколько тестов, и, по-видимому, это поведение, которое я получаю. Почему это различие? Почему reduceByKey обеспечивает, чтобы бинарная функция всегда применялась в определенном порядке (для обеспечения отсутствия коммутативности), когда reduce не работает?

Например, если нагрузка некоторых (маленький) текст с 4 разделов (минимум):

val r = sc.textFile("file4k", 4) 

затем:

r.reduce(_ + _) 

возвращает строку, в которой части не всегда в том же порядке, тогда как:

r.map(x => (1,x)).reduceByKey(_ + _).first 

всегда возвращает ту же строку (где все находится в том же порядке, что и в оригинале l файл).

(я проверил с r.glom, и содержимое файла действительно распространяется на 4 раздела, нет пустого раздела).

+2

Я думаю, идея с 'reduceByKey' является то, что вы, вероятно, много разных ключей, поэтому вполне можно сократить все для одного ключа в одном потоке, что означает, что вы всегда можете запускать вычисления слева направо. Напротив, 'сокращение' часто будет использоваться в большом наборе данных, поэтому не нужно заботиться о порядке операций. –

+0

Сколько исполнителей вы используете в своих экспериментах? – gprivitera

ответ

7

Насколько я знаю, это ошибка в документации, и результаты, которые вы видите, являются просто случайными. Практика, other resources и простой analysis of the code показывают, что функция, переданная в reduceByKey, должна быть не только ассоциативной, но и коммутативной. не

  • практики - в то время как она выглядит порядок сохраняются в локальном режиме это уже не так, когда вы запускаете искру на кластере, в том числе автономного режима.

  • других ресурсы - процитировать Data Exploration Using Spark из AmpCamp 3:

    Существует удобный метод, называемый reduceByKey в Спарке именно для этой модели. Обратите внимание, что второй аргумент reduceByKey определяет количество используемых редукторов. По умолчанию Spark предполагает, что функция уменьшения является коммутативной и ассоциативной и применяет комбинаторы на стороне отображения.

  • код - reduceByKey реализуется с помощью combineByKeyWithClassTag и создает ShuffledRDD. Поскольку Spark не гарантирует порядок после перетасовки, единственный способ восстановить его - это привязать некоторые метаданные к частично уменьшенным записям. Насколько я могу сказать, ничего подобного не происходит.

На стороне записки reduce, как это реализовано в PySpark будет прекрасно работать с функцией, которая является только коммутативной. Это, конечно, просто деталь реализации, а не часть контракта.

+3

Я бы добавил, что сокращение - это действие, возвращающее данные в драйвер, а reduceByKey - это преобразование, возвращающее другое RDD – rhernando

+0

Спасибо! Но тогда, есть ли какой-то способ в Искры, чтобы обеспечить правильность некоммутативного лечения? Или это выходит за рамки Spark? –

+0

Я не уверен, если пойму вопрос. Вы спрашиваете, можно ли автоматически проверить/доказать коммутативность или просто хотите использовать некоммутативную функцию с 'reduce'? Если это второй случай, имитирующий поведение PySpark ('mapPartitions (сокращениеFunc)' => 'collect' => reduce (reduceFunc)'), должен работать с некоторым снижением производительности. – zero323

1

В соответствии с кодовой документацией, недавно обновленной/исправленной.(thanks @ zero323):

reduceByKey объединяет значения для каждого ключа, используя ассоциативную и коммутативную функцию уменьшения. Это также будет выполнять слияние локально на каждом картографе перед отправкой результатов в редуктор, подобно «объединителю» в MapReduce.

Таким образом, на самом деле это была ошибка документации, подобная @ zero323, указанная в его ответе.

Вы можете проверить следующие ссылки на код, чтобы убедиться: