1

Я пытаюсь вставить искра sql dataframe в удаленную коллекцию mongodb. Ранее я написал java-программу с MongoClient, чтобы проверить, доступен ли удаленный сбор, и я был в состоянии это сделать.Spark Dataframe to MongoDB Document Insertion Issue

Мой подарок искрой код, как показано ниже -

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
warning: there was one deprecation warning; re-run with -deprecation for details 
sqlContext: org.apache.spark.sql.hive.HiveContext = [email protected] 
scala> val depts = sqlContext.sql("select * from test.user_details") 
depts: org.apache.spark.sql.DataFrame = [user_id: string, profile_name: string ... 7 more fields] 
scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<hostname>:27017/<dbname>.<collection>")).mode(SaveMode.Overwrite).format("com.mongodb.spark.sql").save() 

Ths дает следующее сообщение об ошибке -

java.lang.AbstractMethodError: com.mongodb.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation; 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    ... 84 elided 

Я также попытался следующие, бросает ошибку ниже:

scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<host>:27017/<database>.<collection>")).mode(SaveMode.Overwrite).save() 
java.lang.IllegalArgumentException: 'path' is not specified 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438) 
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438) 
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) 
    at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:117) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:437) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211) 
    ... 58 elided 

Я импортировал следующие пакеты -

import org.apache.spark.{SparkConf, SparkContext} 

import org.apache.spark.sql.SQLContext 

import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern} 

import com.mongodb.spark.config._ 

import org.apache.spark.sql.hive.HiveContext 

import org.apache.spark.sql._ 

depts.show() работает как ожидалось, т.е. DataFrame успешно создан.

Возможно, кто-нибудь предоставит мне какие-либо рекомендации/предложения по этому вопросу. Благодаря

ответ

1

Предполагая, что вы используете MongoDB Spark Connector v1.0, Вы можете сохранить DataFrames SQL, как показано ниже:

// DataFrames SQL example 
df.registerTempTable("temporary") 
val depts = sqlContext.sql("select * from test.user_details") 
depts.show() 
// Save out the filtered DataFrame result 
MongoSpark.save(depts.write.option("uri", "mongodb://hostname:27017/database.collection").mode("overwrite")) 

Для получения дополнительной информации см MongoDB Spark Connector: Spark SQL

Для простого демо MongoDB и Спарк с помощью докер см. MongoDB Spark Docker: examples.scala - dataframes