Возможно ли обновить столбец данных hiveContext в pyspark, используя сложную функцию, не выполняемую в UDF?Обновление столбца dataframe pyspark со сложной функцией
У меня есть dataframe, содержащий много столбцов, из которых 2 столбца называются меткой времени и данными. Мне нужно получить временную метку из строки JSON в данных и обновить столбец timestamp, если метка времени в данных соответствует определенным критериям. Я знаю, что эти фреймы являются неизменяемыми, но возможно каким-то образом создать новый фреймворк, сохраняющий все столбцы старого фрейма данных, но обновляющий столбец timstamp?
код, иллюстрирующий то, что я хотел бы сделать:
def updateTime(row):
import json
THRESHOLD_TIME = 60 * 30
client_timestamp = json.loads(row['data'])
client_timestamp = float(client_timestamp['timestamp'])
server_timestamp = float(row['timestamp'])
if server_timestamp - client_timestamp <= THRESHOLD_TIME:
new_row = ..... # copy contents of row
new_row['timestamp'] = client_timestamp
return new_row
else:
return row
df = df.map(updateTime)
Я думал отображения содержимого строки в кортеж, а затем преобразовать его обратно в dataframe с .toDF(), но я не могу найти способ скопировать содержимое строки в кортеж, а затем вернуть имена столбцов.
А если вы используете 'UDF'? –
Возможно, эта статья может помочь: http://www.sparktutorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5 –
Жаль, что я имел в виду UDF вместо HDF ... typo .. . – SK2