2016-12-29 5 views
2

Я новый Спарк, и я пытаюсь разработать сценарий питона, который читает CSV-файл с некоторыми журналами:Проверьте журналы с искрой

userId,timestamp,ip,event 
13,2016-12-29 16:53:44,86.20.90.121,login 
43,2016-12-29 16:53:44,106.9.38.79,login 
66,2016-12-29 16:53:44,204.102.78.108,logoff 
101,2016-12-29 16:53:44,14.139.102.226,login 
91,2016-12-29 16:53:44,23.195.2.174,logoff 

и проверяет, если пользователь имел какое-то странные модели поведения, для например, если он сделал два последовательных «входа», не делая «logoff». Я загрузил csv как Spark dataFrame, и мне захотелось сравнить строки журнала одного пользователя, упорядоченные по метке времени, и проверить, имеют ли два последовательных события одного типа (login - login, logff - logff). Я ищу для этого в режиме «map-reduce», но на данный момент я не могу понять, как использовать функцию уменьшения, которая сравнивает последовательные строки. Код, который я написал, работает, но производительность очень плохая.

sc = SparkContext("local","Data Check") 
sqlContext = SQLContext(sc) 

LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*" 
RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv" 
N_USERS = 10*1000 

dataFrame = sqlContext.read.format("com.databricks.spark.csv").load(LOG_FILE_PATH) 
dataFrame = dataFrame.selectExpr("C0 as userID","C1 as timestamp","C2 as ip","C3 as event") 

wrongUsers = [] 

for i in range(0,N_USERS): 

    userDataFrame = dataFrame.where(dataFrame['userId'] == i) 
    userDataFrame = userDataFrame.sort('timestamp') 

    prevEvent = '' 

    for row in userDataFrame.rdd.collect(): 

     currEvent = row[3] 
     if(prevEvent == currEvent): 
      wrongUsers.append(row[0]) 

     prevEvent = currEvent 

badUsers = sqlContext.createDataFrame(wrongUsers) 
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH) 

ответ

1

Первый (не связанные, но все же), убедитесь, что число записей для каждого пользователя не так уж велик, потому что collect в for row in userDataFrame.rdd.collect(): опасно.

Во-вторых, вам не нужно оставлять область DataFrame здесь, чтобы использовать классический Python, просто придерживайтесь Spark.

Теперь, ваша проблема. Это в основном «для каждой строки, которую я хочу что-то знать из предыдущей строки»: это относится к концепции функций Window и, если быть точным, к функции lag. Вот две интересные статьи о функциях Window в Spark: одна из Databricks с кодом в Python и одна из Xinh с (я думаю, это проще понять) примерами в Scala.

У меня есть решение в Scala, но я думаю, что вы будете тянуть его перевод его в Python:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.lag 

import sqlContext.implicits._ 

val LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*" 
val RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv" 

val data = sqlContext 
    .read 
    .format("com.databricks.spark.csv") 
    .option("inferSchema", "true") 
    .option("header", "true") // use the header from your csv 
    .load(LOG_FILE_PATH) 

val wSpec = Window.partitionBy("userId").orderBy("timestamp") 

val badUsers = data 
    .withColumn("previousEvent", lag($"event", 1).over(wSpec)) 
    .filter($"previousEvent" === $"event") 
    .select("userId") 
    .distinct 

badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH) 

В основном вы просто получить значение из предыдущей строки и сравните его со значением на ваш текущая строка, если это совпадение, это неправильное поведение, и вы сохраняете userId. Для первой строки в вашем «блоке» строк для каждого userId предыдущее значение будет null: при сравнении с текущим значением булево выражение будет false, поэтому проблем здесь нет.

+0

Большое спасибо, это именно то решение, которое я искал. Можно также передать функцию методу фильтрации? Потому что мне также нужно кое-что сделать с ip –

+0

Да, конечно, вы можете добавить что-то в фильтр. Просто помните, что если вы не можете выразить то, что хотите, чтобы ваша функция выполнялась с помощью простых сравнений или с помощью [функций] Spark's Column (http://spark.apache.org/docs/latest/api/python/pyspark.sql .html? highlight = functions # module-pyspark.sql.functions), вам нужно будет использовать 'UDF' s, см. это [страница] (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content /spark-sql-udfs.html) для получения дополнительной информации. –