2015-02-24 4 views
1

Я обрабатываю много файлов журналов, и я хотел бы переместить задание в Spark, но я не могу понять, как агрегировать события по событию на основе времени, как я могу легко в Пандах.(Py) Spark - группа пользователей за время времени

Вот именно то, что я хочу сделать:

Для лог-файл (моделируемой ниже) пользователей, которые испытали какое-то событие, я хотел бы вернуться назад во времени, семь дней, и вернуть агрегаты для всех другие столбцы.

Здесь он находится в Пандах. Любые идеи, как переносить это в PySpark?

import pandas as pd 
df = pd.DataFrame({'user_id':[1,1,1,2,2,2], 'event':[0,1,0,0,0,1], 'other':[12, 20, 16, 84, 11, 15] , 'event_date':['2015-01-01 00:02:43', '2015-01-04 00:02:03', '2015-01-10 00:12:26', '2015-01-01 00:02:43', '2015-01-06 00:02:43', '2015-01-012 18:10:09']}) 
df['event_date'] = pd.to_datetime(df['event_date']) 
df 

Дает:

event event_date   other user_id 
0 0  2015-01-01 00:02:43 12  1 
1 1  2015-01-04 00:02:03 20  1 
2 0  2015-01-10 00:12:26 16  1 
3 0  2015-01-01 00:02:43 84  2 
4 0  2015-01-06 00:02:43 11  2 
5 1  2015-01-12 18:10:09 15  2 

Я хотел бы, чтобы сгруппировать этот DataFrame по user_id, затем исключить строку из агрегации, где строка старше семи дней от «события».

В Панды, например, так:

def f(x): 
    # Find event 
    win = x.event == 1 

    # Get the date when event === 1 
    event_date = list(x[win]['event_date'])[0] 

    # Construct the window 
    min_date = event_date - pd.DateOffset(days=7) 

    # Set x to this specific date window 
    x = x[(x.event_date > min_date) & (x.event_date <= event_date)] 

    # Aggregate other 
    x['other'] = x.other.sum() 

    return x[win] #, x[z]]) 


df.groupby(by='user_id').apply(f).reset_index(drop=True) 

Предоставление требуемого выходного сигнала (по одной строке для каждого пользователя, где EVENT_DATE соответствует событию == 1):

event event_date   other user_id 
0 1  2015-01-04 00:02:03 32  1 
1 1  2015-01-12 18:10:09 26  2 

Каждый знает, где, чтобы начать получать этот результат в Spark?

ответ

2

Скорее SQLish, но вы можете сделать что-то вроде этого:

from pyspark.sql.functions import sum, col, udf 
from pyspark.sql.types import BooleanType 

# With raw SQL you can use datediff but it looks like it is not 
# available as a function yet 
def less_than_n_days(n):              
    return udf(lambda dt1, dt2: 0 <= (dt1 - dt2).days < n, BooleanType()) 

# Select only events 
events = df.where(df.event == 1).select(
     df.event_date.alias("evd"), df.user_id.alias("uid")) 

(events 
    .join(df, (events.uid == df.user_id) & (events.evd >= df.event_date)) 
    .where(less_than_n_days(7)(col("evd"), col("event_date"))) 
    .groupBy("evd", "user_id") 
    .agg(sum("other").alias("other")) 
    .withColumnRenamed("evd", "event_date")) 

К сожалению, мы не можем включить пункт less_than_n_days в join, поскольку udf может получить доступ только столбцы из одной таблицы. Поскольку он не применяется к встроенному datediff, вы можете предпочесть необработанный SQL:

df.registerTempTable("df") 
events.registerTempTable("events") 

sqlContext.sql(""" 
    SELECT evd AS event_date, user_id, SUM(other) AS other 
    FROM df JOIN events ON 
     df.user_id = events.uid AND 
     datediff(evd, event_date) BETWEEN 0 AND 6 
    GROUP by evd, user_id""") 
Смежные вопросы