2015-02-18 2 views
0

Я хочу рассчитать разницу во времени между входом и выходом для каждого идентификатора. Данные в формате:Распаковать список, развернуть и рассчитать разницу

String,Long,String,List[String] 
====================================== 
in, time0, door1, [id1, id2, id3, id4] 
out, time1, door1, [id1, id2, id3] 
out, time2, door1, [id4, id5] 

В конце концов он должен закончить с пар ключ-значение, как:

{(id1, #time1-time0), (id2, #time1-time0), (id3, #time1-time0), (id4, #time2-time0), (id5, N/A)} 

Что бы хороший подход для решения этой проблемы?

EDIT: Я пробовал следующее.

case class Data(direction: String, time:Long, door:String, ids: List[String]) 
val data = sc.parallelize(Seq(Data("in", 5, "d1", List("id1", "id2", "id3", "id4")),Data("out", 20, "d1", List("id1", "id2", "id3")), Data("out",50, "d1", List("id4", "id5")))) 
data.flatMap(x => (x.ids, x)) 
+0

Я вижу тег apache-spark. Вы ищете способ сделать это с помощью Spark? Сколько у вас данных? –

+1

Я хочу увидеть какой-то код .... что ты пробовал ...? –

+0

Да, я хочу сделать это с помощью Spark. У меня нет данных. Я просто хочу изучить Spark, но решение должно работать с миллионами строк. – user684322

ответ

0
scala> case class Data(direction: String, time: Long, door: String, ids: List[ String ]) 
defined class Data 

scala> val data = sc.parallelize(Seq(Data("in", 5, "d1", List("id1", "id2", "id3", "id4")), Data("out", 20, "d1", List("id1", "id2", "id3")), Data("out",50, "d1", List("id4", "id5")))) 
data: org.apache.spark.rdd.RDD[Data] = ParallelCollectionRDD[0] at parallelize at <console>:14 

// Get an RDD entry for each (id, data) pair 
scala> data.flatMap(x => x.ids.map(id => (id, x))) 
res0: org.apache.spark.rdd.RDD[(String, Data)] = FlatMappedRDD[1] at flatMap at <console>:17 

// group by id to get data's with same id's 
scala> res0.groupBy({ case (id, data) => id }) 
res1: org.apache.spark.rdd.RDD[(String, Iterable[(String, Data)])] = ShuffledRDD[3] at groupBy at <console>:19 

// convert Iterable[(String, Data)] to List[Data] 
scala> res1.map({ case (id, iter) => (id, iter.toList.map({ case (i, d) => d })) }) 
res2: org.apache.spark.rdd.RDD[(String, List[Data])] = MappedRDD[4] at map at <console>:21 

// sort list of data's by data.time 
res2.map({ case (id, list) => (id, list.sortBy(d => d.time)) }) 
res3: org.apache.spark.rdd.RDD[(String, List[Data])] = MappedRDD[5] at map at <console>:23 

// get the time diff by doing lastData.time - firstData.time for each id 
scala> :paste 
// Entering paste mode (ctrl-D to finish) 

res3.map({ case (id, list) => { 
    list match { 
     case d :: Nil => (id, None) 
     case d :: tail => (id, Some(list.last.time - d.time)) 
     case _ => (id, None) 
    } 
} }) 

// Exiting paste mode, now interpreting. 

res6: org.apache.spark.rdd.RDD[(String, Option[Long])] = MappedRDD[7] at map at <console>:25 

Теперь res6 имеет свои необходимые данные.

Также ... Я не был уверен, как вы хотели использовать direction, поэтому я не использовал его, изменил часть кода, чтобы получить то, что вы хотите (я думаю, что только последнее res3 вещь нужно немного изменить) или вы можете объяснить это здесь, и, возможно, я дам вам ответ. Если у вас есть другие сомнения ... спросите прочь.

Это также может быть достигнуто более сжатым способом ... но это будет трудно понять. Вот почему я предоставил подробный и простой код.

+0

Это работает. Большое спасибо. Я постараюсь это полностью понять. Идея с направлением заключалась в том, что время должно быть вычтено с соответствующим временем. – user684322

Смежные вопросы