Вот мой упрощенный код Apache Spark Streaming, который вводится через потоки Kafka, объединяет, печатает и сохраняет их в файл. Но теперь я хочу, чтобы входящий поток данных сохранялся в MongoDB.Сохранить Scala Spark Streaming Data to MongoDB
val conf = new SparkConf().setMaster("local[*]")
.setAppName("StreamingDataToMongoDB")
.set("spark.streaming.concurrentJobs", "2")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topicName1 = List("KafkaSimple").toSet
val topicName2 = List("SimpleKafka").toSet
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicName1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicName2)
val lines1 = stream1.map(_._2)
val lines2 = stream2.map(_._2)
val allThelines = lines1.union(lines2)
allThelines.print()
allThelines.repartition(1).saveAsTextFiles("File", "AllTheLinesCombined")
Я пробовал библиотеку Stratio Spark-MongoDB и некоторые другие ресурсы, но до сих пор нет успеха. Кто-то, пожалуйста, помогите мне продолжить или перенаправить меня на какой-то полезный рабочий ресурс/учебник. Cheers :)
Какая ошибка вы видите? –