2016-11-03 3 views
2

Я в настоящее время пытается извлечь ряд последовательных вхождений в dataframe Pyspark и порядка/ранжировать их, как показано ниже (для удобства я заказал первоначальный dataframe по user_id и метки времени):Pyspark: Пользовательская функция окна

df_ini 
+-------+--------------------+------------+ 
|user_id|  timestamp  | actions | 
+-------+--------------------+------------+ 
| 217498|   100000001| 'A'  | 
| 217498|   100000025| 'A'  | 
| 217498|   100000124| 'A'  | 
| 217498|   100000152| 'B'  | 
| 217498|   100000165| 'C'  | 
| 217498|   100000177| 'C'  | 
| 217498|   100000182| 'A'  | 
| 217498|   100000197| 'B'  | 
| 217498|   100000210| 'B'  | 
| 854123|   100000005| 'A'  | 
| 854123|   100000007| 'A'  | 
| etc. 

к:

expected df_transformed 
+-------+------------+------------+------------+ 
|user_id| actions | nb_of_occ | order | 
+-------+------------+------------+------------+ 
| 217498| 'A'  |  3  |  1  | 
| 217498| 'B'  |  1  |  2  | 
| 217498| 'C'  |  2  |  3  | 
| 217498| 'A'  |  1  |  4  | 
| 217498| 'B'  |  2  |  5  | 
| 854123| 'A'  |  2  |  1  | 
| etc. 

Я предполагаю, что я должен использовать смарт-функцию окна, что секционирование таблицы по user_id и действиями , но только тогда, когда эти действия являются последовательными во время! Который я не могу понять, как это сделать ...

Если кто-то столкнулся с этим типом трансформации в pyspark, прежде чем я был бы рад получить намек!

Приветствия

ответ

4

Это довольно распространенный вид и может быть выражена с помощью оконных функций в несколько шагов. Первый импорт необходимых функций:

from pyspark.sql.functions import sum as sum_, lag, col, coalesce, lit 
from pyspark.sql.window import Window 

Следующая определяют окно:

w = Window.partitionBy("user_id").orderBy("timestamp") 

Отметить первую строку для каждой группы:

is_first = coalesce(
    (lag("actions", 1).over(w) != col("actions")).cast("bigint"), 
    lit(1) 
) 

Определение order:

order = sum_("is_first").over(w) 

И объединитьвсе части вместе с агрегацией:

(df 
    .withColumn("is_first", is_first) 
    .withColumn("order", order) 
    .groupBy("user_id", "actions", "order") 
    .count()) 

Если вы определяете df как:

df = sc.parallelize([ 
    (217498, 100000001, 'A'), (217498, 100000025, 'A'), (217498, 100000124, 'A'), 
    (217498, 100000152, 'B'), (217498, 100000165, 'C'), (217498, 100000177, 'C'), 
    (217498, 100000182, 'A'), (217498, 100000197, 'B'), (217498, 100000210, 'B'), 
    (854123, 100000005, 'A'), (854123, 100000007, 'A') 
]).toDF(["user_id", "timestamp", "actions"]) 

и заказать результат на user_id и order вы получите:

+-------+-------+-----+-----+ 
|user_id|actions|order|count| 
+-------+-------+-----+-----+ 
| 217498|  A| 1| 3| 
| 217498|  B| 2| 1| 
| 217498|  C| 3| 2| 
| 217498|  A| 4| 1| 
| 217498|  B| 5| 2| 
| 854123|  A| 1| 2| 
+-------+-------+-----+-----+ 
+0

зачем вам coalesce()? – mathopt

2

Я боюсь, что это не возможно, используя стандартные функции dataframe оконные. Но вы все равно можете использовать старый RDD API groupByKey() для достижения этой трансформации:

>>> from itertools import groupby 
>>> 
>>> def recalculate(records): 
...  actions = [r.actions for r in sorted(records[1], key=lambda r: r.timestamp)] 
...  groups = [list(g) for k, g in groupby(actions)] 
...  return [(records[0], g[0], len(g), i+1) for i, g in enumerate(groups)] 
... 
>>> df_ini.rdd.map(lambda row: (row.user_id, row)) \ 
...  .groupByKey().flatMap(recalculate) \ 
...  .toDF(['user_id', 'actions', 'nf_of_occ', 'order']).show() 
+-------+-------+---------+-----+ 
|user_id|actions|nf_of_occ|order| 
+-------+-------+---------+-----+ 
| 217498|  A|  3| 1| 
| 217498|  B|  1| 2| 
| 217498|  C|  2| 3| 
| 217498|  A|  1| 4| 
| 217498|  B|  2| 5| 
| 854123|  A|  2| 1| 
+-------+-------+---------+-----+ 
Смежные вопросы