Там не эффективный метод, который может выполнять операции, как этот вне коробки. Вы можете использовать функции окна:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag
df = sc.parallelize([
(1473678846, 2), (1473678852, 1),
(1473679029, 3), (1473679039, 3),
(1473679045, 2), (1473679055, 1)
]).toDF(["time", "door_status"])
w = Window().orderBy("time")
(df
.withColumn("prev_status", lag("door_status", 1).over(w))
.where(col("door_status") != col("prev_status"))
.groupBy("door_status", "prev_status")
.count())
но это просто не будет масштабироваться. Вы можете попробовать mapParitions
. Кулак давайте определим функцию, мы будем использовать для отображения разделов:
from collections import Counter
def process_partition(iter):
"""Given an iterator of (time, state)
return the first state, the last state and
a counter of state changes
>>> process_partition([])
[(None, None, Counter())]
>>> process_partition(enumerate([1, 1, 1]))
[(1, 1, Counter())]
>>> process_partition(enumerate([1, 2, 3]))
[(1, 3, Counter({(1, 2): 1, (2, 3): 1}))]
"""
first = None
prev = None
cnt = Counter()
for i, (_, x) in enumerate(iter):
# Store the first object per partition
if i == 0:
first = x
# If change of state update couter
if prev is not None and prev != x:
cnt[(prev, x)] += 1
prev = x
return [(first, prev, cnt)]
и простые функции слияния:
def merge(x, y):
"""Given a pair of tuples:
(first-state, last-state, counter_of changes)
return a tuple of the same shape representing aggregated results
>>> merge((None, None, Counter()), (1, 1, Counter()))
(None, 1, Counter())
>>> merge((1, 2, Counter([(1, 2)])), (2, 2, Counter()))
(None, 2, Counter({(1, 2): 1}))
>>> merge((1, 2, Counter([(1, 2)])), (3, 2, Counter([(3, 2)])
(None, 2, Counter({(1, 2): 1, (2, 3): 1, (3, 2): 1}))
"""
(_, last_x, cnt_x), (first_y, last_y, cnt_y) = x, y
# If previous partition wasn't empty update counter
if last_x is not None and first_y is not None and last_x != first_y:
cnt_y[(last_x, first_y)] += 1
# Merge counters
cnt_y.update(cnt_x)
return (None, last_y, cnt_y)
С этими двумя мы можем выполнить операцию так:
partials = (df.rdd
.mapPartitions(process_partition)
.collect())
reduce(merge, [(None, None, Counter())] + partials)
@ zero323 Спасибо. Первый метод работал. Как уже упоминалось, для больших объемов данных он кажется медленным. Мне еще предстоит попробовать второй метод. – Sisyphus
@DavidArenburg Я не уверен, следую ли я. Не могли бы вы уточнить? – zero323
@DavidArenburg Нет, конечно нет. Я использовал его для запуска набора тестов с разным расположением разделов. Спасибо! – zero323