Здесь нет универсального подхода. Простые метрики, такие как количество элементов в array
, могут быть легко извлечены с использованием встроенных функций (size
).
case class Measurement(temperature: Double, speed: Double)
val df = sc.parallelize(Seq(
(1L, Array(Measurement(0.5, 10.0), Measurement(6.2, 3.7))),
(2L, Array(Measurement(22.0, 5.0)))
)).toDF("id", "measurements")
df.select($"*", size($"measurements")).show
// +---+--------------------+------------------+
// | id| measurements|size(measurements)|
// +---+--------------------+------------------+
// | 1|[[0.5,10.0], [6.2...| 2|
// | 2| [[22.0,5.0]]| 1|
// +---+--------------------+------------------+
Более сложные вещи требуют либо exploding
:
val expanded = df.withColumn("measurement",explode($"measurements"))
val withStats = expanded
.groupBy($"id")
.agg(
avg($"measurement.temperature").alias("avg_temp"),
avg($"measurement.speed").alias("avg_speed"),
first($"measurements")) // This assumes a single row per ID!
withStats.show
// +---+--------+---------+---------------------+
// | id|avg_temp|avg_speed|first(measurements)()|
// +---+--------+---------+---------------------+
// | 1| 3.35| 6.85| [[0.5,10.0], [6.2...|
// | 2| 22.0| 5.0| [[22.0,5.0]]|
// +---+--------+---------+---------------------+
или пользовательских функций (то, что вы хотите, чтобы избежать в PySpark):
def my_mean(c: String) = udf((xs: Seq[Row]) =>
Try(xs.map(_.getAs[Double](c)).sum/xs.size).toOption
)
val withAvgTemp = df.withColumn(
"avg_temperature", my_mean("temperature")($"measurements"))
withAvgTemp.show
// +---+--------------------+---------------+
// | id| measurements|avg_temperature|
// +---+--------------------+---------------+
// | 1|[[0.5,10.0], [6.2...| 3.35|
// | 2| [[22.0,5.0]]| 22.0|
// +---+--------------------+---------------+
Вы также можете попробовать Спарк DataSets
, но они до сих пор далеко не стабильный.
В целом вложенные структуры полезны главным образом для импорта (и, при необходимости, экспорта), в противном случае это объекты второго класса.
Примечание(Спарк < 1,5):
Если вы используете старую версию Спарк вы можете использовать некоторые из указанных выше с selectExpr
(это потребует HiveContext
):
df.selectExpr("id", "size(measurements) AS n")
df.selectExpr("id", "explode(measurements) AS measurement")
Спасибо много! Моя проблема, похоже, ограничена версией: я застрял с 1.4.1, и функция размера доступна с 1.5.0 ... Bummer! – Ian
Всегда есть 'selectExpr' :) – zero323