2016-10-10 5 views
1

Я пытаюсь спасти Spark-DataFrame с помощью соединителя PyMongo. Ниже мой код, но каждый рабочий день я запускаю код, я получаю сообщение об ошибке:Сохранить HDFS To MongoDB с помощью Spark-DataFrame

java.io.IOException: No FileSystem for scheme: mongodb 

Ниже мой код:

import pymongo 
import pymongo_spark 
pymongo_spark.activate() 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc) 
from pyspark.sql import SparkSession 
from pyspark.sql import SparkSession 
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv" 
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path) 
collections=df.collect() 
df.write.format('mongodb://localhost:27017/test.sales_order_2').save() 

У меня есть довольно наивный код, так как Im новичок к этому , но любая помощь в этом отношении была бы весьма признательна. Им с помощью искрового 2.0.0, Python 2.7.6, MongoDB: 3.2.9

ответ

1

I'm trying to save a Spark-DataFrame using PyMongo connector

Вы можете попробовать использовать MongoDB Connector for Spark. Использование настройки среды Apache Spark v2.0.x, Python v2.7.x и MongoDB v3.2.x, вы можете сделать что-то, как показано ниже:

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Application Name").getOrCreate() 
dataframe = spark.read.csv("path/to/file.csv", header=True, mode="DROPMALFORMED") 
dataframe.write.format("com.mongodb.spark.sql.DefaultSource")\ 
       .option("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection")\ 
       .save() 

Полная версия примера файла Python можно найти на MongoDB PySpark Docker: examples.py , Который включает пример использования MongoDB Aggregation в Искрах и Spark SQL.

Если вы знакомы с docker, вы можете выполнить проект git MongoDB PySpark Docker с помощью docker-compose и запустить некоторые из примеров PySpark.

Вы можете найти следующие ресурсы полезны:

+0

Это хорошее решение. Однако можем ли мы обрабатывать исключения с помощью этого Spark-коннектора в PySpark? Потому что есть вероятность, что данные могут легко превышать ограничение размера документа MongoDB в размере 16 МБ. –

+0

Вы всегда можете заключить его в оператор 'try/except'. Обратите внимание, что строка CSV будет единственным документом, а не всей CSV станет единым документом. См. Также определение [MongoDB Document] (https://docs.mongodb.com/manual/core/document/). Если значение строки CSV превышает 16 МБ, вы можете пересмотреть схему/модель. –

Смежные вопросы