4

У меня есть некоторые данные, которые я хочу сгруппировать по определенному столбцу, а затем агрегировать серию полей на основе скользящего временного окна из группы.Как заполнить окно времени прокатки с группами в Spark

Вот несколько примеров данных:

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1), 
          Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2), 
          Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3), 
          Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3), 
          Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3), 
          Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)]) 

Я хочу группе group_by, а затем создать временные окна, которые начинаются в кратчайшие сроки и продлить до тех пор, пока 30 дней без каких-либо записи для этой группы. По истечении этих 30 дней следующее окно времени начнется с даты следующей строки, которая не попала в предыдущее окно.

Затем я хочу объединить, например, получить среднее значение get_avg и первый результат get_first.

Так выход для этого примера должно быть:

group_by first date of window get_avg get_first 
group1  2016-01-01    5  1 
group2  2016-02-01    20  3 
group2  2016-04-02    8  4 

редактировать: жаль, что я понял, что мой вопрос не был задан правильно. Я действительно хочу окно, которое заканчивается через 30 дней бездействия. Я соответствующим образом изменил часть2 группы.

ответ

9

Пересмотренный ответ:

Вы можете использовать простые оконные функции обмануть здесь. Пучок импорта: определение

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_ 
from pyspark.sql.window import Window 

окно:

w = Window.partitionBy("group_by").orderBy("date") 

В ролях date к DateType:

df_ = df.withColumn("date", col("date").cast("date")) 

Определить следующие выражения:

# Difference from the previous record or 0 if this is the first one 
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0)) 

# 0 if diff <= 30, 1 otherwise 
indicator = (diff > 30).cast("integer") 

# Cumulative sum of indicators over the window 
subgroup = sum_(indicator).over(w).alias("subgroup") 

Добавить subgroup выражение в таблице:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg") 
+--------+--------+------------+ 
|group_by|subgroup|avg(get_avg)| 
+--------+--------+------------+ 
| group1|  0|   5.0| 
| group2|  0|  20.0| 
| group2|  1|   8.0| 
+--------+--------+------------+ 

first не имеет смысла с скоплениями, но если столбец является монотонно возрастающей вы можете использовать min. В противном случае вам также придется использовать функции окна.

Протестировано с использованием искры 2.1. Может потребоваться подзапросы и Window экземпляр при использовании с более ранним выпуском Spark.

оригинальный ответ (не имеет значения в указанной области)

С Спарк 2.0 вы должны быть в состоянии использовать a window function:

Bucketize строки в один или несколько раз окна с указанием столбца с меткой времени. Запуск окна является включительным, но окна заканчиваются исключительно, например. 12:05 будет в окне [12: 05,12: 10), но не в [12: 00,12: 05].

from pyspark.sql.functions import window 

df.groupBy(window("date", windowDuration="30 days")).count() 

но вы можете видеть из результата,

+---------------------------------------------+-----+ 
|window          |count| 
+---------------------------------------------+-----+ 
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1 | 
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2 | 
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1 | 
+---------------------------------------------+-----+ 

вы должны быть немного осторожны, когда дело доходит до часовых поясов.

Смежные вопросы