2016-02-23 3 views
2

У меня есть DataSpark PySpark с именем DF с (K, V) парами. Я хотел бы применить несколько функций с помощью ReduceByKey. Например, я следующие три простые функции:Pyspark RDD ReduceByKey Множественная функция

def sumFunc(a,b): return a+b 

def maxFunc(a,b): return max(a,b) 

def minFunc(a,b): return min(a,b) 

Когда я применяю только одну функцию, например ,, следующие три работы:

DF.reduceByKey(sumFunc) #works 
DF.reduceByKey(maxFunc) #works 
DF.reduceByKey(minFunc) #works 

Но, когда я применять более чем одну функцию, ее не работает, например, следующие действия не работают.

DF.reduceByKey(sumFunc, maxfunc, minFunc) #it does not work 
DF.reduceByKey(sumFunc, maxfunc) #it does not work 
DF.reduceByKey(maxfunc, minFunc) #it does not work 
DF.reduceByKey(sumFunc, minFunc) #it does not work 

Я не хочу использовать groupByKey, поскольку он замедляет вычисления.

ответ

1

Если вход является DataFrame просто использовать agg:

import pyspark.sql.functions as sqlf 

df = sc.parallelize([ 
    ("foo", 1.0), ("foo", 2.5), ("bar", -1.0), ("bar", 99.0) 
]).toDF(["k", "v"]) 

df.groupBy("k").agg(sqlf.min("v"), sqlf.max("v"), sqlf.sum("v")).show() 

## +---+------+------+------+ 
## | k|min(v)|max(v)|sum(v)| 
## +---+------+------+------+ 
## |bar| -1.0| 99.0| 98.0| 
## |foo| 1.0| 2.5| 3.5| 
## +---+------+------+------+ 

С РДУ вы можете использовать statcounter:

from pyspark.statcounter import StatCounter 

rdd = df.rdd 
stats = rdd.aggregateByKey(
    StatCounter(), StatCounter.merge, StatCounter.mergeStats 
).mapValues(lambda s: (s.min(), s.max(), s.sum())) 

stats.collect() 
## [('bar', (-1.0, 99.0, 98.0)), ('foo', (1.0, 2.5, 3.5))] 

Используя свои функции, которые вы могли бы сделать что-то вроде этого:

def apply(x, y, funs=[minFunc, maxFunc, sumFunc]): 
    return [f(x_, y_) for f, x_, y_ in zip(*(funs, x, y))] 

rdd.combineByKey(lambda x: (x, x, x), apply, apply).collect() 
## [('bar', [-1.0, 99.0, 98.0]), ('foo', [1.0, 2.5, 3.5])] 
+0

Можете ли вы также использовать метод RDD для нескольких экземпляров StatCounter? Например, если вы хотите использовать min/max для разных столбцов в одной и той же совокупности? – Matthias

+0

Я попытался использовать ваш пример StatCounter. Но когда я пытаюсь использовать его с ключом-значением RDD со строковым-float, то я получаю эту ошибку: TypeError: unbound метод merge() должен быть вызван с экземпляром NoneType в качестве первого аргумента (вместо этого был получен экземпляр StatCounter) – Matthias

+0

@Matthias I don ' t использовать Python 2. Если вы используете 2.x, вам придется использовать функции, которые используют определенные экземпляры, которые вы получаете в seq-op и merge-op. – zero323

Смежные вопросы