2016-09-07 5 views
1

У меня есть требование прочитать файлы csv, которые будут перекачиваться с телеметрического оборудования в облако и хранить соответствующие данные в магазине Mongodb. Я использую Spark Streaming для чтения новых файлов (они появляются каждую минуту, иногда даже чаще) и с использованием соединителя MomgoDB-Spark. Проблема в том, что данные не загружаются в MomgoDB. Я добавил шаги showf() Dataframe в свой код, и они отображаются на консоли правильно, что означает, что приложение Streaming считывает и обрабатывает файлы, как ожидалось. Но последний шаг спасения в MongoDB не происходит. Мой код выглядит следующим образомSpark Streaming с бэкэндом MongoDB

reqdata.foreachRDD { edata => 
    import sqlContext.implicits._ 
    val loaddata = edata.map(w => EnergyData(w(0).toString,w(1).toString,w(2).toString)).toDF() 
    loaddata.show() 
    loaddata.printSchema(); 
    MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("overwrite")) 
} 

ssc.start() 

loaddata.show() функция отображения данных, просто отлично.

Я проверил журналы MongoDB и обнаружили несколько странных линий, как

«2016-09-07T08: 12: 30.109-0700 I СЕТЬ [initandlisten] соединение принимается от 127.0.0.1:55694 # 212 (3 соединений в настоящее время открыты) 2016-09-07T08: 12: 30.111-0700 Я КОМАНДА [conn212] CMD: падение storedata.energydata»

Теперь, я не понимаю, почему Монго сбросят коллекцию на все , Любая помощь будет высоко оценен

+3

_I не понимаю, почему Монго сбросят коллекцию на all_ - '.mode («перезаписать») ' – zero323

ответ

0

Я решил это сам, изменяя режим экономии на append:

MongoSpark.save(loaddata.write.option("uri","mongodb://127.0.0.1:27017/storedata.energydata").mode("append"))