2015-10-14 2 views
4

ДРР был создан в формате Array[Array[String]] и имеет следующие значения:Преобразование РДД в Dataframe в Спарк/Scala

Array[Array[String]] = Array(Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580056797, 0, 2015-07-29 10:38:42, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1), Array(4580057445, 0, 2015-07-29 10:40:37, 0, 1, 1)) 

Я хочу создать dataFrame со схемой:

val schemaString = "callId oCallId callTime duration calltype swId" 

следующие шаги:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim)) 
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39 
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema) 

дает следующее сообщение об ошибке:

console:45: error: overloaded method value createDataFrame with alternatives: (rdd: org.apache.spark.api.java.JavaRDD[],beanClass: Class[])org.apache.spark.sql.DataFrame (rdd: org.apache.spark.rdd.RDD[],beanClass: Class[])org.apache.spark.sql.DataFrame (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],
org.apache.spark.sql.types.StructType) val calDF = sqlContext.createDataFrame(rowRDD, schema)

ответ

7

Просто вставьте в spark-shell:

val a = 
    Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")) 

val rdd = sc.makeRDD(a) 

case class X(callId: String, oCallId: String, 
    callTime: String, duration: String, calltype: String, swId: String) 

Тогда map() над РДУ для создания экземпляров класса дела, а затем создать DataFrame с помощью toDF() :

scala> val df = rdd.map { 
    case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF() 
df: org.apache.spark.sql.DataFrame = 
    [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string] 

Это выводит схему из класса корпуса.

Затем вы можете продолжить:

scala> df.printSchema() 
root 
|-- callId: string (nullable = true) 
|-- oCallId: string (nullable = true) 
|-- callTime: string (nullable = true) 
|-- duration: string (nullable = true) 
|-- calltype: string (nullable = true) 
|-- swId: string (nullable = true) 

scala> df.show() 
+----------+-------+-------------------+--------+--------+----+ 
| callId|oCallId|   callTime|duration|calltype|swId| 
+----------+-------+-------------------+--------+--------+----+ 
|4580056797|  0|2015-07-29 10:38:42|  0|  1| 1| 
|4580056797|  0|2015-07-29 10:38:42|  0|  1| 1| 
+----------+-------+-------------------+--------+--------+----+ 

Если вы хотите использовать toDF() в обычной программе (не в spark-shell), убедитесь что (цитата из here):

  • Для import sqlContext.implicits._ сразу после создания SQLContext
  • Определить класс корпуса вне метода, используя toDF()
+0

Удивительный ответ, я получил все необходимое от этого. большое спасибо – sparkDabbler

1

Я предполагаю, что ваш schema есть, как и в Spark Guide, следующим образом:

val schema = 
    StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 

Если посмотреть на подпись createDataFrame, вот один, который принимает StructType в качестве 2-го аргумента (для Scala)

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Creates a DataFrame from an RDD containing Rows using the given schema.

так она принимает в качестве первого аргумента RDD[Row]. Что у вас в rowRDD есть RDD[Array[String]], так что есть несоответствие.

Нужна ли вам RDD[Array[String]]?

В противном случае вы можете использовать следующую команду, чтобы создать dataframe:

val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim)) 
+0

Спасибо @ccheneson, это также сработает для моей проблемы – sparkDabbler

4

Вы должны сначала преобразовать вас Array в , а затем определить схему. Я сделал предположение о том, что большинство ваших полей Long

val rdd: RDD[Array[String]] = ??? 
    val rows: RDD[Row] = rdd map { 
     case Array(callId, oCallId, callTime, duration, swId) => 
     Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, swId.toLong) 
    } 

    object schema { 
     val callId = StructField("callId", LongType) 
     val oCallId = StructField("oCallId", StringType) 
     val callTime = StructField("callTime", StringType) 
     val duration = StructField("duration", LongType) 
     val swId = StructField("swId", LongType) 

     val struct = StructType(Array(callId, oCallId, callTime, duration, swId)) 
    } 

    sqlContext.createDataFrame(rows, schema.struct) 
+0

Спасибо @Eugene, это решение также работает для меня, и довольно элегантно – sparkDabbler