Есть несколько эффективных способов реализации этого. Начнем с необходимым импортом:
from pyspark.sql.functions import col, expr, when
Вы можете использовать улую IF
функции внутри выража:
new_column_1 = expr(
"""IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)
или when
+ otherwise
:
new_column_2 = when(
col("fruit1").isNull() | col("fruit2").isNull(), 3
).when(col("fruit1") == col("fruit2"), 1).otherwise(0)
Наконец, вы можете использовать следующий трюк:
from pyspark.sql.functions import coalesce, lit
new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))
с примерными данными:
df = sc.parallelize([
("orange", "apple"), ("kiwi", None), (None, "banana"),
("mango", "mango"), (None, None)
]).toDF(["fruit1", "fruit2"])
вы можете использовать это следующим образом:
(df
.withColumn("new_column_1", new_column_1)
.withColumn("new_column_2", new_column_2)
.withColumn("new_column_3", new_column_3))
и результат:
+------+------+------------+------------+------------+
|fruit1|fruit2|new_column_1|new_column_2|new_column_3|
+------+------+------------+------------+------------+
|orange| apple| 0| 0| 0|
| kiwi| null| 3| 3| 3|
| null|banana| 3| 3| 3|
| mango| mango| 1| 1| 1|
| null| null| 3| 3| 3|
+------+------+------------+------------+------------+
Я получил пару ошибок из этого раствора, @David , Первая была решена с помощью 'from pyspark.sql.types import StringType'. Второй - это: 'TypeError: 'int' object не вызываем', и я не уверен, как его разрешить. Обратите внимание, что 'df' является' pyspark.sql.dataframe.DataFrame'. – user2205916
@ user2205916 У меня было несколько опечаток. В строке 'def func (...' у меня был 'fruit 1' (с пробелом) вместо' fruit1'. В строке, начинающейся с 'func_udf = ...' у меня был 'StringType' вместо' IntegerType'. Попробуйте с обновленным кодом и дайте мне знать, если у вас все еще есть проблемы – David
Я получаю то же сообщение об ошибке. Кроме того, я думаю, что в конце 'df =. .4444 отсутствует парень. – user2205916