2017-01-10 2 views
1

Мне нужно разработать скрипт Spark с python, который проверяет некоторые журналы и проверяет, изменил ли пользователь страну своего IP-адреса между двумя событиями. У меня есть файл CSV с IP-диапазонами и ассоциированные страны сохраняются на HDFS, как это:Функция Spark Udf с Dataframe во вводе

startIp, endIp, country 
0.0.0.0, 10.0.0.0, Italy 
10.0.0.1, 20.0.0.0, England 
20.0.0.1, 30.0.0.0, Germany 

И журнал CSV файл:

userId, timestamp, ip, event 
1, 02-01-17 20:45:18, 10.5.10.3, login 
24, 02-01-17 20:46:34, 54.23.16.56, login 

загружает оба файл с искровым Dataframe, и я уже изменен тот, который содержит журналы с функцией задержки, добавляя столбец с предыдущимIp. Я решил заменить ip и previousIp на ассоциированную страну, чтобы сравнить их и использовать файл dataFrame.filter ("previousIp"! = "Ip"). Мой вопрос в том, есть ли способ сделать это в Spark? Что-то вроде:

dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)

Для того, чтобы иметь Dataframe так:

userId, timestamp, ip, event, previousIp 
1, 02-01-17 20:45:18, England, login, Italy 

Если нет, то как я могу решить мою проблему? Спасибо

ответ

0

На самом деле это довольно просто, если вы сначала конвертируете IP-адрес в номер. Вы можете написать свой собственный UDF или использовать код из petrabarus и зарегистрировать функцию, как это:

spark.sql("CREATE TEMPORARY FUNCTION iptolong as 'net.petrabarus.hiveudfs.IPToLong'") 

Затем карту стран CSV в dataframe с номерами:

>>> ipdb = spark.read.csv('ipdb.csv', header=True).select(
      expr('iptolong(startIp)').alias('ip_from'), 
      expr('iptolong(endIp)').alias('ip_to'), 
      'country') 
>>> ipdb.show() 
+---------+---------+-------+ 
| ip_from| ip_to|country| 
+---------+---------+-------+ 
|  0|167772160| Italy| 
|167772161|335544320|England| 
|335544321|503316480|Germany| 
+---------+---------+-------+ 

Кроме того, карта вашего журнала dataframe номера:

>>> log = spark.createDataFrame([('15.0.0.1',)], ['ip']) \ 
      .withColumn('ip', expr('iptolong(ip)')) 
>>> log.show() 
+---------+ 
|  ip| 
+---------+ 
|251658241| 
+---------+ 

Затем вы можете присоединиться к этому dataframe используя between условие:

>>> log.join(broadcast(ipdb), log.ip.between(ipdb.ip_from, ipdb.ip_to)).show() 
+---------+---------+---------+-------+ 
|  ip| ip_from| ip_to|country| 
+---------+---------+---------+-------+ 
|251658241|167772161|335544320|England| 
+---------+---------+---------+-------+