Вы можете использовать простой map
:
df.rdd.map(lambda row:
Row(row.__fields__ + ["day"])(row + (row.date_time.day,))
)
Другой вариант заключается в регистрации функции и запустить SQL запрос:
sqlContext.registerFunction("day", lambda x: x.day)
sqlContext.registerDataFrameAsTable(df, "df")
sqlContext.sql("SELECT *, day(date_time) as day FROM df")
Наконец, вы можете определить UDF как это:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
day = udf(lambda date_time: date_time.day, IntegerType())
df.withColumn("day", day(df.date_time))
EDIT:
На самом деле, если вы используете необработанный SQL day
, функция уже определена (по крайней мере, в Spark 1.4), поэтому вы можете опустить регистрацию udf. Он также предоставляет целый ряд различных функций обработки даты, включая:
Это также можно использовать простые выражения даты:
current_timestamp() - expr("INTERVAL 1 HOUR")
Это означает, что вы можете создавать относительно сложные запросы, не передавая данные на Python. Например:
df = sc.parallelize([
(1, "2016-01-06 00:04:21"),
(2, "2016-05-01 12:20:00"),
(3, "2016-08-06 00:04:21")
]).toDF(["id", "ts_"])
now = lit("2016-06-01 00:00:00").cast("timestamp")
five_months_ago = now - expr("INTERVAL 5 MONTHS")
(df
# Cast string to timestamp
# For Spark 1.5 use cast("double").cast("timestamp")
.withColumn("ts", unix_timestamp("ts_").cast("timestamp"))
# Find all events in the last five months
.where(col("ts").between(five_months_ago, now))
# Find first Sunday after the event
.withColumn("next_sunday", next_day(col("ts"), "Sun"))
# Compute difference in days
.withColumn("diff", datediff(col("ts"), col("next_sunday"))))
Существует много столбцов, и я хочу добавить еще один. Метод карты может быть слишком громоздким, чтобы перечислять все существующие столбцы. Я попробую использовать функцию регистрации. Спасибо. –
Вам не нужно перечислять все существующие столбцы на карте. Возможно просто воссоздать строку. Я обновил ответ, чтобы это отразить. При таком подходе есть две проблемы. Он возвращает RDD строк, а не DataFrame, и это, скорее всего, медленнее, чем оптимизированный SQL. – zero323
Определение udf кажется чистым способом, который я нашел до сих пор. Добавлено в ответ. – zero323