2017-02-22 5 views
0

Спарк DataFrame ДФ дается со схемой:Спарк DataFrame CROSS ОТНОСИТЬСЯ для колонн деагрегация

id, agg_values 
432, 11 3.14 45 4.322 
984, 1 9.22 45 22.17 

Мне нужно производить "deaggdegated" колонки:

id, item_id, value 
432, 11, 3.14 
432, 45, 4.322 
984, 1, 98.22 
984, 45, 222.1 

Синтаксический функции:

private def parse_agg_scores(line: String): List[(String, String)] = { 
    val items: Array[String] = line.split(' ') 
    val ids = Iterator.from(0, 2).takeWhile(_ < items.size).map(items(_)) 
    val scores = Iterator.from(1, 2).takeWhile(_ < items.size).map(items(_)) 
    ids.zip(scores).toList 
    } 

Я пробовал flatMap, но он не работает:

val res = df.flatMap{ row => 
     val line = row.getString(1) 

     parse_agg_scores(line) 
} 
+0

Пожалуйста, не просто сказать: «это не работает», не говоря в том, что это не работает. Ошибка времени компиляции, ошибка времени выполнения, неожиданный вывод (и если да, что вы получили и чего вы ожидали)? –

ответ

0

Это была ошибка компиляции времени, связанные с не импортируя sqlContext.implicits._ Это решение, которое я придумал:

def unpivote_scores(df: DataFrame, colName: String, sqlContext: SQLContext): DataFrame = { 
    import sqlContext.implicits._ 

    df.flatMap { row => 
     val video_id = row.getInt(0) 
     val features = row.getString(1) 

     val items: Array[String] = features.split(' ') 
     val ids_string = Iterator.from(0, 2).takeWhile(_ < items.size).map(items(_)) 
     val scores_string = Iterator.from(1, 2).takeWhile(_ < items.size).map(items(_)) 

     val ids = ids_string.map(_.toInt) 
     val scores = scores_string.map(_.toDouble) 

     ids.zip(scores).toList.map(t => (video_id, t._1, t._2)) 
    }.toDF("VideoId", "FeatureId", "Score") 
    } 
2

Вы можете создать новый столбец с 4-элементными массивами разделить на 2-элементные массивы и explode на этой колонке - и затем разделить эти 2-элементные массивы в отдельные столбцы:

val result = dataFrame.withColumn("tuples", explode(array(
    array($"agg_values"(0), $"agg_values"(1)), 
    array($"agg_values"(2), $"agg_values"(3)) 
))) 
    .select($"id", $"tuples"(0) as "item_id", $"tuples"(1) as "value") 

result.show() 
// +---+-------+-----+ 
// | id|item_id|value| 
// +---+-------+-----+ 
// |432| 11.0| 3.14| 
// |432| 45.0|4.322| 
// |984| 1.0| 9.22| 
// |984| 45.0|22.17| 
// +---+-------+-----+ 

EDIT в случае, если каждая запись может иметь различное число «пар» в agg_values, нам понадобится UDF, чтобы преобразовать массив в массив утра пар перед взрывом:

// UDF to turn an array of Doubles into an Array of 2-item Arrays 
val groupPairs = udf { 
    arr: mutable.WrappedArray[Double] => arr.grouped(2).toArray 
} 

val result = dataFrame 
    .withColumn("pair", explode(groupPairs($"agg_values"))) 
    .select($"id", $"pair"(0) as "item_id", $"pair"(1) as "value") 
+0

Это интересное решение, но длина пар (item_id, value) неизвестна. Поэтому я не могу использовать индексы типа $ agg_values ​​(1) и (2). –

+0

Я вижу. Итак, мы знаем, что 'agg_values' состоит из таких« пар »(т. Е. Его длина является четным числом), но мы не знаем, сколько таких пар существует? И каждая запись может содержать различное количество пар? –

+1

Обновлен мой ответ - надеюсь, что это поможет. –