Я не использовал MongoDB, но имел аналогичный вариант использования. Мой сценарий: Существует поток событий, прочитанный из темы кафки. Эти события должны быть сопоставлены и сгруппированы по ключу, для каждого ключа может быть соответствующая запись в хранилище данных (HBase в моем случае, MongoDB в вашем). Если есть запись, объедините ключевые события в существующий объект, если не создайте новый объект и сохраните его в HBase. В моем случае есть и другие осложнения, такие как поиск нескольких таблиц и т. Д., Но суть проблемы, похоже, похожа на вашу.
Мой подход и проблемы, с которыми я столкнулся: Я использовал подход Kafka Direct Streaming, это дает мне пакет (для этого обсуждения является взаимозаменяемым с RDD) данных для заданного времени. Подход Direct Streaming будет считываться из всех разделов kafka, но вы должны установить ручную контрольную точку в потоковом контексте, чтобы сделать вашу программу пригодной для восстановления.
Теперь это RDD представляет все сообщения, считываемые в течение заданной длительности. Вы можете дополнительно настроить максимальный размер этой партии. Когда вы выполняете какую-либо обработку на этом RDD, RDD разбивается на куски, и каждый фрагмент обрабатывается исполнителем. Spark обычно порождает одного исполнителя на ядро на машину в кластере. Я бы посоветовал вам настроить максимальное число для вашей искровой работы. Вы можете амортизировать доступ к источнику данных (HBase в моем случае) для каждого раздела. Поэтому, если у вас есть 10 исполнителей, работающих параллельно, помните, что вы можете открывать 10 подключений ввода/вывода параллельно вашим данным. Так как ваше чтение должно отражать самую последнюю запись на сущности, это, вероятно, самый важный аспект вашего дизайна. Можете ли вы гарантировать стабильность последовательности данных?
С точки зрения кода вашей программы будет выглядеть примерно так
dataStream.foreachRDD(rdd -> {
// for each incoming batch, do any shuffle operations like groupByKey first
// This is because during shuffle data is exchanged between partitions
rdd.groupByKey().mapPartitions(eventsInPartition -> {
// this part of the code executes at each partition.
Connection connection = createConnectionToDataSource()
eventsInPartition.forEachRemaining(eventPair -> {
Entity entity = connection.getStuffFromDB(eventPair._1)
entity.addNewEvents(eventPair._2) // your merge step
connection.writeStuffToDB(eventPair._1, entity)
})
})
})
Вы начинаете с foreachRDD действовать на каждом входящем пакете данных. Сделайте любые карты или преобразования, которые могут применяться к каждому отдельному событию параллельно. groupByKey будет перемешать данные по разделам, и вы будете иметь все события с одним и тем же ключом в том же разделе. mapPartitions принимает функцию, которая выполняется в одном разделе. Здесь вы можете сформировать соединения с вашей БД. В этом примере, поскольку мы делаем группу по ключу, у вас есть параRDD, который является RDD кортежа события key + Iterable последовательности событий. Вы можете использовать соединение для поиска, слияния, сделать другую магию, выписать объект в БД. Попробуйте использовать различные конфигурации для продолжительности пакета, max core, maxRatePerPartitions для управления потоком данных на основе того, как ваша БД и ваш кластер обрабатывают нагрузки.
Проверьте мое редактирование, я обновил, что я подразумеваю под слиянием. – TheM00s3
Тогда это одностороннее направление, и вы можете использовать 'upsert'. Перед сохранением просто переименуйте 'orderId' в' _id'. –