Я обрабатываю много файлов журналов, и я хотел бы переместить задание в Spark, но я не могу понять, как агрегировать события по событию на основе времени, как я могу легко в Пандах.(Py) Spark - группа пользователей за время времени
Вот именно то, что я хочу сделать:
Для лог-файл (моделируемой ниже) пользователей, которые испытали какое-то событие, я хотел бы вернуться назад во времени, семь дней, и вернуть агрегаты для всех другие столбцы.
Здесь он находится в Пандах. Любые идеи, как переносить это в PySpark?
import pandas as pd
df = pd.DataFrame({'user_id':[1,1,1,2,2,2], 'event':[0,1,0,0,0,1], 'other':[12, 20, 16, 84, 11, 15] , 'event_date':['2015-01-01 00:02:43', '2015-01-04 00:02:03', '2015-01-10 00:12:26', '2015-01-01 00:02:43', '2015-01-06 00:02:43', '2015-01-012 18:10:09']})
df['event_date'] = pd.to_datetime(df['event_date'])
df
Дает:
event event_date other user_id
0 0 2015-01-01 00:02:43 12 1
1 1 2015-01-04 00:02:03 20 1
2 0 2015-01-10 00:12:26 16 1
3 0 2015-01-01 00:02:43 84 2
4 0 2015-01-06 00:02:43 11 2
5 1 2015-01-12 18:10:09 15 2
Я хотел бы, чтобы сгруппировать этот DataFrame по user_id, затем исключить строку из агрегации, где строка старше семи дней от «события».
В Панды, например, так:
def f(x):
# Find event
win = x.event == 1
# Get the date when event === 1
event_date = list(x[win]['event_date'])[0]
# Construct the window
min_date = event_date - pd.DateOffset(days=7)
# Set x to this specific date window
x = x[(x.event_date > min_date) & (x.event_date <= event_date)]
# Aggregate other
x['other'] = x.other.sum()
return x[win] #, x[z]])
df.groupby(by='user_id').apply(f).reset_index(drop=True)
Предоставление требуемого выходного сигнала (по одной строке для каждого пользователя, где EVENT_DATE соответствует событию == 1):
event event_date other user_id
0 1 2015-01-04 00:02:03 32 1
1 1 2015-01-12 18:10:09 26 2
Каждый знает, где, чтобы начать получать этот результат в Spark?