Мне нужно обработать некоторые данные из Spark DataFrame до индекса ElasticSearch.Denormalized DataFrame для вложенных документов
Мой DataFrame:
scala> source.printSchema()
root
|-- dialogue_id: string (nullable = true)
|-- dialogue_number: string (nullable = true)
|-- dialogue_text: string (nullable = true)
scala> df_echanges.show
+----------------------+-----------------------+----------------------------+
| dialogue_id| dialogue_number| dialogue_text|
+----------------------+-----------------------+----------------------------+
| DIAL1| 1| Hello !|
| DIAL1| 2| Hi !|
| DIAL1| 3| How are you ?|
| DIAL1| 4| Fine and you ?|
| DIAL1| 5| Fine !|
| DIAL2| 1| Hello ! How are you ?|
| DIAL2| 2| Fine !|
+----------------------+-----------------------+----------------------------+
Мой назначения индекс ES, поле "диалог" является вложенной:
{
"mappings": {
"dialogues": {
"properties": {
"dialogue_id": {
"type": "string"
},
"dialogue": {
"type": "nested",
"properties": {
"dialogue_number": {
"type": "string"
},
"dialogue_text": {
"type": "string"
}
}
}
}
}
}
}
Так что мне нужно, чтобы превратить мою DataFrame в этом:
scala> dest.printSchema()
root
|-- dialogue_id: string (nullable = true)
|-- dialogue: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dialogue_number: string (nullable = true)
| | |-- dialogue_text: boolean (nullable = true)
Как это сделать?
Спасибо!
Джеффри
Он отлично работает ... Вы рок! :) –