2015-04-09 3 views
0

Я хочу рассчитать разницу во времени между событиями сеанса с использованием Scala.Как рассчитать разницу между двумя записями с помощью Scala?

- ДАЛО Источника является CSV-файл, как показано ниже:

HEADER 
"session","events","timestamp","Records" 
DATA 
"session_1","event_1","2015-01-01 10:10:00",100 
"session_1","event_2","2015-01-01 11:00:00",500 
"session_1","event_3","2015-01-01 11:30:00",300 
"session_1","event_4","2015-01-01 11:45:00",300 
"session_2","event_1","2015-01-01 10:10:00",100 
"session_2","event_2","2015-01-01 11:00:00",500 

РЕГЛАМЕНТИРУЕТСЯ ВЫХОД

HEADER 
"session","events","time_spent_in_minutes","total_records" 
DATA 
"session_1","event_1","50",100 
"session_1","event_2","30",600 
"session_1","event_3","15",900 
"session_1","event_4","0",1200 
"session_2","event_1","50",100 
"session_2","event_2","0",600 

Где time_spend_in_minutes разница между current_event и следующим событием для данной сессии , Заголовок не требуется для цели, но это хорошо.

Я новичок в Scala, так вот, что я до сих пор:

$ cat test.csv 
"session_1","event_1","2015-01-01 10:10:00",100 
"session_1","event_2","2015-01-01 11:00:00",500 
"session_1","event_3","2015-01-01 11:30:00",300 
"session_1","event_4","2015-01-01 11:45:00",300 
"session_2","event_1","2015-01-01 10:10:00",100 
"session_2","event_2","2015-01-01 11:00:00",500 


scala> val sessionFile = sc.textFile("test.csv"). 
map(_.split(',')). 
map(e => (e(1).trim, Sessions(e(0).trim,e(1).trim,e(2).trim,e(3).trim.toInt))). 
foreach(println) 

("event_1",Sessions("session_2","event_1","2015-01-01 10:10:00",100)) 
("event_1",Sessions("session_1","event_1","2015-01-01 10:10:00",100)) 
("event_2",Sessions("session_2","event_2","2015-01-01 11:00:00",500)) 
("event_2",Sessions("session_1","event_2","2015-01-01 11:00:00",500)) 
("event_3",Sessions("session_1","event_3","2015-01-01 11:30:00",300)) 
("event_4",Sessions("session_1","event_4","2015-01-01 11:45:00",300)) 
sessionFile: Unit =() 

scala> 
+1

Редактировать этот пост. – gsamaras

ответ

3

Вот решение, которое использует библиотеку времени Joda.

val input = 
""""session_1","event_1","2015-01-01 10:10:00",100 
    "session_1","event_2","2015-01-01 11:00:00",500 
    "session_1","event_3","2015-01-01 11:30:00",300 
    "session_1","event_4","2015-01-01 11:45:00",300 
    "session_2","event_1","2015-01-01 10:10:00",100 
    "session_2","event_2","2015-01-01 11:00:00",500""" 

Создание RDD от ввода текста, могут быть считаны из файла с помощью sc.textFile

import org.joda.time.format._ 
import org.joda.time._ 

def strToTime(s: String):Long = { 
    DateTimeFormat.forPattern(""""yyyy-MM-dd HH:mm:ss"""") 
        .parseDateTime(s).getMillis()/1000 
} 

val r1 = sc.parallelize(input.split("\n")) 
      .map(_.split(",")) 
      .map(x => (x(0), (x(1), x(2), x(3)))) 
      .groupBy(_._1) 
      .map(_._2.map{ case(s, (e, timestr, r)) => 
           (s, (e, strToTime(timestr), r))} 
        .toArray 
        .sortBy(z => z match { 
         case (session, (event, time, records)) => time})) 

конвертирована из «2015-01-01 10:10:00» в секундах от эпохи, и сортируются по время.

val r2 = r1.map(x => x :+ { val y = x.last; 
          y match { 
          case (session, (event, time, records)) => 
           (session, (event, time, "0")) }}) 

Добавлен дополнительный случай в каждой сессии со всеми параметрами, такими как последнее событие сеанса, кроме количества записей. Это позволяет вычислять время-продолжительность для «0» в последнем случае.

Используйте sliding, чтобы получить пару событий.

val r3 = r2.map(x => x.sliding(2).toArray) 

val r4 = r3.map(x => x.map{ 
     case Array((s1, (e1, t1, c1)), (s2, (e2, t2, c2))) => 
        (s1, (e1, (t2 - t1)/60, c1)) }) 

Использование scan для добавления записей подсчета в дополнительном пути.

val r5 = r4.map(x => x.zip(x.map{ case (s, (e, t, r)) => r.toInt} 
          .scan(0)(_+_) 
          .drop(1))) 

val r6 = r5.map(x => x.map{ case ((s, (e, t, r)), recordstillnow) => 
          s"${s},${e},${t},${recordstillnow}" }) 

val r7 = r6.flatMap(x => x) 

r7.collect.mkString("\n") 
//"session_2","event_1",50,100 
//"session_2","event_2",0,600 
//"session_1","event_1",50,100 
//"session_1","event_2",30,600 
//"session_1","event_3",15,900 
//"session_1","event_4",0,1200 
+0

Возможно, стоит упомянуть, что из-за 'groupBy' этот подход не может масштабироваться до произвольных длительных сеансов. Наверное, это хорошо на практике и приводит к хорошему решению с «скользящим»! –

0

попробовать что-то вроде этого:

import org.joda.time.format._ 
import org.joda.time._ 
val d1 = DateTime.parse("2015-03-03", DateTimeFormat.forPattern("yyyy-MM-dd")) 
val d2 = DateTime.parse("2015-03-04", DateTimeFormat.forPattern("yyyy-MM-dd")) 
d1.getMillis() - d2.getMillis() 
Смежные вопросы