Не уверен, что это то, что вы ищете, но здесь два примера, как использовать sqlContext рассчитать накопленную сумму:
Первый, когда вы хотите, чтобы разделить его на несколько категорий:
from pyspark.sql.types import StructType, StringType, LongType
from pyspark.sql import SQLContext
rdd = sc.parallelize([
("Tablet", 6500),
("Tablet", 5500),
("Cell Phone", 6000),
("Cell Phone", 6500),
("Cell Phone", 5500)
])
schema = StructType([
StructField("category", StringType(), False),
StructField("revenue", LongType(), False)
])
df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("test_table")
df2 = sqlContext.sql("""
SELECT
category,
revenue,
sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")
Выход:
[Row(category='Tablet', revenue=5500, cumsum=5500),
Row(category='Tablet', revenue=6500, cumsum=12000),
Row(category='Cell Phone', revenue=5500, cumsum=5500),
Row(category='Cell Phone', revenue=6000, cumsum=11500),
Row(category='Cell Phone', revenue=6500, cumsum=18000)]
Se cond, когда вы только хотите взять cumsum одной переменной. Изменение df2 к этому:
df2 = sqlContext.sql("""
SELECT
category,
revenue,
sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")
Выход:
[Row(category='Cell Phone', revenue=5500, cumsum=5500),
Row(category='Tablet', revenue=5500, cumsum=11000),
Row(category='Cell Phone', revenue=6000, cumsum=17000),
Row(category='Cell Phone', revenue=6500, cumsum=23500),
Row(category='Tablet', revenue=6500, cumsum=30000)]
Надеется, что это помогает. Использование np.cumsum не очень эффективно после сбора данных, особенно если набор данных большой. Другой способ, которым вы можете исследовать, - использовать простые преобразования RDD, такие как groupByKey(), а затем использовать карту для вычисления суммарной суммы каждой группы с помощью некоторого ключа, а затем уменьшить ее в конце.
_need использовать SQLContext в HiveContext не может работать в нескольких processes_ - да? Не могли бы вы уточнить? – zero323
Я активно использовал функции Window с sqlContext. – KrisP
@ zero323 Ограничение HiveContext. Я столкнулся с такой же проблемой, как https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201502.mbox/%[email protected]%3E – Michael