2016-04-16 2 views
2

у меня есть два РДА с одинаковыми столбцами:
rdd1: -Pyspark: Расчет суммы два correspoding столбцов, основанных на условиях двух столбцов в два РДЕ

 
+-----------------+ 
|mid|uid|frequency| 
+-----------------+ 
| m1| u1|  1| 
| m1| u2|  1| 
| m2| u1|  2| 
+-----------------+ 

rdd2: -

 
+-----------------+ 
|mid|uid|frequency| 
+-----------------+ 
| m1| u1|  10| 
| m2| u1|  98| 
| m3| u2|  21| 
+-----------------+ 

Я хочу рассчитать сумму frequencies на основе mid и uid. Результат должен быть примерно таким:

 
+-----------------+ 
|mid|uid|frequency| 
+-----------------+ 
| m1| u1|  11| 
| m2| u1|  100| 
| m3| u2|  21| 
+-----------------+ 

Заранее спасибо.

EDIT: я добился решения в этом случае, а также (с помощью карты-свертка):

from pyspark.sql.functions import col 

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)] 
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)] 
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency']) 
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency']) 

df3 = df1.unionAll(df2) 
df4 = df3.map(lambda bbb: ((bbb['mid'], bbb['uid']), int(bbb['frequency'])))\ 
      .reduceByKey(lambda a, b: a+b) 

p = df4.map(lambda p: (p[0][0], p[0][1], p[1])).toDF() 

p = p.select(col("_1").alias("mid"), \ 
      col("_2").alias("uid"), \ 
      col("_3").alias("frequency")) 

p.show() 

Выход:

 
+---+---+---------+ 
|mid|uid|frequency| 
+---+---+---------+ 
| m2| u1|  100| 
| m1| u1|  11| 
| m1| u2|  1| 
| m3| u2|  21| 
+---+---+---------+ 
+0

Вы можете написать некоторые питона код, чтобы решить эту проблему. Если вы уже пробовали это, вы должны отредактировать вопрос и добавить свой код. –

+0

вы пропустили группу в ожидаемом выводе – eliasah

+0

@ HåkenLid Обычно мы можем сделать это python, используя pandas esp. Но мне нужна помощь в pyspark. – rootcss

ответ

0

Я также получил решение таким образом (Us ИНГ карта-свертка):

from pyspark.sql.functions import col 

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)] 
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)] 
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency']) 
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency']) 

df3 = df1.unionAll(df2) 
df4 = df3.map(lambda bbb: ((bbb['mid'], bbb['uid']), int(bbb['frequency'])))\ 
      .reduceByKey(lambda a, b: a+b) 

p = df4.map(lambda p: (p[0][0], p[0][1], p[1])).toDF() 

p = p.select(col("_1").alias("mid"), \ 
      col("_2").alias("uid"), \ 
      col("_3").alias("frequency")) 

p.show() 

Выход:

 
+---+---+---------+ 
|mid|uid|frequency| 
+---+---+---------+ 
| m2| u1|  100| 
| m1| u1|  11| 
| m1| u2|  1| 
| m3| u2|  21| 
+---+---+---------+ 
+0

Единственная проблема с этим решением заключается в том, что вы теряете всю оптимизацию, выполненную вольфрамовым проектом над 'DataFrame'. http://stackoverflow.com/questions/31780677/efficient-pairrdd-operations-on-dataframe-with-spark-sql-group-by – eliasah

1

Вам просто нужно выполнить группу по середине и UID и выполнить операцию суммирования:

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)] 
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)] 
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency']) 
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency']) 

df3 = df1.unionAll(df2) 

df4 = df3.groupBy(df3.mid,df3.uid).sum() \ 
     .withColumnRenamed("sum(frequency)","frequency") 

df4.show() 

# +---+---+---------+ 
# |mid|uid|frequency| 
# +---+---+---------+ 
# | m1| u1|  11| 
# | m1| u2|  1| 
# | m2| u1|  100| 
# | m3| u2|  21| 
# +---+---+---------+ 
+0

Большое спасибо. :) – rootcss

Смежные вопросы