2015-06-26 1 views
11

Apache Spark pyspark.RDD В документах API упоминается, что groupByKey() неэффективен. Вместо этого рекомендуется использовать вместо этого reduceByKey(), aggregateByKey(), combineByKey() или foldByKey(). Это приведет к тому, что некоторые из агрегатов у рабочих будут перемещены до тасования, что уменьшит перемещение данных между рабочими.Apache Spark: Какова эквивалентная реализация RDD.groupByKey() с использованием RDD.aggregateByKey()?

Учитывая следующий набор данных и выражение groupByKey(), что эквивалентная и эффективная реализация (сокращение перетасовки данных между работниками), которое не использует groupByKey(), но дает тот же результат?

dataset = [("a", 7), ("b", 3), ("a", 8)] 
rdd = (sc.parallelize(dataset) 
     .groupByKey()) 
print sorted(rdd.mapValues(list).collect()) 

Выход:

[('a', [7, 8]), ('b', [3])] 
+0

Разделяются ли ваши данные случайным образом или по ключу? Если вы можете гарантировать, что все записи с a._1 = «a» находятся на одном разделе, вы можете значительно ускорить их, вы можете уйти без каких-либо перетасовки, кроме тех, которые необходимы для начального разбиения , Может быть, попробуйте использовать хэш-разделитель? –

ответ

18

Что касается Я могу сказать, что в этом конкретном случае нет ничего, чтобы выиграть *, используя aggregateByKey или аналогичную функцию. Поскольку вы создаете список, нет «реального» сокращения, и количество данных, которое нужно перетасовать, более или менее одинаково.

Чтобы действительно наблюдать некоторое увеличение производительности, вам нужны преобразования, которые фактически уменьшают количество передаваемых данных, например, подсчет, вычисление сводных статистических данных, поиск уникальных элементов.

Что касается различий преимуществ использования reduceByKey(), combineByKey() или foldByKey() есть важное концептуальное различие, которое легче увидеть, когда вы рассматриваете Scala API singatures.

И reduceByKey, и foldByKey карта от RDD[(K, V)] до RDD[(K, V)], а вторая - дополнительный нулевой элемент.

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] 
foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)] 

combineByKey (нет aggregateByKey, но это тот же самый тип преобразования) переходит из RDD[(K, V)] в RDD[(K, C)]:

combineByKey[C](
    createCombiner: (V) ⇒ C, 
    mergeValue: (C, V) ⇒ C, 
    mergeCombiners: (C, C) ⇒ C): RDD[(K, C)] 

Возвращаясь к вашему примеру только combineByKey (и в PySpark aggregateByKey) является действительно применимо, так как вы трансформируетесь от RDD[(String, Int)] до RDD[(String, List[Int])].

Хотя в динамическом языке, как Python фактически можно выполнить такую ​​операцию с помощью foldByKey или reduceByKey это семантика кода неясна и цитировать @ тим-Петерс «Там должно быть одно-- и предпочтительно только один - очевидный способ сделать это "[1].

Разница между aggregateByKey и combineByKey в значительной степени так же, как между reduceByKey и foldByKey поэтому для списка в основном это дело вкуса:

def merge_value(acc, x): 
    acc.append(x) 
    return acc 

def merge_combiners(acc1, acc2): 
    acc1.extend(acc2) 
    return acc1 

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
    .combineByKey(
     lambda x: [x], 
     lambda u, v: u + [v], 
     lambda u1,u2: u1+u2)) 

На практике следует отдавать предпочтение groupByKey хотя. Реализация PySpark значительно более оптимизирована по сравнению с наивной реализацией, подобной той, которая приведена выше.

1.Peters, T. PEP 20 - Дзен Питона. (2004). в https://www.python.org/dev/peps/pep-0020/


* На практике есть на самом деле довольно много потерять здесь, особенно при использовании PySpark. Реализация Python groupByKey значительно оптимизирована, чем наивный комбинат по ключу. Вы можете проверить Be Smart About groupByKey, созданный мной, и @eliasah для дополнительного обсуждения.

+0

Если вы используете разделитель (скажем, раздел хэшем ключа), можете ли вы уйти без каких-либо других тасов? –

+0

@GlennStrycker Насколько я знаю, ответ положительный. Если RDD разделяется ключом, тогда все значения для данного ключа должны обрабатываться локально на одном узле. Возможная проблема заключается в перекошенном распределении ключей. – zero323

3

Вот один вариант, который использует aggregateByKey(). Мне было бы интересно узнать, как это можно сделать, используя reduceByKey(), combineByKey(), или foldByKey(), и какие затраты/выгоды есть для каждой альтернативы.

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: u+[v], 
         lambda u1,u2: u1+u2)) 
print sorted(rdd.mapValues(list).collect()) 

Выход:

[('a', [7, 8]), ('b', [3])] 

Ниже приводится несколько больше памяти эффективной реализации, хотя и менее читаемой к питону новичку, который производит тот же результат:

rdd = (sc.parallelize([("a", 7), ("b", 3), ("a", 8)]) 
     .aggregateByKey(list(), 
         lambda u,v: itertools.chain(u,[v]), 
         lambda u1,u2: itertools.chain(u1,u2))) 
print sorted(rdd.mapValues(list).collect()) 
Смежные вопросы