2016-10-31 2 views
4

Я новичок, чтобы искриться scala, и прошу прощения за глупый вопрос (если есть). Я застрял в проблеме, которую я упростил, как показано ниже:Spark scala - как сделать счет() путем кондиционирования на две строки

Существует кадр данных с тремя столбцами, «машинный идентификатор» - это идентификатор машины. «startTime» - это отметка времени начала задачи. «endTime» - это отметка времени окончания задачи.

Моя цель - подсчитать количество интервалов простоя каждой машины.
Например,
в таблице ниже, первая и вторая строки показывают машину №1, начатую в момент времени 0 и заканчивающуюся в момент времени 3, и начинаются снова в момент времени 4, поэтому временной интервал [3, 4] простаивает. Для 3-го и 4-го рядов машина №1 запускается в момент времени 10 и заканчивается в 20-й момент времени и запускается снова сразу, поэтому нет времени простоя.

machineID, startTime, endTime 
1, 0, 3 
1, 4, 8 
1, 10, 20 
1, 20, 31 
... 
1, 412, 578 
... 
2, 231, 311 
2, 781, 790 
... 

Рамка данных уже была groupBy ("machineID").
Я использую искру 2.0.1 и scala 2.11.8

ответ

4

Для доступа к предыдущим/следующим строкам в DataFrame мы можем использовать функции Window. В этом случае мы будем использовать lag для доступа к предыдущему времени окончания, сгруппированному по machineId.

import org.apache.spark.sql.expressions.Window 

// Dataframe Schema 
case class MachineData(id:String, start:Int, end:Int) 
// Sample Data 
machineDF.show 
+---+-----+---+ 
| id|start|end| 
+---+-----+---+ 
| 1| 0| 3| 
| 1| 4| 8| 
| 1| 10| 20| 
| 1| 20| 31| 
| 1| 412|578| 
| 2| 231|311| 
| 2| 781|790| 
+---+-----+---+ 


// define the window as a partition over machineId, ordered by start (time) 
val byMachine = Window.partitionBy($"id").orderBy($"start") 
// we define a new column, "previous end" using the Lag Window function over the previously defined window 
val prevEnd = lag($"end", 1).over(byMachine) 

// new DF with the prevEnd column 
val withPrevEnd = machineDF.withColumn("prevEnd", prevEnd) 
withPrevEnd.show 

+---+-----+---+-------+ 
| id|start|end|prevEnd| 
+---+-----+---+-------+ 
| 1| 0| 3| null| 
| 1| 4| 8|  3| 
| 1| 10| 20|  8| 
| 1| 20| 31|  20| 
| 1| 412|578|  31| 
| 2| 231|311| null| 
| 2| 781|790| 311| 
+---+-----+---+-------+ 

// we're calculating the idle intervals as the numerical diff as an example 
val idleIntervals = withPrevEnd.withColumn("diff", $"start"-$"prevEnd") 
idleIntervals.show 

+---+-----+---+-------+----+ 
| id|start|end|prevEnd|diff| 
+---+-----+---+-------+----+ 
| 1| 0| 3| null|null| 
| 1| 4| 8|  3| 1| 
| 1| 10| 20|  8| 2| 
| 1| 20| 31|  20| 0| 
| 1| 412|578|  31| 381| 
| 2| 231|311| null|null| 
| 2| 781|790| 311| 470| 
+---+-----+---+-------+----+ 

// to calculate the total, we are summing over the differences. Adapt this as your business logic requires. 
val totalIdleIntervals = idleIntervals.select($"id",$"diff").groupBy($"id").agg(sum("diff")) 

+---+---------+ 
| id|sum(diff)| 
+---+---------+ 
| 1|  384| 
| 2|  470| 
+---+---------+ 
+1

Я узнал что-то новое сегодня, функция окна .. +1 – Shankar

+0

Уточненный лаг() сегодня. Объяснение очень понятно и полезно. Спасибо, маас! –

Смежные вопросы