20

мне интересно, как я могу достичь следующих в Спарк (Pyspark)Спарк добавить новый столбец dataframe со значением из предыдущего ряда

Начальная Dataframe:

+--+---+ 
|id|num| 
+--+---+ 
|4 |9.0| 
+--+---+ 
|3 |7.0| 
+--+---+ 
|2 |3.0| 
+--+---+ 
|1 |5.0| 
+--+---+ 

Результирующая Dataframe:

+--+---+-------+ 
|id|num|new_Col| 
+--+---+-------+ 
|4 |9.0| 7.0 | 
+--+---+-------+ 
|3 |7.0| 3.0 | 
+--+---+-------+ 
|2 |3.0| 5.0 | 
+--+---+-------+ 

Мне удается «добавить» новые столбцы к фреймворку данных, используя что-то вроде: df.withColumn("new_Col", df.num * 10)

Однако я не знаю, как я могу добиться этого «сдвига строк» ​​для нового столбца, так что новый столбец имеет значение поля из предыдущей строки (как показано в примере). Я также не мог найти ничего в документации API о том, как получить доступ к определенной строке в DF по индексу.

Любая помощь будет оценена по достоинству.

ответ

24

Вы можете использовать lag оконной функции следующим образом

from pyspark.sql.functions import lag, col 
from pyspark.sql.window import Window 

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"]) 
w = Window().partitionBy().orderBy(col("id")) 
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show() 

## +---+---+-------+ 
## | id|num|new_col| 
## +---+---+-------| 
## | 2|3.0| 5.0| 
## | 3|7.0| 3.0| 
## | 4|9.0| 7.0| 
## +---+---+-------+ 

но есть некоторые важные вопросы:

  1. если вам нужна глобальная операция (не распределяли некоторые другие столбцы/столбцами) чрезвычайно неэффективен.
  2. Вам нужен естественный способ заказывать ваши данные.

В то время как вторая проблема почти никогда не является проблемой, первая из них может быть разрывом сделки. Если это так, вы должны просто преобразовать DataFrame в RDD и вычислить lag вручную. Смотрите, например:

Другие полезные ссылки: