2015-12-18 6 views
2

Существует DataFrame, «rawDF» и его столбцыКак вернуть выборочно несколько строк из одной строки в Scala

time  |id1|id2|...|avg_value|max_value|min_value|std_value|range_value|.. 
10/1/2015|1 |3 |...|0.0  |0.2  |null  |null  |null  | ... 
10/2/2015|2 |3 |...|null  |null  |0.3  |0.4  |null  | ... 
10/3/2015|3 |5 |...|null  |null  |null  |0.4  |0.5  | ... 

Для каждой строки, я хотел бы вернуть несколько строк на основе этой пятерки " значения "(avg, min, max, std, range). Но, если значение равно null, я бы хотел пропустить.

Так, так, что вывод должен быть

10/1/2015|1 |3 |...|0.0 
10/1/2015|1 |3 |...|0.2 
10/2/2015|2 |3 |...|0.3 
10/2/2015|2 |3 |...|0.4 
10/3/2015|3 |5 |...|0.4 
10/3/2015|3 |5 |...|0.5 

Я не очень знаком с Scala, поэтому, я борюсь с этим.

val procRDD = rawDF.flatMap(x => for(valInd <-10 to 14) yield { 
if(x.get(valInd) != null) { ...)) } 
    }) 

Этот код включает в себя null возвращение. Итак, можете ли вы дать мне представление?

+0

Что именно делать вы подразумеваете под «сплющиванием» здесь? –

+0

Я отредактировал без термина - я мог бы использовать неправильный термин. В любом случае, если вы видите мой пример, вы можете понять, что я хотел бы сделать. – joshsuihn

+0

Вы уверены, что «null» не читается как строка? – Ashalynd

ответ

0

Это мое решение. Если у вас есть лучший, дайте мне знать.

val procRDD = rawDF.flatMap(x => 
    for(valInd <-10 to 14) yield { // valInd represents column number 
    if(x.get(valInd) != null) { 
     try { Some(..) } 
     catch { case e: Exception => None} 
    }else None 
    }) 
    .filter({case Some(y) => true; case None=> false}) 
    .map(_.get) 

На самом деле, я искал filter и map и как поставить команды внутри.

1

Это немного странно требование, но до тех пор, пока вы не нуждаетесь в информации о колонке источника и все значения одного и того же типа, вы можете просто explode и падение аннулирует:

import org.apache.spark.sql.functions.{array, explode} 

val toExpand = Seq(
    "avg_value", "max_value", "min_value", "std_value", "range_value" 
) 

// Collect *_value columns into a single Array and explode 
val expanded = df.withColumn("value", explode(array(toExpand.map(col): _*))) 

val result = toExpand 
    .foldLeft(expanded)((df, c) => df.drop(c)) // Drop obsolete columns 
    .na.drop(Seq("value")) // Drop rows with null value 
Смежные вопросы