Вообще не эффективный способ для достижения этой цели, используя искру DataFrames
. Не говоря уже о таких вещах, как заказ, становится довольно сложным в распределенной настройке. Теоретически вы можете использовать lag
функцию следующим образом:
from pyspark.sql.functions import lag, col, unix_timestamp
from pyspark.sql.window import Window
dev_time = (unix_timestamp(col("dev_time")) * 1000).cast("timestamp")
df = sc.parallelize([
("2015-09-18 05:00:20",), ("2015-09-18 05:00:21",),
("2015-09-18 05:00:22",), ("2015-09-18 05:00:23",),
("2015-09-18 05:00:24",), ("2015-09-18 05:00:25",),
("2015-09-18 05:00:26",), ("2015-09-18 05:00:27",),
("2015-09-18 05:00:37",), ("2015-09-18 05:00:37",),
("2015-09-18 05:00:37",), ("2015-09-18 05:00:38",),
("2015-09-18 05:00:39",)
]).toDF(["dev_time"]).withColumn("dev_time", dev_time)
w = Window.orderBy("dev_time")
lag_dev_time = lag("dev_time").over(w).cast("integer")
diff = df.select((col("dev_time").cast("integer") - lag_dev_time).alias("diff"))
## diff.show()
## +----+
## |diff|
## +----+
## |null|
## | 1|
## | 1|
## | 1|
## | 1|
## | 1|
## | 1|
## | 1|
## | 10|
## ...
но крайне неэффективно (как оконные функции переместить все данные в одном разделе, если не PARTITION BY
пункт не предусмотрен). На практике имеет смысл использовать метод sliding
на RDD (Scala) или реализовать собственное скользящее окно (Python). См:
Добро пожаловать на ТАК! Пожалуйста, поделитесь [MCVE] (http://stackoverflow.com/help/mcve), чтобы мы могли попытаться помочь. Это вопрос с очень низким качеством – eliasah