В отличие от UserDefinedAggregateFunctions
, которые работают с отдельными полями (столбцами), Aggregtors
ожидает полного/значение.
Если вы хотите, и Aggregator
, который может использоваться как в вашем фрагменте, он должен быть параметризован именем столбца и использовать как тип значения.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, Row}
case class Max(col: String)
extends Aggregator[Row, Int, Int] with Serializable {
def zero = Int.MinValue
def reduce(acc: Int, x: Row) =
Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero))
def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2)
def finish(acc: Int) = acc
def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
Пример использования:
val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z")
@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)"))
df.agg(exprs.head, exprs.tail: _*)
+------+------+------+
|max(x)|max(y)|max(z)|
+------+------+------+
| 4| 5| 3|
+------+------+------+
Aggregators
Возможно сделать гораздо больше смысла, когда в сочетании с статически типизированных Datasets
, чем Dataset<Row>
.
В зависимости от ваших требований вы также можете объединить несколько столбцов за один проход с помощью аккумулятора Seq[_]
и обработать целое (запись) в одном вызове merge
.
Просьба указать код агрегатора. и объясните, что вы пытаетесь сделать. –
@AssafMendelson, на самом деле мы планируем иметь множество настраиваемых агрегаторов для разных характеристик по различным типам данных. Я начинаю с малого с агрегатором, чтобы получить самые короткие и длинные строки: class ShortestLongestAggregator() расширяет Aggregator [String, (String, String), (String, String)]. На данный момент я хочу иметь все (самые короткие, длинные) пары для всех столбцов произвольного фрейма данных (учитывая, что он имеет только строковые столбцы). – mathieu