2015-10-21 1 views
7

Я пытаюсь вычислить гистограмму всех столбцов из файла CSV, используя Spark Scala.Как получить гистограмму всех столбцов в большом CSV/RDD [Массив [double]] с помощью Apache Spark Scala?

Я нашел, что DoubleRDDFunctions поддерживает гистограмму. Итак, для кодирования всех столбцов я закодирован следующим образом.

  1. Get Колонку подсчитывать
  2. Создать RDD[double] каждого столбца и вычислить гистограмму каждого RDD с помощью DoubleRDDFunctions

    var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1) 
    
    val histogramData = columnIndexArray.map(columns => { 
        rdd.map(lines => lines(columns)).histogram(6) 
    }) 
    

Это хороший способ? Может ли кто-нибудь предложить несколько лучших способов решить эту проблему?

Заранее спасибо.

ответ

5

Не совсем лучше, но альтернативный путь заключается в преобразовании RDD в DataFrame и использовании histogram_numeric UDF.

Пример данных:

import scala.util.Random 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.{callUDF, lit, col} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 

val sqlContext = new HiveContext(sc) 

Random.setSeed(1) 

val ncol = 5 

val rdd = sc.parallelize((1 to 1000).map(
    _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble)) 
)) 

val schema = StructType(
    (1 to ncol).map(i => StructField(s"x$i", DoubleType, false))) 

val df = sqlContext.createDataFrame(rdd, schema) 
df.registerTempTable("df") 

Запрос:

val nBuckets = 3 
val columns = df.columns.map(
    c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c)) 
val histograms = df.select(columns: _*) 

histograms.printSchema 

// root 
// |-- x1: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x2: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x3: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x4: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x5: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 

histograms.select($"x1").collect() 

// Array([WrappedArray([0.16874313309969038,334.0], 
// [0.513382068667877,345.0], [0.8421388886903808,321.0])]) 
+1

Его выдача org.apache.spark.sql.AnalysisException: неопределенная функция histogram_numeric. Я использую искру 1.5.1 –

+0

UDF требуют HiveContext. – zero323

+0

спасибо ... Я редактировал имя переменной в вашем ответе. –

1

(Scala апи) преобразование, countByValue должны делать то, что вы хотите

так, например, для создания гистограммы данных для первого столбца в вашем РДУ:

val histCol1 = RDD.map(record => record.col_1).countByValue() 

в выражение выше, запись относится только к строке данных в RDD, экземпляр класса case, который имеет поле col_1

и так histCol1 возвращает хэш-таблицу (Scala Map), в которой ключи являются уникальными значениями в столбце 1 (col_1) и значение, очевидно, частоты каждого уникального значения

+0

Спасибо за предложение. Но мне нужно также указать размер ведра. Максимальные ведра 10. countByValue() будет работать эффективнее, чем двойная гистограмма RDD? –

+0

«размер ведра» на самом деле возвращен countByValue - каждое значение представляет собой размер ведра, а ключ - это имя ковша – doug

+0

. Можно ли исправить размер ведра до одного значения? вместо того, чтобы рассматривать отдельный счет. Мне не нужно все разное количество, мне нужна гистограмма с максимальными ведрами. 10. –