2016-11-30 4 views
6

Вопрос в значительной степени в заголовке: есть ли эффективный способ подсчета различных значений в каждом столбце в DataFrame?Spark DataFrame: подсчет различных значений каждого столбца

Метод describe предоставляет только счетчик, но не отдельный счетчик, и я задаюсь вопросом, есть ли способ получить отдельный счет для всех (или некоторых выбранных) столбцов.

+0

Ну, это зависит. Если у вас есть настоящий большой кластер, вы можете разбить свои данные. После этого вы можете создать цикл, который может подсчитывать каждый столбец. Эти подсчеты будут работать параллельно. Чтобы было ясно, если у вас есть кластер с 1000 рабочих, вы можете разбить свои данные на 200. Чем вы можете подсчитывать по пять столбцов каждый раз. Но проблема не в том, что тривиальна. –

ответ

12

Несколько агрегирование будет достаточно дорого, чтобы вычислить, таким образом, я бы посоветовал вам использовать приближение отчетливое количество:

val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3") 

val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap 
df.agg(exprs).show() 
// +---------------------------+---------------------------+---------------------------+ 
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)| 
// +---------------------------+---------------------------+---------------------------+ 
// |       2|       2|       3| 
// +---------------------------+---------------------------+---------------------------+ 

Метод approx_count_distinct опирается на HyperLogLog под капотом.

HyperLogLog алгоритм и его вариант HyperLogLog ++ (реализованный в Спарк) опирается на следующее умный наблюдения.

Если числа распределены равномерно по диапазону, то количество отдельных элементов может быть аппроксимировано из наибольшего числа начальных нулей в двоичном представлении чисел.

Например, если мы наблюдаем число, цифры которого в двоичной форме имеют вид 0…(k times)…01…1, то мы можем оценить, что в наборе находятся порядка 2^k элементов. Это очень грубая оценка, но ее можно уточнить с помощью алгоритма эскиза.

Подробное объяснение механики этого алгоритма можно найти в original paper.

Примечание: Запуск Спарк 1.6, когда искра вызывает SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df каждое предложение должно вызывать отдельную агрегацию для каждого пункта. В то время как это отличается от SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df, где мы агрегируем один раз. Таким образом, производительность не будет сопоставимой при использовании count(distinct(_)) и approxCountDistinct (или approx_count_distinct).

Это один из изменения поведения, начиная с Спарком 1.6:

С улучшенной планировщик запросов для запросов, имеющих различные агрегированных (SPARK-9241), план запроса, имеющий один отчетливое агрегирования был изменен на более надежную версию. Чтобы вернуться к плану, созданному планировщиком Spark 1.5, установите для параметра spark.sql.specializeSingleDistinctAggPlanning значение true. (SPARK-12077)

Код: Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles.

9

В pySpark вы могли бы сделать что-то вроде этого, используя countDistinct():

from pyspark.sql.functions import col, countDistinct 

df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)) 

Аналогично в Scala:

import org.apache.spark.sql.functions.countDistinct 
import org.apache.spark.sql.functions.col 

df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*) 

Если вы хотите ускорить процесс, при потенциальной потере точности, вы могли бы тоже используйте approxCountDistinct().

0

, если вы просто хотите сосчитать для столбчатой ​​колонки, то следующий может помочь. Хотя его поздний ответ. это может помочь кому-то. (pyspark 2.2.0)

from pyspark.sql.functions import col, countDistinct 
df.agg(countDistinct(col("colName")).alias("count")).show() 
Смежные вопросы