2016-05-10 3 views
0

Возможно ли обновить столбец данных hiveContext в pyspark, используя сложную функцию, не выполняемую в UDF?Обновление столбца dataframe pyspark со сложной функцией

У меня есть dataframe, содержащий много столбцов, из которых 2 столбца называются меткой времени и данными. Мне нужно получить временную метку из строки JSON в данных и обновить столбец timestamp, если метка времени в данных соответствует определенным критериям. Я знаю, что эти фреймы являются неизменяемыми, но возможно каким-то образом создать новый фреймворк, сохраняющий все столбцы старого фрейма данных, но обновляющий столбец timstamp?

код, иллюстрирующий то, что я хотел бы сделать:

def updateTime(row): 
    import json 

    THRESHOLD_TIME = 60 * 30 
    client_timestamp = json.loads(row['data']) 
    client_timestamp = float(client_timestamp['timestamp']) 
    server_timestamp = float(row['timestamp']) 
    if server_timestamp - client_timestamp <= THRESHOLD_TIME: 
     new_row = ..... # copy contents of row 
     new_row['timestamp'] = client_timestamp 
     return new_row 
    else: 
     return row 

df = df.map(updateTime) 

Я думал отображения содержимого строки в кортеж, а затем преобразовать его обратно в dataframe с .toDF(), но я не могу найти способ скопировать содержимое строки в кортеж, а затем вернуть имена столбцов.

+0

А если вы используете 'UDF'? –

+0

Возможно, эта статья может помочь: http://www.sparktutorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5 –

+0

Жаль, что я имел в виду UDF вместо HDF ... typo .. . – SK2

ответ

0

Если адаптировать updateTime функцию, чтобы получать Timestamp в качестве параметра и возвращает новый обработанный Timestamp, вы можете создать UDF и использовать его непосредственно на колонке DataFrame в:

from pyspark.sql.functions import * 
from pyspark.sql.types import TimestampType 

myUDF = udf(updateTime, TimestampType()) 
df = df.withColumn("timestamp", myUDF(col("timestamp")) 

Однако в вашем случае я думаю, что это немного сложнее:

from pyspark.sql.functions import * 
from pyspark.sql.types import TimestampType 

myUDF = udf(getClientTime, TimestampType()) 
client_timestamp = myUDF(col("data")) 
server_timestamp = col("timestamp") 
condition = server_timestamp.cast("float") - client_timestamp.cast("float") <= THRESHOLD_TIME  

newCol = when(condition, client_timestamp).otherwise(server_timestamp) 
newDF = df.withColumn("new_timestamp", newCol) 

с помощью этого второго подхода, функция getClientTime принимает значение из столбца data и возвращает клиенту метку времени для этого значения. Затем вы можете использовать его для создания нового столбца (client_timestamp), который содержит эту информацию. Наконец, вы можете использовать when для создания нового столбца условно, исходя из значений столбца server_timestamp и вновь созданного столбца client_timestamp.

Ссылка:

+1

Спасибо! Метод работает с некоторыми незначительными изменениями. Похоже, я не понимал, как UDF работали ранее. Отредактировал myUDF, чтобы вернуть StringType() вместо него и использовать df ['column'] вместо col ('column') – SK2

Смежные вопросы