2016-06-04 2 views
10

Мой набор данных выглядит следующим образом:Как группа по временному интервалу в Спарк SQL

KEY |Event_Type | metric | Time 
001 |event1  | 10  | 2016-05-01 10:50:51 
002 |event2  | 100 | 2016-05-01 10:50:53 
001 |event3  | 20  | 2016-05-01 10:50:55 
001 |event1  | 15  | 2016-05-01 10:51:50 
003 |event1  | 13  | 2016-05-01 10:55:30 
001 |event2  | 12  | 2016-05-01 10:57:00 
001 |event3  | 11  | 2016-05-01 11:00:01 

Я хочу, чтобы получить все, когда ключи, проверить это:

«SUM метрики для конкретного события ">порог в течение 5 минут.

Это представляется мне идеальным кандидатом на использование функций раздвижных окон .

Как это сделать с помощью Spark SQL?

спасибо.

ответ

13

Спарк> = 2,0

Вы ча использовать window (не быть ошибка с функциями окна). В зависимости от варианта он присваивает метку времени, чтобы еще один, потенциально перекрывающиеся ведра:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric") 

// +---+---------------------------------------------+-----------+ 
// |KEY|window          |sum(metric)| 
// +---+---------------------------------------------+-----------+ 
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45   | 
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12   | 
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13   | 
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11   | 
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100  | 
// +---+---------------------------------------------+-----------+ 

Спарк < 2,0

Начнем с примера данных:

import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0 

val df = Seq(
    ("001", "event1", 10, "2016-05-01 10:50:51"), 
    ("002", "event2", 100, "2016-05-01 10:50:53"), 
    ("001", "event3", 20, "2016-05-01 10:50:55"), 
    ("001", "event1", 15, "2016-05-01 10:51:50"), 
    ("003", "event1", 13, "2016-05-01 10:55:30"), 
    ("001", "event2", 12, "2016-05-01 10:57:00"), 
    ("001", "event3", 11, "2016-05-01 11:00:01") 
).toDF("KEY", "Event_Type", "metric", "Time") 

Я полагаю, что событие идентифицированный KEY. Если это не так, вы можете отрегулировать GROUP BY/PARTITION BY в соответствии с вашими требованиями.

Если вы заинтересованы в агрегации со статическим оконными независимо от данных преобразования временного метки и круглых цифры

import org.apache.spark.sql.functions.{round, sum} 

// cast string to timestamp 
val ts = $"Time".cast("timestamp").cast("long") 

// Round to 300 seconds interval 
val interval = (round(ts/300L) * 300.0).cast("timestamp").alias("interval") 

df.groupBy($"KEY", interval).sum("metric") 

// +---+---------------------+-----------+ 
// |KEY|interval    |sum(metric)| 
// +---+---------------------+-----------+ 
// |001|2016-05-01 11:00:00.0|11   | 
// |001|2016-05-01 10:55:00.0|12   | 
// |001|2016-05-01 10:50:00.0|45   | 
// |003|2016-05-01 10:55:00.0|13   | 
// |002|2016-05-01 10:50:00.0|100  | 
// +---+---------------------+-----------+ 

Если вы заинтересованы в окнах относительно текущей строки использования функций окна:

import org.apache.spark.sql.expressions.Window 

// Partition by KEY 
// Order by timestamp 
// Consider window of -150 seconds to + 150 seconds relative to the current row 
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150) 
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w)) 

// +---+----------+------+-------------------+----------+----------+ 
// |KEY|Event_Type|metric|Time    |ts  |window_sum| 
// +---+----------+------+-------------------+----------+----------+ 
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13  | 
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45  | 
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45  | 
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45  | 
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12  | 
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11  | 
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100  | 
// +---+----------+------+-------------------+----------+----------+ 

По соображениям производительности этот подход полезен, только если данные могут разбиваться на несколько отдельных групп. В Spark < 2.0.0 вам также понадобится HiveContext, чтобы он работал.

+0

Привет, я использую Java Как выполнять те же операции в java и spark 2.1.0 – sathiyarajan

+0

@sathiyarajan Должно быть почти то же самое, исключая незначительные различия в синтаксисе. – zero323

0

Для статической границы вы можете сделать следующее:

1) Transform (карта, mapPartitions и т.д.) значение времени для формирования YYYY-MM-DD-чч-мм, где мм свернут на уровне 5 минут. например 01, 02, 03, 05 становится 05; 16,17,18,19,20 становится 20

2) Выполните GroupBy или ReduceBy с event_type и временем и выполнить свою агрегацию (сумма) на метриках

3) Выполните преобразование фильтра для фильтрации метрики> 5

Вы можете написать выше в spark rdd или dataframe (sql) почти так же.

Для других типов границ, где 00-05, 01-06, 02-07, вы должны попытаться взглянуть в концепцию скользящее окно. Если использование проглатывание данных случай подходит потоковый шаблон затем Спарк Streaming API будет совершенно иначе вы можете найти собственное решение, как это: Apache Spark - Dealing with Sliding Windows on Temporal RDDs

Смежные вопросы