2016-03-30 3 views
7

У меня есть небольшой набор данных, который будет результатом работы Spark. Я думаю о преобразовании этого набора данных в кадр данных для удобства в конце задания, но он попытался правильно определить схему. Проблема заключается в последнем поле ниже (topValues); это ArrayBuffer кортежей - ключи и счетчики.Spark: Программно создавая схему DataFrame в scala

val innerSchema = 
    StructType(
     Array(
     StructField("value", StringType), 
     StructField("count", LongType) 
    ) 
    ) 
    val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", innerSchema) 
    ) 
    ) 

    val result = stats.columnStats.map{ c => 
    Row(c._2.name, c._1, c._2.count, c._2.empties, c._2.nulls, c._2.uniqueValues, c._2.mean, c._2.min, c._2.max, c._2.topValues.topN) 
    } 

    val rdd = sc.parallelize(result.toSeq) 

    val outputDf = sqlContext.createDataFrame(rdd, outputSchema) 

    outputDf.show() 

Ошибка я получаю это MatchError: scala.MatchError: ArrayBuffer((10,2), (20,3), (8,1)) (of class scala.collection.mutable.ArrayBuffer)

Когда я отладить и проверить свои объекты, я вижу это:

rdd: ParallelCollectionRDD[2] 
rdd.data: "ArrayBuffer" size = 2 
rdd.data(0): [age,2,6,0,0,3,14.666666666666666,8.0,20.0,ArrayBuffer((10,2), (20,3), (8,1))] 
rdd.data(1): [gender,3,6,0,0,2,0.0,0.0,0.0,ArrayBuffer((M,4), (F,2))] 

Мне кажется, что я ve точно описал ArrayBuffer кортежей в моей внутренней шиме, но Спарк не согласен.

Любая идея, как я должен определять схему?

+0

Было бы полезно, если вы предоставите пример данных или, по крайней мере, точный тип 'rdd'. – zero323

ответ

10
val rdd = sc.parallelize(Array(Row(ArrayBuffer(1,2,3,4)))) 
val df = sqlContext.createDataFrame(
    rdd, 
    StructType(Seq(StructField("arr", ArrayType(IntegerType, false), false) 
) 

df.printSchema 
root 
|-- arr: array (nullable = false) 
| |-- element: integer (containsNull = false) 

df.show 
+------------+ 
|   arr| 
+------------+ 
|[1, 2, 3, 4]| 
+------------+ 
+0

Да, ArrayType - правильный подход. Благодаря! Моя окончательная схема в моем ответе. – Stuart

4

Как указал Дэвид, мне нужно было использовать ArrayType. Spark доволен этим:

val outputSchema = 
    StructType(
     Array(
     StructField("name", StringType, nullable=false), 
     StructField("index", IntegerType, nullable=false), 
     StructField("count", LongType, nullable=false), 
     StructField("empties", LongType, nullable=false), 
     StructField("nulls", LongType, nullable=false), 
     StructField("uniqueValues", LongType, nullable=false), 
     StructField("mean", DoubleType), 
     StructField("min", DoubleType), 
     StructField("max", DoubleType), 
     StructField("topValues", ArrayType(StructType(Array(
      StructField("value", StringType), 
      StructField("count", LongType) 
     )))) 
    ) 
    ) 
Смежные вопросы