Я пытаюсь спасти Spark-DataFrame с помощью соединителя PyMongo. Ниже мой код, но каждый рабочий день я запускаю код, я получаю сообщение об ошибке:Сохранить HDFS To MongoDB с помощью Spark-DataFrame
java.io.IOException: No FileSystem for scheme: mongodb
Ниже мой код:
import pymongo
import pymongo_spark
pymongo_spark.activate()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv"
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
collections=df.collect()
df.write.format('mongodb://localhost:27017/test.sales_order_2').save()
У меня есть довольно наивный код, так как Im новичок к этому , но любая помощь в этом отношении была бы весьма признательна. Им с помощью искрового 2.0.0, Python 2.7.6, MongoDB: 3.2.9
Это хорошее решение. Однако можем ли мы обрабатывать исключения с помощью этого Spark-коннектора в PySpark? Потому что есть вероятность, что данные могут легко превышать ограничение размера документа MongoDB в размере 16 МБ. –
Вы всегда можете заключить его в оператор 'try/except'. Обратите внимание, что строка CSV будет единственным документом, а не всей CSV станет единым документом. См. Также определение [MongoDB Document] (https://docs.mongodb.com/manual/core/document/). Если значение строки CSV превышает 16 МБ, вы можете пересмотреть схему/модель. –