2016-01-11 4 views
1

Я знаю, что мы можем использовать Window function in pyspark для расчета суммарной суммы. Но Window поддерживается только в HiveContext, а не в SQLContext. Мне нужно использовать SQLContext, поскольку HiveContext не может быть запущен в нескольких процессах.Как рассчитать суммарную сумму с использованием sqlContext

Есть ли эффективный способ вычисления суммарной суммы с использованием SQLContext? Простым способом является загрузка данных в память драйвера и использование numpy.cumsum, но con необходимо, чтобы данные могли быть помещены в память.

+0

_need использовать SQLContext в HiveContext не может работать в нескольких processes_ - да? Не могли бы вы уточнить? – zero323

+0

Я активно использовал функции Window с sqlContext. – KrisP

+0

@ zero323 Ограничение HiveContext. Я столкнулся с такой же проблемой, как https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201502.mbox/%[email protected]%3E – Michael

ответ

8

Не уверен, что это то, что вы ищете, но здесь два примера, как использовать sqlContext рассчитать накопленную сумму:

Первый, когда вы хотите, чтобы разделить его на несколько категорий:

from pyspark.sql.types import StructType, StringType, LongType 
from pyspark.sql import SQLContext 

rdd = sc.parallelize([ 
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500) 
    ]) 

schema = StructType([ 
    StructField("category", StringType(), False), 
    StructField("revenue", LongType(), False) 
    ]) 

df = sqlContext.createDataFrame(rdd, schema) 

df.registerTempTable("test_table") 

df2 = sqlContext.sql(""" 
SELECT 
    category, 
    revenue, 
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum 
FROM 
test_table 
""") 

Выход:

[Row(category='Tablet', revenue=5500, cumsum=5500), 
Row(category='Tablet', revenue=6500, cumsum=12000), 
Row(category='Cell Phone', revenue=5500, cumsum=5500), 
Row(category='Cell Phone', revenue=6000, cumsum=11500), 
Row(category='Cell Phone', revenue=6500, cumsum=18000)] 

Se cond, когда вы только хотите взять cumsum одной переменной. Изменение df2 к этому:

df2 = sqlContext.sql(""" 
SELECT 
    category, 
    revenue, 
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum 
FROM 
test_table 
""") 

Выход:

[Row(category='Cell Phone', revenue=5500, cumsum=5500), 
Row(category='Tablet', revenue=5500, cumsum=11000), 
Row(category='Cell Phone', revenue=6000, cumsum=17000), 
Row(category='Cell Phone', revenue=6500, cumsum=23500), 
Row(category='Tablet', revenue=6500, cumsum=30000)] 

Надеется, что это помогает. Использование np.cumsum не очень эффективно после сбора данных, особенно если набор данных большой. Другой способ, которым вы можете исследовать, - использовать простые преобразования RDD, такие как groupByKey(), а затем использовать карту для вычисления суммарной суммы каждой группы с помощью некоторого ключа, а затем уменьшить ее в конце.

+0

Спасибо, но ваше решение работает на hiveContext, а не на sqlContext. Вы можете вывести свой sqlContext? Он должен показать, что это hiveContext – Michael

0

Неверно, что функция windows работает только с HiveContext. Вы можете использовать их даже с sqlContext:

from pyspark.sql.window import * 

myPartition=Window.partitionBy(['col1','col2','col3']) 

temp= temp.withColumn("#dummy",sum(temp.col4).over(myPartition)) 
+0

Только на искру 2.0+ можно использовать функции Window с SQLContext. Для версий Spark 1.4 ~ 1.6 необходимо использовать HiveContext –

+0

Нет, они введены из искровой версии 1.4 –

+1

Они существуют с 1.4, но до Spark 2 необходимо было использовать HiveContext. Однако во многих дистрибутивах класс по умолчанию для экземпляра «sqlContext» как в искровой оболочке, так и в pyspark является, по сути, HiveContext, поэтому это может вызвать некоторые недоумения, когда люди думают, что можно использовать функции окна с помощью нормальный SQLContext. Вы можете обратиться к этому вопросу за дополнительной информацией: http://stackoverflow.com/questions/36171349/using-windowing-functions-in-spark –

1

Вот простой пример:

import pyspark 
from pyspark.sql import window 
import pyspark.sql.functions as sf 


sc = pyspark.SparkContext(appName="test") 
sqlcontext = pyspark.SQLContext(sc) 

data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20), 
            ("Cam", "F", "Cambridge", 1, 25), 
            ("Lin", "F", "Cambridge", 1, 25), 
            ("Cat", "M", "Boston", 1, 20), 
            ("Sara", "F", "Cambridge", 1, 15), 
            ("Jeff", "M", "Cambridge", 1, 25), 
            ("Bean", "M", "Cambridge", 1, 26), 
            ("Dave", "M", "Cambridge", 1, 21),], 
           ["name", 'gender', "city", 'donation', "age"]) 


data.show() 

дает OUTPUT

+----+------+---------+--------+---+ 
|name|gender|  city|donation|age| 
+----+------+---------+--------+---+ 
| Bob|  M| Boston|  1| 20| 
| Cam|  F|Cambridge|  1| 25| 
| Lin|  F|Cambridge|  1| 25| 
| Cat|  M| Boston|  1| 20| 
|Sara|  F|Cambridge|  1| 15| 
|Jeff|  M|Cambridge|  1| 25| 
|Bean|  M|Cambridge|  1| 26| 
|Dave|  M|Cambridge|  1| 21| 
+----+------+---------+--------+---+ 

Теперь здесь ловушка:

temp = data.withColumn('cumsum',sum(data.donation).over(win_spec)) 

с ошибкой:

TypeErrorTraceback (most recent call last) 
<ipython-input-9-b467d24b05cd> in <module>() 
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec)) 

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self) 
    238 
    239  def __iter__(self): 
--> 240   raise TypeError("Column is not iterable") 
    241 
    242  # string methods 

TypeError: Column is not iterable 

Это связано с использованием sum функции питона вместо pyspark's. Способ исправить это с помощью функции sum из pyspark.sql.functions.sum:

temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec)) 
temp.show() 

даст:

+----+------+---------+--------+---+--------------+ 
|name|gender|  city|donation|age|CumSumDonation| 
+----+------+---------+--------+---+--------------+ 
|Sara|  F|Cambridge|  1| 15|    1| 
| Cam|  F|Cambridge|  1| 25|    2| 
| Lin|  F|Cambridge|  1| 25|    3| 
| Bob|  M| Boston|  1| 20|    1| 
| Cat|  M| Boston|  1| 20|    2| 
|Dave|  M|Cambridge|  1| 21|    1| 
|Jeff|  M|Cambridge|  1| 25|    2| 
|Bean|  M|Cambridge|  1| 26|    3| 
+----+------+---------+--------+---+--------------+ 
1

После посадки на эту тему, пытаясь решить подобную проблему, я решил свою проблему, используя этот код , Не уверен, что если я пропускаю часть ОП, но это способ суммировать SQLContext столбец:

from pyspark.conf import SparkConf 
from pyspark.context import SparkContext 
from pyspark.sql.context import SQLContext 

sc = SparkContext() 
sc.setLogLevel("ERROR") 
conf = SparkConf() 
conf.setAppName('Sum SQLContext Column') 
conf.set("spark.executor.memory", "2g") 
sqlContext = SQLContext(sc) 

def sum_column(table, column): 
    sc_table = sqlContext.table(table) 
    return sc_table.agg({column: "sum"}) 

sum_column("db.tablename", "column").show() 
Смежные вопросы