Каков правильный способ использования Elasticsearch с помощью Spark для обновления существующих объектов?Elasticearch и Spark: обновление существующих объектов
Я хотел что-то вроде следующего:
- Получить существующие данные в виде карты.
- Создайте новую карту и заполните ее обновленными полями.
- Сохранять новую карту.
Однако, есть несколько вопросов:
- Список возвращаемых полей не может содержать _id, as it is not part of the source.
Если для тестирования, я жёстко существующего
_id
на карте новых значений, следующее исключение:org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest
Как должны _id
, и как его следует вернуть обратно в Spark?
Я включил следующий код ниже, чтобы лучше проиллюстрировать то, что я пытаюсь сделать:
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, INDEX_NAME+"/"+TYPE_NAME,
"?source=,field1,field2).values();
Iterator<Map<String, Object>> iter = esRDD.toLocalIterator();
List<Map<String, Object>> listToPersist = new ArrayList<Map<String, Object>>();
while(iter.hasNext()){
Map<String, Object> map = iter.next();
// Get existing values, and do transformation logic
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put("_id", ??????);
newMap.put("field1", new_value);
listToPersist.add(newMap);
}
JavaRDD javaRDD = jsc.parallelize(ImmutableList.copyOf(listToPersist));
JavaEsSpark.saveToEs(javaRDD, INDEX_NAME+"/"+TYPE_NAME);
В идеале, я хотел бы обновить существующую карту на месте, а не создавать новый.
У кого-нибудь есть какой-либо пример кода, показывающий при использовании Spark правильный способ обновления существующих объектов в elasticsearch?
Благодаря
Вы можете сделать отображение в поле ID (EsSpark.saveToEs (rdd, «spark/docs», Map («es.mapping.id» -> «id»))), как указано в документации https://www.elastic.co/guide/en/elasticsearch/hadoop/master /spark.html#spark-write – aletapool