2015-08-05 6 views
3

Я новичок с искры Apache, а также с языком программирования Scala.Сохраните данные MongoDB в формате паркета с помощью Apache Spark

То, что я пытаюсь добиться, чтобы извлечь данные из моей локальной базы данных MongoDB для затем сохранить его в parquet format с помощью Apache Спарк с Hadoop-разъем

Это мой код до сих пор:

package com.examples 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.rdd.RDD 
import org.apache.hadoop.conf.Configuration 
import org.bson.BSONObject 
import com.mongodb.hadoop.{MongoInputFormat, BSONFileInputFormat} 
import org.apache.spark.sql 
import org.apache.spark.sql.SQLContext 

object DataMigrator { 

    def main(args: Array[String]) 
    { 
     val conf = new SparkConf().setAppName("Migration App").setMaster("local") 
     val sc = new SparkContext(conf) 
     val sqlContext = new SQLContext(sc) 

     // Import statement to implicitly convert an RDD to a DataFrame 
     import sqlContext.implicits._ 

     val mongoConfig = new Configuration() 
     mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/mongosails4.case") 

     val mongoRDD = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]);  

     val count = countsRDD.count() 

     // the count value is aprox 100,000 
     println("================ PRINTING =====================") 
     println(s"ROW COUNT IS $count") 
     println("================ PRINTING =====================") 
    } 
} 

Дело в том, что для сохранения данных в формат файла паркета сначала необходимо преобразовать переменную mongoRDD в Spark DataFrame. Я пытался что-то вроде этого:

// convert RDD to DataFrame 
val myDf = mongoRDD.toDF() // this lines throws an error 
myDF.write.save("my/path/myData.parquet") 

и ошибка, я получаю это: Exception in thread "main" scala.MatchError: java.lang.Object (of class scala.reflect.internal.Types.$TypeRef$$anon$6)

вы, ребята, есть какие-либо другие идеи, как я мог бы преобразовать RDD в DataFrame, так что я могу спасти данные в паркетном формате?

Вот структура одного документа в коллекции MongoDB: https://gist.github.com/kingtrocko/83a94238304c2d654fe4

ответ

1

Создать класс Case, представляющий данные, хранящиеся в DBObject.
case class Data(x: Int, s: String)

Затем сопоставьте значения вашего rdd с экземплярами вашего класса case. val dataRDD = mongoRDD.values.map { obj => Data(obj.get("x"), obj.get("s")) }

Теперь с РДУ [Data], вы можете создать DataFrame с sqlContext

val myDF = sqlContext.createDataFrame(dataRDD)

Это должно вас происходит. Я могу объяснить позже, если это необходимо.