2016-11-02 3 views
0

Мне интересно, сможет ли Mongo-spark справиться с сценарием, где Im будет импортировать json-данные из потока, но для каждого файла я хочу сначала, если есть соответствующий субъект уже в пределах Монго, и если есть, я хотел бы объединить два документа вручную.агрегирование существующих данных с помощью идентификаторов от mongodb in spark

То, как импортированные данные выглядят выглядит следующим образом

{orderId: 1290edoiadq, from: <Some_address_string>, to: <Some_address_string>, status: "Shipped"}. 

В MongoDB, что у меня есть одни и те же данные, но _id поля содержит orderId. Я ищу, чтобы получить все заказы, а затем проверить, нужно ли им обновлять или вставлять.

EDIT Позвольте мне уточнить, что означает слияние. Если у меня есть заказ с тем же идентификатором, но их статус отличается, я бы хотел обновить статус существующего порядка в db, чтобы быть тем, что находится в данных JSON.

ответ

1

Я хотел бы вручную объединить 2 документа.

В зависимости от вашего определения merge.

Если это одностороннее направление, из входящего потока данных json для обновления документов, хранящихся в MongoDB, вы можете использовать upsert.

С MongoDB Connector for Spark версия 1.1.0, если dataframe содержит _id, соответствующие данным в MongoDB, save() будет использовать upsert. Который будет обновляться, если существует соответствие _id, иначе вставьте.

Например, чтобы изменить status=delivered:

> df.schema 
    org.apache.spark.sql.types.StructType = StructType(StructField(_id,StringType,true), StructField(from,StringType,true), StructField(status,StringType,true), StructField(to,StringType,true)) 

> df.first() 
    org.apache.spark.sql.Row = [1290edoiadq,sender,delivered,receiver] 

> MongoSpark.save(df.write.option("collection", "order").mode("append")) 

Вы просто должны переименовать orderId поле для _id перед вызовом save().

См. SPARK-66 и MongoSpark: save() для получения дополнительной информации.

Однако если значение merge означает двухстороннее обновление (входящий поток и MongoDB), тогда вам нужно будет объединить изменения сначала в Spark. Разрешение любого конфликта, как вы сочтете целесообразным в коде.

+0

Проверьте мое редактирование, я обновил, что я подразумеваю под слиянием. – TheM00s3

+1

Тогда это одностороннее направление, и вы можете использовать 'upsert'. Перед сохранением просто переименуйте 'orderId' в' _id'. –

-1

Я не использовал MongoDB, но имел аналогичный вариант использования. Мой сценарий: Существует поток событий, прочитанный из темы кафки. Эти события должны быть сопоставлены и сгруппированы по ключу, для каждого ключа может быть соответствующая запись в хранилище данных (HBase в моем случае, MongoDB в вашем). Если есть запись, объедините ключевые события в существующий объект, если не создайте новый объект и сохраните его в HBase. В моем случае есть и другие осложнения, такие как поиск нескольких таблиц и т. Д., Но суть проблемы, похоже, похожа на вашу.

Мой подход и проблемы, с которыми я столкнулся: Я использовал подход Kafka Direct Streaming, это дает мне пакет (для этого обсуждения является взаимозаменяемым с RDD) данных для заданного времени. Подход Direct Streaming будет считываться из всех разделов kafka, но вы должны установить ручную контрольную точку в потоковом контексте, чтобы сделать вашу программу пригодной для восстановления.

Теперь это RDD представляет все сообщения, считываемые в течение заданной длительности. Вы можете дополнительно настроить максимальный размер этой партии. Когда вы выполняете какую-либо обработку на этом RDD, RDD разбивается на куски, и каждый фрагмент обрабатывается исполнителем. Spark обычно порождает одного исполнителя на ядро ​​на машину в кластере. Я бы посоветовал вам настроить максимальное число для вашей искровой работы. Вы можете амортизировать доступ к источнику данных (HBase в моем случае) для каждого раздела. Поэтому, если у вас есть 10 исполнителей, работающих параллельно, помните, что вы можете открывать 10 подключений ввода/вывода параллельно вашим данным. Так как ваше чтение должно отражать самую последнюю запись на сущности, это, вероятно, самый важный аспект вашего дизайна. Можете ли вы гарантировать стабильность последовательности данных?

С точки зрения кода вашей программы будет выглядеть примерно так

dataStream.foreachRDD(rdd -> { 
// for each incoming batch, do any shuffle operations like groupByKey first 
// This is because during shuffle data is exchanged between partitions 
rdd.groupByKey().mapPartitions(eventsInPartition -> { 
    // this part of the code executes at each partition. 
    Connection connection = createConnectionToDataSource() 

    eventsInPartition.forEachRemaining(eventPair -> { 

    Entity entity = connection.getStuffFromDB(eventPair._1) 
    entity.addNewEvents(eventPair._2) // your merge step 
    connection.writeStuffToDB(eventPair._1, entity) 

    }) 
}) 
}) 

Вы начинаете с foreachRDD действовать на каждом входящем пакете данных. Сделайте любые карты или преобразования, которые могут применяться к каждому отдельному событию параллельно. groupByKey будет перемешать данные по разделам, и вы будете иметь все события с одним и тем же ключом в том же разделе. mapPartitions принимает функцию, которая выполняется в одном разделе. Здесь вы можете сформировать соединения с вашей БД. В этом примере, поскольку мы делаем группу по ключу, у вас есть параRDD, который является RDD кортежа события key + Iterable последовательности событий. Вы можете использовать соединение для поиска, слияния, сделать другую магию, выписать объект в БД. Попробуйте использовать различные конфигурации для продолжительности пакета, max core, maxRatePerPartitions для управления потоком данных на основе того, как ваша БД и ваш кластер обрабатывают нагрузки.

Смежные вопросы