2017-02-16 1 views
2

Я хочу сделать 2 набора данных из 2 разных баз данных Mongo. В настоящее время я использую официальный коннектор MongoSpark. sparkSession запускается следующим образом.Чтение из нескольких MongoDB для формирования набора данных

SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test") 
         .set("spark.mongodb.input.partitioner", "MongoShardedPartitioner") 
         .set("spark.mongodb.input.uri", "mongodb://192.168.77.62/db1.coll1") 
         .set("spark.sql.crossJoin.enabled", "true"); 
SparkSession sparkSession = sparkSession.builder().appName("test1").config(sparkConf).getOrCreate(); 

Если я хочу изменить spark.mongodb.input.uri, как я это сделаю? Я уже попытался изменить runtimeConfig sparkSession, а также использовать ReadConfig с readOverrides, но это не сработало.

Метод 1:

sparkSession.conf().set("spark.mongodb.input.uri", "mongodb://192.168.77.63/db1.coll2"); 

Метод 2:

Map<String, String> readOverrides = new HashMap<String, String>(); 
readoverrides.put("uri","192.168.77.63/db1.coll2"); 
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides); 
Dataset<Position> ds = MongoSpark.load(sparkSession, readConfig, Position.class); 

Edit 1: Как было предложено Кароль я попробовал следующий метод

SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test"); 
SparkSession sparkSession = SparkSession.builder().appName("test1").config(sparkConf).getOrCreate(); 
    Map<String, String> readOverrides1 = new HashMap<String, String>(); 
      readOverrides1.put("uri", "mongodb://192.168.77.62:27017"); 
      readOverrides1.put("database", "db1"); 
      readOverrides1.put("collection", "coll1"); 
      ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides1); 

Это терпит неудачу во время выполнения, говоря:

Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property 

Edit 2:

public static void main(String[] args) { 
    SparkSession sparkSession = SparkSession.builder().appName("test") 
      .config("spark.worker.cleanup.enabled", "true").config("spark.scheduler.mode", "FAIR").getOrCreate(); 
    String mongoURI2 = "mongodb://192.168.77.63:27017/db1.coll1"; 
    Map<String, String> readOverrides1 = new HashMap<String, String>(); 
    readOverrides1.put("uri", mongoURI2); 
    ReadConfig readConfig1 = ReadConfig.create(sparkSession).withOptions(readOverrides1); 
    MongoSpark.load(sparkSession,readConfig1,Position.class).show(); 
} 

Тем не менее это дает такое же исключение, как и предыдущие редактирования.

ответ

1

built.sbt: libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0"

package com.example.app 

import com.mongodb.spark.config.{ReadConfig, WriteConfig} 
import com.mongodb.spark.sql._ 

object App { 


def main(args: Array[String]): Unit = { 

    val MongoUri1 = args(0).toString 
    val MongoUri2 = args(1).toString 
    val SparkMasterUri= args(2).toString 

    def makeMongoURI(uri:String,database:String,collection:String) = (s"${uri}/${database}.${collection}") 

    val mongoURI1 = s"mongodb://${MongoUri1}:27017" 
    val mongoURI2 = s"mongodb://${MongoUri2}:27017" 

    val CONFdb1 = makeMongoURI(s"${mongoURI1}","MyColletion1,"df") 
    val CONFdb2 = makeMongoURI(s"${mongoURI2}","MyColletion2,"df") 

    val WRITEdb1: WriteConfig = WriteConfig(scala.collection.immutable.Map("uri"->CONFdb1)) 
    val READdb1: ReadConfig = ReadConfig(Map("uri" -> CONFdb1)) 

    val WRITEdb2: WriteConfig = WriteConfig(scala.collection.immutable.Map("uri"->CONFdb2)) 
    val READdb2: ReadConfig = ReadConfig(Map("uri" -> CONFdb2)) 

    val spark = SparkSession 
    .builder 
    .appName("AppMongo") 
    .config("spark.worker.cleanup.enabled", "true") 
    .config("spark.scheduler.mode", "FAIR") 
    .getOrCreate() 

    val df1 = spark.read.mongo(READdb1) 
    val df2 = spark.read.mongo(READdb2) 
    df1.write.mode("overwrite").mongo(WRITEdb1) 
    df2.write.mode("overwrite").mongo(WRITEdb2) 

} 

} 

теперь вы можете пройти uri1 и uri2 в /usr/local/spark/bin/spark-submit pathToMyjar.app.jar MongoUri1 MongoUri2 sparkMasterUri в аргументах, а затем создать config для каждого uri

spark.read.mongo(READdb) 
+0

Но sparkSession должен иметь MongoDB Uri время инициализация ReadConfig. Я пробовал использовать 2 ReadConfigs, но он не справился во время выполнения, говоря, что ему нужен uri для SparkSession. –

+0

И если я передаю один uri SparkSession, я не могу перезаписать его с помощью ReadConfig. –

+0

не нужно устанавливать конфигурацию mongo в искровом сеансе –

Смежные вопросы