2015-06-17 7 views
14

У меня есть DataFrame, который выглядит примерно так. Я хочу работать в день поля date_time.PySpark добавить столбец в DataFrame из столбца TimeStampType

root 
|-- host: string (nullable = true) 
|-- user_id: string (nullable = true) 
|-- date_time: timestamp (nullable = true) 

Я попытался добавить столбец, чтобы извлечь день. Пока мои попытки потерпели неудачу.

df = df.withColumn("day", df.date_time.getField("day")) 

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType; 

Это также не

df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day)) 

AttributeError: 'PipelinedRDD' object has no attribute 'alias' 

Любая идея, как это можно сделать?

ответ

30

Вы можете использовать простой map:

df.rdd.map(lambda row: 
    Row(row.__fields__ + ["day"])(row + (row.date_time.day,)) 
) 

Другой вариант заключается в регистрации функции и запустить SQL запрос:

sqlContext.registerFunction("day", lambda x: x.day) 
sqlContext.registerDataFrameAsTable(df, "df") 
sqlContext.sql("SELECT *, day(date_time) as day FROM df") 

Наконец, вы можете определить UDF как это:

from pyspark.sql.functions import udf 
from pyspark.sql.types import IntegerType 

day = udf(lambda date_time: date_time.day, IntegerType()) 
df.withColumn("day", day(df.date_time)) 

EDIT:

На самом деле, если вы используете необработанный SQL day, функция уже определена (по крайней мере, в Spark 1.4), поэтому вы можете опустить регистрацию udf. Он также предоставляет целый ряд различных функций обработки даты, включая:

Это также можно использовать простые выражения даты:

current_timestamp() - expr("INTERVAL 1 HOUR") 

Это означает, что вы можете создавать относительно сложные запросы, не передавая данные на Python. Например:

df = sc.parallelize([ 
    (1, "2016-01-06 00:04:21"), 
    (2, "2016-05-01 12:20:00"), 
    (3, "2016-08-06 00:04:21") 
]).toDF(["id", "ts_"]) 

now = lit("2016-06-01 00:00:00").cast("timestamp") 
five_months_ago = now - expr("INTERVAL 5 MONTHS") 

(df 
    # Cast string to timestamp 
    # For Spark 1.5 use cast("double").cast("timestamp") 
    .withColumn("ts", unix_timestamp("ts_").cast("timestamp")) 
    # Find all events in the last five months 
    .where(col("ts").between(five_months_ago, now)) 
    # Find first Sunday after the event 
    .withColumn("next_sunday", next_day(col("ts"), "Sun")) 
    # Compute difference in days 
    .withColumn("diff", datediff(col("ts"), col("next_sunday")))) 
+0

Существует много столбцов, и я хочу добавить еще один. Метод карты может быть слишком громоздким, чтобы перечислять все существующие столбцы. Я попробую использовать функцию регистрации. Спасибо. –

+0

Вам не нужно перечислять все существующие столбцы на карте. Возможно просто воссоздать строку. Я обновил ответ, чтобы это отразить. При таком подходе есть две проблемы. Он возвращает RDD строк, а не DataFrame, и это, скорее всего, медленнее, чем оптимизированный SQL. – zero323

+1

Определение udf кажется чистым способом, который я нашел до сих пор. Добавлено в ответ. – zero323

Смежные вопросы