2015-08-19 2 views
0

Мои данные хранятся в рамке Спарк данных в виде (примерно)Как агрегировать данные с использованием вычисленных групп

Col1 Col2 

A1 -5 
B1 -20 
C1 7 
A2 3 
B2 -4 
C2 17 

Я хочу, чтобы превратить это в:

Col3 Col4 

A 2 
B -24 
C 24 

(Добавление номера для A и конкатенации X1 и X1 в X)

Как это сделать с использованием API фрейма данных?

редактировать:

В col1 значения фактически произвольные строки (конечные точки), которые я хочу, чтобы объединить в одну колонку (промежуток), может быть в форме «А1-А2». Я планирую отображать конечные точки на другие конечные точки на карте и запрашивать это в моем UDF. Может ли мой UDF вернуться Нет? - скажем, я не хотел включать A в col3, но я хотел включить B и C. Могу ли я добавить еще один случай к вашему примеру, чтобы строки A были пропущены при сопоставлении col1 с col3?

ответ

2

Вы можете просто извлечь столбец группы и использовать его как группу для агрегирования. Если предположить, что данные следует шаблон в примере:

с сырым SQL:

case class Record(Col1: String, Col2: Int) 

val df = sqlContext.createDataFrame(Seq(
    Record("A1", -5), 
    Record("B1", -20), 
    Record("C1", 7), 
    Record("A2", 3), 
    Record("B2", -4), 
    Record("C2", 17))) 

df.registerTempTable("df") 

sqlContext.sql(
    """SELECT col3, sum(col2) AS col4 FROM (
     SELECT col2, SUBSTR(Col1, 1, 1) AS col3 FROM df 
    ) tmp GROUP BY col3""").show 

+----+----+ 
|col3|col4| 
+----+----+ 
| A| -2| 
| B| -24| 
| C| 24| 
+----+----+ 

С Scala API:

import org.apache.spark.sql.functions.{udf, sum} 

val getGroup = udf((s: String) => s.substring(0, 1)) 

df 
    .select(getGroup($"col1").alias("col3"), $"col2") 
    .groupBy($"col3") 
    .agg(sum($"col2").alias("col4")) 

+----+----+ 
|col3|col4| 
+----+----+ 
| A| -2| 
| B| -24| 
| C| 24| 
+----+----+ 

Если группа шаблон является более сложным, вы можете просто настроить SUBSTR или getGroup функцию , Например:

val getGroup = { 
    val pattern = "^[A-Z]+".r 
    udf((s: String) => pattern.findFirstIn(s) match { 
     case Some(g) => g 
     case None => "Unknown" 
    }) 
} 

Edit:

Если вы хотите, чтобы игнорировать некоторые группы просто добавить WHERE пункт. С сырой SQL это простой, но с Scala API требует некоторых усилий:

import org.apache.spark.sql.functions.{not, lit} 

df 
    .select(...) // As before 
    .where(not($"col3".in(lit("A")))) 
    .groupBy(...).agg(...) // As before 

Если вы хотите сбросить несколько столбцов, которые можно использовать переменные аргументы:

val toDiscard = List("A", "B").map(lit(_)) 

df 
    .select(...) 
    .where(not($"col3".in(toDiscard: _*))) 
    .groupBy(...).agg(...) // As before 

Может ли мой обратный UDF None?

Он не может, но он может вернуться null:

val getGroup2 = udf((s: String) => s.substring(0, 1) match { 
    case x if x != "A" => x 
    case _ => null: String 
}) 

df 
    .select(getGroup2($"col1").alias("col3"), $"col2") 
    .where($"col3".isNotNull) 
    .groupBy(...).agg(...) // As before 

+----+----+ 
|col3|col4| 
+----+----+ 
| B| -24| 
| C| 24| 
+----+----+ 
+0

Великий ответ - и я многому научился от него, но я должен был бы сделать мой вопрос более конкретно. Я отредактировал вопрос с более подробной информацией и еще одним небольшим вопросом, если вы хотите иметь другой взгляд. –

+0

Я потянул данные из паркета fileso. Я не определяю классы Case для заполнения моего DataFrame. Как я могу получить доступ к значениям Col1 в этом случае? , например. 'df.select (getGroup ($" col1 ")' - Что должно быть передано getGroup в этом случае? –

+0

Не имеет значения, что является источником.Вы можете обращаться к столбцам точно так же: 'col (" some_column ")'/'$" some_column "'/'some_df (" some_column ")' – zero323

Смежные вопросы