Я новичок с искры Apache, а также с языком программирования Scala.Сохраните данные MongoDB в формате паркета с помощью Apache Spark
То, что я пытаюсь добиться, чтобы извлечь данные из моей локальной базы данных MongoDB для затем сохранить его в parquet format с помощью Apache Спарк с Hadoop-разъем
Это мой код до сих пор:
package com.examples
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.{MongoInputFormat, BSONFileInputFormat}
import org.apache.spark.sql
import org.apache.spark.sql.SQLContext
object DataMigrator {
def main(args: Array[String])
{
val conf = new SparkConf().setAppName("Migration App").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/mongosails4.case")
val mongoRDD = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]);
val count = countsRDD.count()
// the count value is aprox 100,000
println("================ PRINTING =====================")
println(s"ROW COUNT IS $count")
println("================ PRINTING =====================")
}
}
Дело в том, что для сохранения данных в формат файла паркета сначала необходимо преобразовать переменную mongoRDD в Spark DataFrame. Я пытался что-то вроде этого:
// convert RDD to DataFrame
val myDf = mongoRDD.toDF() // this lines throws an error
myDF.write.save("my/path/myData.parquet")
и ошибка, я получаю это: Exception in thread "main" scala.MatchError: java.lang.Object (of class scala.reflect.internal.Types.$TypeRef$$anon$6)
вы, ребята, есть какие-либо другие идеи, как я мог бы преобразовать RDD в DataFrame, так что я могу спасти данные в паркетном формате?
Вот структура одного документа в коллекции MongoDB: https://gist.github.com/kingtrocko/83a94238304c2d654fe4