2016-08-08 2 views
3

Я пытаюсь вычислить взвешенное среднее в pyspark, но не делает большого прогрессаРасчет средневзвешенного показателя в PySpark

# Example data 
df = sc.parallelize([ 
    ("a", 7, 1), ("a", 5, 2), ("a", 4, 3), 
    ("b", 2, 2), ("b", 5, 4), ("c", 1, -1) 
]).toDF(["k", "v1", "v2"]) 
df.show() 

import numpy as np 
def weighted_mean(workclass, final_weight): 
    return np.average(workclass, weights=final_weight) 

weighted_mean_udaf = pyspark.sql.functions.udf(weighted_mean, 
    pyspark.sql.types.IntegerType()) 

, но когда я пытаюсь выполнить этот код

df.groupby('k').agg(weighted_mean_udaf(df.v1,df.v2)).show() 

Я получаю погрешность

u"expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get 

Мой вопрос: могу ли я указать пользовательскую функцию (принимая несколько аргументов) как аргумент к agg? Если нет, есть ли альтернатива для выполнения таких операций, как взвешенное среднее после группировки с помощью ключа?

+1

вы имели в виду, чтобы переопределить 'weighted_mean' функцию? –

+0

Что я хочу сделать: a) groupby b) выполнить операцию в зависимости от нескольких столбцов блока данных. Среднее взвешенное значение является лишь примером. – MARK

+0

Я думаю, что это означало @ cricket_007, вы намеренно переопределяете 'weighted_mean' этой строкой' weighted_mean = pyspark.sql.functions.udf (weighted_mean, 'или это опечатка? – akarilimano

ответ

3

Определенная пользователем функция агрегирования (UDAF, которая работает на pyspark.sql.GroupedData, но не поддерживается в pyspark) не является определяемой пользователем функцией (UDF, которая работает на pyspark.sql.DataFrame).

Потому что в pyspark вы не можете создать свой собственный UDAF, и поставляемое UDAFs не может решить проблему, вам, возможно, придется вернуться к RDD миру:

from numpy import sum 

def weighted_mean(vals): 
    vals = list(vals) # save the values from the iterator 
    sum_of_weights = sum(tup[1] for tup in vals) 
    return sum(1. * tup[0] * tup[1]/sum_of_weights for tup in vals) 

df.map(
    lambda x: (x[0], tuple(x[1:])) # reshape to (key, val) so grouping could work 
).groupByKey().mapValues(
    weighted_mean 
).collect() 
Смежные вопросы