2016-03-27 3 views
0

Я пытаюсь прочитать данные в искры с помощью разъема mongo-hadoop. Проблема в том, что если я пытаюсь установить ограничение на чтение данных, я получаю в RDD ограничение * количество разделов.MongoHadoop Connector, используемый с Spark, производит дублирование результатов по количеству разделов

mongodbConfig.set("mongo.job.input.format","com.mongodb.hadoop.MongoInputFormat"); 
mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/test.restaurants"); 
mongodbConfig.set("mongo.input.limit","3"); 
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
      mongodbConfig,   // Configuration 
      MongoInputFormat.class, // InputFormat: read from a live cluster. 
      Object.class,    // Key class 
      BSONObject.class   // Value class 
    ); 

    long count = documents.count(); 
    System.out.println("Collection Count: " + count); 
    System.out.println("Partitions: " + documents.partitions().size()); 

//9 elements in the RDD = limit * nrOfPartions = 3 * 3 
//3 partitions 

Это поведение воспроизводится для других пределов (я всегда получаю ограничение * 3).

Я получаю подобное поведение, если я пытаюсь запросить просто objectId (он создает RDD с одним и тем же объектом * количество разделов - в моем случае 3 элемента с одним документом).

Я также могу предоставить сценарий для создания коллекции монго, если бы это было полезно.

ответ

1

Это особенность, а не ошибка. mongo.input.limit используется для установки limit parameter для MongoInputSplit, поэтому он применяется к разделу по разделам не по всему миру.

В целом невозможно (или, если быть точным, практичным) ограничить количество принятых записей по всему миру. Каждый раскол обрабатывается независимо и, как правило, нет априорных знаний о количестве записей, которое получается из каждого разделения.

Смежные вопросы