2016-05-27 2 views
-1

Мне нужно обработать некоторые данные из Spark DataFrame до индекса ElasticSearch.Denormalized DataFrame для вложенных документов

Мой DataFrame:

scala> source.printSchema() 
root 
|-- dialogue_id: string (nullable = true) 
|-- dialogue_number: string (nullable = true) 
|-- dialogue_text: string (nullable = true) 
scala> df_echanges.show 
+----------------------+-----------------------+----------------------------+ 
|   dialogue_id|  dialogue_number|    dialogue_text| 
+----------------------+-----------------------+----------------------------+ 
|     DIAL1|      1|      Hello !| 
|     DIAL1|      2|      Hi !| 
|     DIAL1|      3|    How are you ?| 
|     DIAL1|      4|    Fine and you ?| 
|     DIAL1|      5|      Fine !| 
|     DIAL2|      1|  Hello ! How are you ?| 
|     DIAL2|      2|      Fine !| 
+----------------------+-----------------------+----------------------------+ 

Мой назначения индекс ES, поле "диалог" является вложенной:

{ 
    "mappings": { 
     "dialogues": { 
      "properties": { 
       "dialogue_id": { 
       "type": "string" 
      }, 
      "dialogue": { 
       "type": "nested", 
       "properties": { 
        "dialogue_number": { 
        "type": "string" 
        }, 
        "dialogue_text": { 
        "type": "string" 
        } 
       } 
      } 
     } 
     } 
    } 
} 

Так что мне нужно, чтобы превратить мою DataFrame в этом:

scala> dest.printSchema() 
root 
|-- dialogue_id: string (nullable = true) 
|-- dialogue: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- dialogue_number: string (nullable = true) 
| | |-- dialogue_text: boolean (nullable = true) 

Как это сделать?

Спасибо!

Джеффри

ответ

0

Я думаю, самый простой способ сделать это было бы, чтобы избежать мир DataFrames для трансформации. После завершения преобразования вы можете конвертировать обратно в DataFrame и делать то, что вы хотите делать с данными в вашей целевой схеме.

Я хотел бы попробовать что-то вроде следующего:

Сначала убедитесь, что у вас есть некоторые тематические классы, объявленные, которые представляют вашу схему назначения (это самый простой способ, но именовании для членов класса случае не нарушают «стандарт» конвенции Scala код):

case class DialogueElement(dialogue_id: String, dialogue: Array[InnerDialogueElement]) 
case class InnerDialogueElement(dialogue_number: String, dialogue_text: String) 

выполнить следующую трансформацию (в основном, с использованием API RDD):

// Transform to RDD and group by first column (= index 0) 
val groupedRdd = source.rdd.groupBy(row => row.getString(0)) 

// Map the grouped values into a case class that represents 
// your inner dialogue elements 
val mappedInnerElementsRdd = groupedRdd 
    .mapValues(group => group.map(r => InnerDialogueElement(r.getString(1), r.getString(2)))) 

// Map everything into a case class that fully represents your destination schema 
val finalRdd = mappedInnerElementsRdd.map({ case (dialogueId, innerElements) => DialogueElement(dialogueId, innerElements.toArray) }) 

import sqlContext.implicits._ // needed for calling toDF() 

val finalDF = finalRdd.toDF() 

finalDF.printSchema() // should print your desired schema 

Вместо объявления вышеприведенных классов случаев с точными именами полей (например, «Dialogue_id»), можно также назвать те пользователь по-разными и вручную tranform назад от РДА в DataFrame с помощью:

sqlContext.createDataFrame(yourRDD, yourSchemaContainingTheFieldNamesYouWantToHave) 

Надеется, что это помогает :)

PS: Использование groupBy с РДОМ означает, что каждая группа должна полностью вписывается в основную память!

+0

Он отлично работает ... Вы рок! :) –

Смежные вопросы