1

Я создал пользовательский Aggregator[] для строк.Примените собственный счетчик спайков на нескольких столбцах (Spark 2.0)

Я хотел бы применить его ко всем столбцам DataFrame, где все столбцы являются строками, но номер столбца произвольный.

Я застрял в написании правильного выражения. Я хотел бы написать примерно так:

df.agg(df.columns.map(c => myagg(df(c))) : _*) 

, что явно неверно, учитывая различные интерфейсы.

Я посмотрел код RelationalGroupedDataset.agg(expr: Column, exprs: Column*), но я не знаком с манипуляциями с выражениями.

Любая идея?

+3

Просьба указать код агрегатора. и объясните, что вы пытаетесь сделать. –

+0

@AssafMendelson, на самом деле мы планируем иметь множество настраиваемых агрегаторов для разных характеристик по различным типам данных. Я начинаю с малого с агрегатором, чтобы получить самые короткие и длинные строки: class ShortestLongestAggregator() расширяет Aggregator [String, (String, String), (String, String)]. На данный момент я хочу иметь все (самые короткие, длинные) пары для всех столбцов произвольного фрейма данных (учитывая, что он имеет только строковые столбцы). – mathieu

ответ

5

В отличие от UserDefinedAggregateFunctions, которые работают с отдельными полями (столбцами), Aggregtors ожидает полного/значение.

Если вы хотите, и Aggregator, который может использоваться как в вашем фрагменте, он должен быть параметризован именем столбца и использовать как тип значения.

import org.apache.spark.sql.expressions.Aggregator 
import org.apache.spark.sql.{Encoder, Encoders, Row} 

case class Max(col: String) 
    extends Aggregator[Row, Int, Int] with Serializable { 

    def zero = Int.MinValue 
    def reduce(acc: Int, x: Row) = 
    Math.max(acc, Option(x.getAs[Int](col)).getOrElse(zero)) 

    def merge(acc1: Int, acc2: Int) = Math.max(acc1, acc2) 
    def finish(acc: Int) = acc 

    def bufferEncoder: Encoder[Int] = Encoders.scalaInt 
    def outputEncoder: Encoder[Int] = Encoders.scalaInt 
} 

Пример использования:

val df = Seq((1, None, 3), (4, Some(5), -6)).toDF("x", "y", "z") 

@transient val exprs = df.columns.map(c => Max(c).toColumn.alias(s"max($c)")) 

df.agg(exprs.head, exprs.tail: _*) 
+------+------+------+ 
|max(x)|max(y)|max(z)| 
+------+------+------+ 
|  4|  5|  3| 
+------+------+------+ 

Aggregators Возможно сделать гораздо больше смысла, когда в сочетании с статически типизированных Datasets, чем Dataset<Row>.

В зависимости от ваших требований вы также можете объединить несколько столбцов за один проход с помощью аккумулятора Seq[_] и обработать целое (запись) в одном вызове merge.

Смежные вопросы