2016-07-14 3 views
0

У меня есть файл журнала, состоящий из «События», «Время», «Пользовательский».Расчет среднего времени между событиями пользователями с pySpark

+------------+----------------+---------+ 
| Events |  Time  | UserId | 
+------------+----------------+---------+ 
| ClickA  | 7/6/16 10:00am | userA | 
+------------+----------------+---------+ 
| ClickB  | 7/6/16 12:00am | userA | 
+------------+----------------+---------+ 

Я бы хотел, чтобы каждый пользователь вычислил среднее время между событиями. Как вы решаете эту проблему? В традиционной среде программирования я бы прошел через каждое событие для пользователя и вычислил временную дельта между событиями n и n-1, добавив это значение в массив A. Затем я вычислил среднее значение для каждого значения в A. Как я могу это сделать с помощью Spark?

ответ

1

Игнорирование даты разбора это выглядит как работа для оконной функции, за которым следует простому складывание так грубо вам нужно что-то вроде этого:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.{lag, avg} 

val df = Seq(
    ("ClickA", "2016-06-07 10:00:00", "UserA"), 
    ("ClickB", "2016-06-07 12:00:00", "UserA") 
).toDF("events", "time", "userid").withColumn("time", $"time".cast("timestamp")) 

val w = Window.partitionBy("userid").orderBy("time") 

// Difference between consecutive events in seconds 
val diff = $"time".cast("long") - lag($"time", 1).over(w).cast("long") 

df.withColumn("diff", diff).groupBy("userid").agg(avg($"diff")) 
+0

Спасибо zero323! Знаете ли вы, как я могу наложить эту строку (5/1/2016 4:03:34 PM) на метку времени? Я не мог найти правильный путь с pyspark. – Ahmet

+0

В значительной степени, как показано здесь: http://stackoverflow.com/a/36095322/1560062, но вам придется настроить формат (https://docs.oracle.com/javase/7/docs/api/java/text /SimpleDateFormat.html) – zero323

Смежные вопросы