2015-01-22 3 views
0

Я новичок в Spark имел вопрос о длительности вычисления. У меня есть журнал сервера с IP-адресом и временем соединения с сервером (поле времени даты). Я пытаюсь рассчитать продолжительность между временем соединения для каждой записи в журнале. Я могу отфильтровать набор и форматировать все нужные мне данные, но я не знаю, как сравнивать значения между двумя разными строками для данного IP-адреса.Журнал сервера - расчет продолжительности с Spark

Вот как выглядит мой комплект.

IP Activity 
235.325.23.22, 2014-09-01 03:31 
235.325.23.22, 2014-09-01 03:39 
235.325.23.22, 2014-09-01 03:43 
235.325.23.22, 2014-09-01 03:46 
235.325.23.22, 2014-09-01 03:55 
235.423.25.44, 2014-09-01 17:21 
235.423.25.44, 2014-09-01 17:30 
235.423.25.44, 2014-09-01 17:34 
235.423.25.44, 2014-09-01 17:42 
235.423.25.44, 2014-09-01 17:51 

Я хотел бы получить следующий результат:

235.325.23.22, 2014-09-01 03:31am,0 base--start of the 235.325.23.22 IP set 
235.325.23.22, 2014-09-01 03:39am,8 minutes 
235.325.23.22, 2014-09-01 03:43am,4 minutes 
235.325.23.22, 2014-09-01 03:46am,3 minutes 
235.325.23.22, 2014-09-01 03:55am,9 minutes 
235.423.25.44, 2014-09-01 17:21pm,0 base-- start of the new 235.423.25.44 IP set 
235.423.25.44, 2014-09-01 17:30pm,9 minutes 
235.423.25.44, 2014-09-01 17:34pm,4 minutes 
235.423.25.44, 2014-09-01 17:42pm,8 minutes 
235.423.25.44, 2014-09-01 17:51pm,9 minutes 

Любая помощь очень ценится.

ответ

1

Это не полный ответ, но направление:

давайте называть свой первоначальный набор RDD, таким образом, что это рдд ключом на IP с даты и времени строкой в ​​качестве значения.

Используйте val sortedRDD = rdd.repartitionAndSortWithinPartitions (новый HashPartitioner (numPartitions)), чтобы создать отсортированный раздел на один ключ (выберите numPartitions намного больше, чем ваши ключи). Затем используйте mapPartitions на sortedRDD - вы получите отсортированный итератор, так что вам просто нужно отслеживать предыдущее значение и вычитать по мере того, как вы идете. В очень ленивую попытке что-то вроде

sortedRDD.mapPartitions(iter=>{ var prev=""; iter.map{i=>val t =(i._1,i._2,if (prev=="") 0 else prev);prev=i._2;t}}) 

(я не делал разницы и не разобрать даты, но, надеюсь, это поможет вам с идеей)