Мне нужно разработать скрипт Spark с python, который проверяет некоторые журналы и проверяет, изменил ли пользователь страну своего IP-адреса между двумя событиями. У меня есть файл CSV с IP-диапазонами и ассоциированные страны сохраняются на HDFS, как это:Функция Spark Udf с Dataframe во вводе
startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany
И журнал CSV файл:
userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login
загружает оба файл с искровым Dataframe, и я уже изменен тот, который содержит журналы с функцией задержки, добавляя столбец с предыдущимIp. Я решил заменить ip и previousIp на ассоциированную страну, чтобы сравнить их и использовать файл dataFrame.filter ("previousIp"! = "Ip"). Мой вопрос в том, есть ли способ сделать это в Spark? Что-то вроде:
dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)
Для того, чтобы иметь Dataframe так:
userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy
Если нет, то как я могу решить мою проблему? Спасибо