Я пытаюсь вставить искра sql dataframe в удаленную коллекцию mongodb. Ранее я написал java-программу с MongoClient, чтобы проверить, доступен ли удаленный сбор, и я был в состоянии это сделать.Spark Dataframe to MongoDB Document Insertion Issue
Мой подарок искрой код, как показано ниже -
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.hive.HiveContext = [email protected]
scala> val depts = sqlContext.sql("select * from test.user_details")
depts: org.apache.spark.sql.DataFrame = [user_id: string, profile_name: string ... 7 more fields]
scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<hostname>:27017/<dbname>.<collection>")).mode(SaveMode.Overwrite).format("com.mongodb.spark.sql").save()
Ths дает следующее сообщение об ошибке -
java.lang.AbstractMethodError: com.mongodb.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 84 elided
Я также попытался следующие, бросает ошибку ниже:
scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<host>:27017/<database>.<collection>")).mode(SaveMode.Overwrite).save()
java.lang.IllegalArgumentException: 'path' is not specified
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:117)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:437)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 58 elided
Я импортировал следующие пакеты -
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.mongodb.spark.config._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
depts.show() работает как ожидалось, т.е. DataFrame успешно создан.
Возможно, кто-нибудь предоставит мне какие-либо рекомендации/предложения по этому вопросу. Благодаря