Я хочу сделать 2 набора данных из 2 разных баз данных Mongo. В настоящее время я использую официальный коннектор MongoSpark. sparkSession запускается следующим образом.Чтение из нескольких MongoDB для формирования набора данных
SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test")
.set("spark.mongodb.input.partitioner", "MongoShardedPartitioner")
.set("spark.mongodb.input.uri", "mongodb://192.168.77.62/db1.coll1")
.set("spark.sql.crossJoin.enabled", "true");
SparkSession sparkSession = sparkSession.builder().appName("test1").config(sparkConf).getOrCreate();
Если я хочу изменить spark.mongodb.input.uri, как я это сделаю? Я уже попытался изменить runtimeConfig sparkSession, а также использовать ReadConfig с readOverrides, но это не сработало.
Метод 1:
sparkSession.conf().set("spark.mongodb.input.uri", "mongodb://192.168.77.63/db1.coll2");
Метод 2:
Map<String, String> readOverrides = new HashMap<String, String>();
readoverrides.put("uri","192.168.77.63/db1.coll2");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
Dataset<Position> ds = MongoSpark.load(sparkSession, readConfig, Position.class);
Edit 1: Как было предложено Кароль я попробовал следующий метод
SparkConf sparkConf = new SparkConf().setMaster("yarn").setAppName("test");
SparkSession sparkSession = SparkSession.builder().appName("test1").config(sparkConf).getOrCreate();
Map<String, String> readOverrides1 = new HashMap<String, String>();
readOverrides1.put("uri", "mongodb://192.168.77.62:27017");
readOverrides1.put("database", "db1");
readOverrides1.put("collection", "coll1");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides1);
Это терпит неудачу во время выполнения, говоря:
Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property
Edit 2:
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder().appName("test")
.config("spark.worker.cleanup.enabled", "true").config("spark.scheduler.mode", "FAIR").getOrCreate();
String mongoURI2 = "mongodb://192.168.77.63:27017/db1.coll1";
Map<String, String> readOverrides1 = new HashMap<String, String>();
readOverrides1.put("uri", mongoURI2);
ReadConfig readConfig1 = ReadConfig.create(sparkSession).withOptions(readOverrides1);
MongoSpark.load(sparkSession,readConfig1,Position.class).show();
}
Тем не менее это дает такое же исключение, как и предыдущие редактирования.
Но sparkSession должен иметь MongoDB Uri время инициализация ReadConfig. Я пробовал использовать 2 ReadConfigs, но он не справился во время выполнения, говоря, что ему нужен uri для SparkSession. –
И если я передаю один uri SparkSession, я не могу перезаписать его с помощью ReadConfig. –
не нужно устанавливать конфигурацию mongo в искровом сеансе –