Вы можете просто извлечь столбец группы и использовать его как группу для агрегирования. Если предположить, что данные следует шаблон в примере:
с сырым SQL:
case class Record(Col1: String, Col2: Int)
val df = sqlContext.createDataFrame(Seq(
Record("A1", -5),
Record("B1", -20),
Record("C1", 7),
Record("A2", 3),
Record("B2", -4),
Record("C2", 17)))
df.registerTempTable("df")
sqlContext.sql(
"""SELECT col3, sum(col2) AS col4 FROM (
SELECT col2, SUBSTR(Col1, 1, 1) AS col3 FROM df
) tmp GROUP BY col3""").show
+----+----+
|col3|col4|
+----+----+
| A| -2|
| B| -24|
| C| 24|
+----+----+
С Scala API:
import org.apache.spark.sql.functions.{udf, sum}
val getGroup = udf((s: String) => s.substring(0, 1))
df
.select(getGroup($"col1").alias("col3"), $"col2")
.groupBy($"col3")
.agg(sum($"col2").alias("col4"))
+----+----+
|col3|col4|
+----+----+
| A| -2|
| B| -24|
| C| 24|
+----+----+
Если группа шаблон является более сложным, вы можете просто настроить SUBSTR
или getGroup
функцию , Например:
val getGroup = {
val pattern = "^[A-Z]+".r
udf((s: String) => pattern.findFirstIn(s) match {
case Some(g) => g
case None => "Unknown"
})
}
Edit:
Если вы хотите, чтобы игнорировать некоторые группы просто добавить WHERE
пункт. С сырой SQL это простой, но с Scala API требует некоторых усилий:
import org.apache.spark.sql.functions.{not, lit}
df
.select(...) // As before
.where(not($"col3".in(lit("A"))))
.groupBy(...).agg(...) // As before
Если вы хотите сбросить несколько столбцов, которые можно использовать переменные аргументы:
val toDiscard = List("A", "B").map(lit(_))
df
.select(...)
.where(not($"col3".in(toDiscard: _*)))
.groupBy(...).agg(...) // As before
Может ли мой обратный UDF None?
Он не может, но он может вернуться null
:
val getGroup2 = udf((s: String) => s.substring(0, 1) match {
case x if x != "A" => x
case _ => null: String
})
df
.select(getGroup2($"col1").alias("col3"), $"col2")
.where($"col3".isNotNull)
.groupBy(...).agg(...) // As before
+----+----+
|col3|col4|
+----+----+
| B| -24|
| C| 24|
+----+----+
Великий ответ - и я многому научился от него, но я должен был бы сделать мой вопрос более конкретно. Я отредактировал вопрос с более подробной информацией и еще одним небольшим вопросом, если вы хотите иметь другой взгляд. –
Я потянул данные из паркета fileso. Я не определяю классы Case для заполнения моего DataFrame. Как я могу получить доступ к значениям Col1 в этом случае? , например. 'df.select (getGroup ($" col1 ")' - Что должно быть передано getGroup в этом случае? –
Не имеет значения, что является источником.Вы можете обращаться к столбцам точно так же: 'col (" some_column ")'/'$" some_column "'/'some_df (" some_column ")' – zero323