5

У меня большой dataframe (1,2 Гбайт более или менее) с этой структурой:Преобразование один столбец на несколько из них в Спарк Dataframe

 
+---------+--------------+------------------------------------------------------------------------------------------------------+ 
| country | date_data |             text             | 
+---------+--------------+------------------------------------------------------------------------------------------------------+ 
| "EEUU" | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"  | 
| "EEUU" | "2016-10-03" | "T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 46ee"  | 
| .  | .   | .                         | 
| .  | .   | .                         | 
| "EEUU" | "2016-10-03" | "T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 47aa" | 
+---------+--------------+------------------------------------------------------------------------------------------------------+ 

Число строк 300,000 и «текст» поле является строкой приблизительно 5000 знаков.

Я хотел бы, чтобы отделить поле «текст» в этом новых полях:

 
+---------+------------+------+-------------+--------+--------+---------+--------+------+ 
| country | date_data | t_d | t_name | t_in | t_c | t_add | ...... | t_r | 
+---------+------------+------+-------------+--------+--------+---------+--------+------+ 
| EEUU | 2016-10-03 | QQWE | name_1  | ind_1 | c1ws12 | Sec_1_P | ...... | 45ee | 
| EEUU | 2016-10-03 | QQAA | name_2  | ind_2 | c1ws12 | Sec_1_P | ...... | 45ee | 
| .  | .   | . | .   | .  | .  | .  | .  |  | 
| .  | .   | . | .   | .  | .  | .  | .  |  | 
| .  | .   | . | .   | .  | .  | .  | .  |  | 
| EEUU | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa | 
+---------+------------+------+-------------+--------+--------+---------+--------+------+ 

В настоящее время I'm, используя регулярные выражения, чтобы решить эту проблему. Во-первых, я пишу регулярно как выражения и создать функцию для извлечения отдельных полей из текста (90 регулярных выражений в общей сложности):

val D_text = "((?<=T_D:).*?(?=\\\\n))".r 
val NAME_text = "((?<=nT_NAME:).*?(?=\\\\n))".r 
val IN_text = "((?<=T_IN:).*?(?=\\\\n))".r 
val C_text = "((?<=T_C:).*?(?=\\\\n))".r 
val ADD_text = "((?<=T_ADD:).*?(?=\\\\n))".r 
     . 
     . 
     . 
     . 
val R_text = "((?<=T_R:).*?(?=\\\\n))".r 

//UDF function: 
def getFirst(pattern2: scala.util.matching.Regex) = udf(
      (url: String) => pattern2.findFirstIn(url) match { 
       case Some(texst_new) => texst_new 
       case None => "NULL" 
       case null => "NULL" 
      } 
    ) 

Затем я создаю новый Dataframe (tbl_separate_fields) в результате применения функции с регулярное выражение для извлечения каждого нового поля из текста.

val tbl_separate_fields = hiveDF.select(
      hiveDF("country"), 
      hiveDF("date_data"), 
      getFirst(D_text)(hiveDF("texst")).alias("t_d"), 
      getFirst(NAME_text)(hiveDF("texst")).alias("t_name"), 
      getFirst(IN_text)(hiveDF("texst")).alias("t_in"), 
      getFirst(C_text)(hiveDF("texst")).alias("t_c"), 
      getFirst(ADD_text)(hiveDF("texst")).alias("t_add"), 
          . 
          . 
          . 
          . 

     getFirst(R_text)(hiveDF("texst")).alias("t_r") 

     ) 

Наконец, я вставляю эту dataframe в таблицу улей:

tbl_separate_fields.registerTempTable("tbl_separate_fields") 
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data) SELECT * FROM tbl_separate_fields") 

Это решение действует в течение 1 часа, в течение всего dataframe, так что я хочу, чтобы оптимизировать и сократить время выполнения. Есть ли решение?

Мы используем Hadoop 2.7.1 и Apache-Спарк 1.5.1. Конфигурация для Spark:

val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1") 
val sc = new SparkContext(conf) 
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 

Заранее спасибо.

EDIT DATA:

 
+---------+--------------+------------------------------------------------------------------------------------------------------+ 
| country | date_data |             text             | 
+---------+--------------+------------------------------------------------------------------------------------------------------+ 
| "EEUU" | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"  | 
| "EEUU" | "2016-10-03" | "T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12 ...........\nT_R: 46ee"       | 
| .  | .   | .                         | 
| .  | .   | .                         | 
| "EEUU" | "2016-10-03" | "T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n ...........\nT_R: 47aa"   | 
+---------+--------------+------------------------------------------------------------------------------------------------------+ 

ответ

1

Использование регулярных выражений в данном случае идет медленно, а также хрупким.

Если вы знаете, что все записи имеют одинаковую структуру, то есть, что все значения «текст» имеют тот же номер и заказ из «частей», следующий код будет работать (для любого количества столбцов), главным образом, воспользовавшись функцией split в org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._ 

// first - split "text" column values into Arrays 
val textAsArray: DataFrame = inputDF 
    .withColumn("as_array", split(col("text"), "\n")) 
    .drop("text") 
    .cache() 

// get a sample (first row) to get column names, can be skipped if you want to hard-code them: 
val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray 
val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex 

// add Column per columnName with the right value and drop the no-longer-needed as_array column 
val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) { 
    case (df, (colName, index)) => df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1)) 
}.drop("as_array") 

withValueColumns.show() 
// for the sample data I created, 
// with just 4 "parts" in "text" column, this prints: 
// +-------+----------+----+------+-----+------+ 
// |country| date_data| T_D|T_NAME| T_IN| T_C| 
// +-------+----------+----+------+-----+------+ 
// | EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12| 
// | EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12| 
// +-------+----------+----+------+-----+------+ 

Альтернативно, если выше предположение не верно, вы можете использовать пользовательскую функцию, которая преобразует текстовый столбец в Map, а затем выполнять аналогичные reduceLeft оп

import sqlContext.implicits._ 

// sample data: not the same order, not all records have all columns: 
val inputDF: DataFrame = sc.parallelize(Seq(
    ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"), 
    ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2") 
)).toDF("country", "date_data", "text") 

// hard-coded list of expected column names: 
val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C") 

// UDF to convert text into key-value map 
val asMap = udf[Map[String, String], String] { s => 
    s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap 
} 


val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text") 

// for each column name - lookup the value in the map 
val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) { 
    case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName)) 
}.drop("textAsMap") 

withValueColumns.show() 
// prints: 
// +-------+----------+----+------+-----+------+ 
// |country| date_data| T_D|T_NAME| T_IN| T_C| 
// +-------+----------+----+------+-----+------+ 
// | EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12| 
// | EEUU|2016-10-03|QQAA|name_2|ind_2| null| 
// +-------+----------+----+------+-----+------+ 
+0

Спасибо за ваш ответ. Как вы сказали, это решение действительно, если «знаете, что все записи имеют одинаковую структуру, т. Е. Что все« текстовые »значения имеют одинаковый номер и порядок« частей ». В нашем конкретном случае структура текста может изменяться случайным образом (порядок, количество «частей», дублирующихся частей и т. Д.). Более хороший пример этого был добавлен в конце вопроса. –

+0

Я вижу - обновленный ответ с решением для этого тоже. –

Смежные вопросы