Я новый Спарк, и я пытаюсь разработать сценарий питона, который читает CSV-файл с некоторыми журналами:Проверьте журналы с искрой
userId,timestamp,ip,event
13,2016-12-29 16:53:44,86.20.90.121,login
43,2016-12-29 16:53:44,106.9.38.79,login
66,2016-12-29 16:53:44,204.102.78.108,logoff
101,2016-12-29 16:53:44,14.139.102.226,login
91,2016-12-29 16:53:44,23.195.2.174,logoff
и проверяет, если пользователь имел какое-то странные модели поведения, для например, если он сделал два последовательных «входа», не делая «logoff». Я загрузил csv как Spark dataFrame, и мне захотелось сравнить строки журнала одного пользователя, упорядоченные по метке времени, и проверить, имеют ли два последовательных события одного типа (login - login, logff - logff). Я ищу для этого в режиме «map-reduce», но на данный момент я не могу понять, как использовать функцию уменьшения, которая сравнивает последовательные строки. Код, который я написал, работает, но производительность очень плохая.
sc = SparkContext("local","Data Check")
sqlContext = SQLContext(sc)
LOG_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/flume/events/*"
RESULTS_FILE_PATH = "hdfs://quickstart.cloudera:8020/user/cloudera/spark/script_results/prova/bad_users.csv"
N_USERS = 10*1000
dataFrame = sqlContext.read.format("com.databricks.spark.csv").load(LOG_FILE_PATH)
dataFrame = dataFrame.selectExpr("C0 as userID","C1 as timestamp","C2 as ip","C3 as event")
wrongUsers = []
for i in range(0,N_USERS):
userDataFrame = dataFrame.where(dataFrame['userId'] == i)
userDataFrame = userDataFrame.sort('timestamp')
prevEvent = ''
for row in userDataFrame.rdd.collect():
currEvent = row[3]
if(prevEvent == currEvent):
wrongUsers.append(row[0])
prevEvent = currEvent
badUsers = sqlContext.createDataFrame(wrongUsers)
badUsers.write.format("com.databricks.spark.csv").save(RESULTS_FILE_PATH)
Большое спасибо, это именно то решение, которое я искал. Можно также передать функцию методу фильтрации? Потому что мне также нужно кое-что сделать с ip –
Да, конечно, вы можете добавить что-то в фильтр. Просто помните, что если вы не можете выразить то, что хотите, чтобы ваша функция выполнялась с помощью простых сравнений или с помощью [функций] Spark's Column (http://spark.apache.org/docs/latest/api/python/pyspark.sql .html? highlight = functions # module-pyspark.sql.functions), вам нужно будет использовать 'UDF' s, см. это [страница] (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content /spark-sql-udfs.html) для получения дополнительной информации. –