2016-02-22 9 views
0

У меня есть следующая схема:Спарка вложенной агрегация JSON

root 
|-- Id: string (nullable = true) 
|-- Desc: string (nullable = true) 
|-- Measurements: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- time: string (nullable = true) 
| | |-- metric: string (nullable = true) 
| | |-- value: string (nullable = true) 

В моих анализах я хотел бы сохранить вложенную структуру, как есть, но хотелось бы, чтобы добавить столбцы в DataFrame, которые содержат количество элементов в Measurements, значение min/max/avg для некоторых столбцов, в частности value для определенных значений metric, например 'temperature'.

В SQLContext я могу просто использовать sqlContext.sql("SELECT Id, SIZE(Measurements) AS num_entries FROM df", чтобы получить размер, но мне было интересно, есть ли элегантный способ (в Scala) делать то, что я пытаюсь сделать, то есть без создания новых DataFrames, которые должны быть присоединился обратно на основе Id?

ответ

2

Здесь нет универсального подхода. Простые метрики, такие как количество элементов в array, могут быть легко извлечены с использованием встроенных функций (size).

case class Measurement(temperature: Double, speed: Double) 

val df = sc.parallelize(Seq(
    (1L, Array(Measurement(0.5, 10.0), Measurement(6.2, 3.7))), 
    (2L, Array(Measurement(22.0, 5.0))) 
)).toDF("id", "measurements") 

df.select($"*", size($"measurements")).show 

// +---+--------------------+------------------+ 
// | id|  measurements|size(measurements)| 
// +---+--------------------+------------------+ 
// | 1|[[0.5,10.0], [6.2...|     2| 
// | 2|  [[22.0,5.0]]|     1| 
// +---+--------------------+------------------+ 

Более сложные вещи требуют либо exploding:

val expanded = df.withColumn("measurement",explode($"measurements")) 
val withStats = expanded 
.groupBy($"id") 
.agg(
    avg($"measurement.temperature").alias("avg_temp"), 
    avg($"measurement.speed").alias("avg_speed"), 
    first($"measurements")) // This assumes a single row per ID! 

withStats.show 
// +---+--------+---------+---------------------+ 
// | id|avg_temp|avg_speed|first(measurements)()| 
// +---+--------+---------+---------------------+ 
// | 1| 3.35|  6.85| [[0.5,10.0], [6.2...| 
// | 2| 22.0|  5.0|   [[22.0,5.0]]| 
// +---+--------+---------+---------------------+ 

или пользовательских функций (то, что вы хотите, чтобы избежать в PySpark):

def my_mean(c: String) = udf((xs: Seq[Row]) => 
    Try(xs.map(_.getAs[Double](c)).sum/xs.size).toOption 
) 

val withAvgTemp = df.withColumn(
    "avg_temperature", my_mean("temperature")($"measurements")) 

withAvgTemp.show 
// +---+--------------------+---------------+ 
// | id|  measurements|avg_temperature| 
// +---+--------------------+---------------+ 
// | 1|[[0.5,10.0], [6.2...|   3.35| 
// | 2|  [[22.0,5.0]]|   22.0| 
// +---+--------------------+---------------+ 

Вы также можете попробовать Спарк DataSets, но они до сих пор далеко не стабильный.

В целом вложенные структуры полезны главным образом для импорта (и, при необходимости, экспорта), в противном случае это объекты второго класса.

Примечание(Спарк < 1,5):

Если вы используете старую версию Спарк вы можете использовать некоторые из указанных выше с selectExpr (это потребует HiveContext):

df.selectExpr("id", "size(measurements) AS n") 
df.selectExpr("id", "explode(measurements) AS measurement") 
+0

Спасибо много! Моя проблема, похоже, ограничена версией: я застрял с 1.4.1, и функция размера доступна с 1.5.0 ... Bummer! – Ian

+0

Всегда есть 'selectExpr' :) – zero323

Смежные вопросы