2016-03-21 2 views
0

У меня есть куча событий в искре (пользователь щелкает/действия/нажимает кнопку), характеризуемых note колонки:Sessionization с искрой

>>> df.show(20) 
+-------+-------------+------------+------+ 
| user| timestamp|  note|action| 
+-------+-------------+------------+------+ 
|2376466|1458580817381|event #1 ...|UPDATE| 
|2376466|1458580822034|event #1 ...|UPDATE| 
|2376466|1458580822112|event #2 ...|UPDATE| 
|2376466|1458580822166|event #2 ...|UPDATE| 
|2376466|1458580822216|event #2 ...|UPDATE| 
|2376466|1458580822225|event #2 ...|UPDATE| 
|2376466|1458580822651|event #1 ...|UPDATE| 
|2376466|1458580822660|event #1 ...|UPDATE| 
+-------+-------------+------------+------+ 

Я хотел бы знать, длительность «сеанс» от note. Например, событие №2 началось с 1458580822112 и закончилось на 1458580822225, поэтому продолжительность будет ..225 - ..112 = 113 мс. Есть ли какие-либо свечи или ярлыки для организации данных в «сеансах» или каким-то другим способом для извлечения такой информации? Или идея состоит в том, чтобы добавлять дополнительную информацию о состоянии в каждую строку и свертывать ее, когда столбец идентификатора сеанса готов ?

ПРИМЕЧАНИЕ.. несколько нот одного и того же типа должны рассматриваться как отдельные сессии

+0

Насколько я понимаю, ваши намерения возможны с помощью оконных функций, но они не являются ни хорошенькими, ни особенно эффективными. Тем не менее я уже опубликовал некоторые ответы на подобные вопросы. В общем, было бы проще реализовать что-то вроде этого w RDD (возможно, с наборами данных), но это довольно широкий вопрос. – zero323

+0

Не возражаете ли вы выкапывать подобный вопрос на поверхность? – Oleksiy

+0

Я буду искать завтра, если не забуду, но это всего лишь три основных шага: 1) определить переключение между «сеансами» (отставание); 2) добавить идентификатор сеанса (суммарная сумма по точкам переключения); 3) некоторые статистические данные – zero323

ответ

2

Вы можете использовать Спарк-SQL для того чтобы достигнуть своей цели Вот код, который работает для меня, который будет давать сеансы. Вы можете написать вспомогательную функцию, а затем зарегистрировать ее как UDF. Этот UDF может быть вызван в вашем операторе SQL.

df.registerTempTable("Events")  
import sqlContext.implicits._ 

# (You can modify this according to what exact value have in note column.) 

def process(colname: String):String = {  
    return colname.substring(0,8)  
} 

sqlContext.udf.register("process",process _)  
val dd = sqlContext.sql("select timestamp as timestamp, process(note) as note from Events") 

dd.registerTempTable("SubEvents") 

val dt = sqlContext.sql("select last(timestamp) - first(timestamp) as session, note as note from SubEvents group by note") 

dt.show()  
+--------+--------+  
|session| note|  
+--------+--------+  
|  5|event #1|  
|  2|event #2|  
|  1|event #3|  
+--------+--------+ 

Также в комплекте bluemix искра ноутбук можно рассматривать here: -

Спасибо,

Чарльза.

+0

пожалуйста, отформатируйте свой код, это невозможно прочитать! – eliasah

+0

Это полезно, но он будет охватывать все сеансы в одном и определять таймеры «min» и «max» для всех событий. Я хочу сохранить несколько последовательных событий как один «сеанс» – Oleksiy

+0

Итак, вы хотите группировать по пользователю? –

Смежные вопросы