2017-01-02 6 views
0

Spark SQL для меня довольно понятен. Тем не менее, я только начинаю с искрового RDD API. Как spark apply function to columns in parallel подчеркивает, что это должно позволить мне избавиться от медленных тасует дляискрообразование искры-SQL в RDD API

def handleBias(df: DataFrame, colName: String, target: String = this.target) = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 
} 

В псевдокоде: df foreach column (handleBias(column) Таким образом, минимальный фрейм данных загружается до

val input = Seq(
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
) 
    val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

но не может корректно отобразить

val rdd1_inputDf = inputDf.rdd.flatMap { x => {(0 until x.size).map(idx => (idx, x(idx)))}} 
     rdd1_inputDf.toDF.show 

Это терпит неудачу с

java.lang.ClassNotFoundException: scala.Any 
java.lang.ClassNotFoundException: scala.Any 

Пример: https://github.com/geoHeil/sparkContrastCodinghttps://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala для проблемы, изложенной в этом вопросе.

ответ

2

Когда вы звоните .rdd на номер DataFrame, вы получаете RDD[Row], который не является строго типизированным. Если вы хотите, чтобы иметь возможность сопоставить по элементам вам необходимо сопоставление с образцом над :

scala> val input = Seq(
    |  (0, "A", "B", "C", "D"), 
    |  (1, "A", "B", "C", "D"), 
    |  (0, "d", "a", "jkl", "d"), 
    |  (0, "d", "g", "C", "D"), 
    |  (1, "A", "d", "t", "k"), 
    |  (1, "d", "c", "C", "D"), 
    |  (1, "c", "B", "C", "D") 
    | ) 
input: Seq[(Int, String, String, String, String)] = List((0,A,B,C,D), (1,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D)) 

scala> val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 
inputDf: org.apache.spark.sql.DataFrame = [TARGET: int, col1: string ... 3 more fields] 

scala> import org.apache.spark.sql.Row 
import org.apache.spark.sql.Row 

scala> val rowRDD = inputDf.rdd 
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at rdd at <console>:27 

scala> val typedRDD = rowRDD.map{case Row(a: Int, b: String, c: String, d: String, e: String) => (a,b,c,d,e)} 
typedRDD: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[20] at map at <console>:29 

scala> typedRDD.keyBy(_._1).groupByKey.foreach{println} 
[Stage 7:>               (0 + 0)/4] 
(0,CompactBuffer((A,B,C,D), (d,a,jkl,d), (d,g,C,D))) 
(1,CompactBuffer((A,B,C,D), (A,d,t,k), (d,c,C,D), (c,B,C,D))) 

В противном случае вы можете использовать набранный Dataset:

scala> val ds = input.toDS 
ds: org.apache.spark.sql.Dataset[(Int, String, String, String, String)] = [_1: int, _2: string ... 3 more fields] 

scala> ds.rdd 
res2: org.apache.spark.rdd.RDD[(Int, String, String, String, String)] = MapPartitionsRDD[8] at rdd at <console>:30 

scala> ds.rdd.keyBy(_._1).groupByKey.foreach{println} 
[Stage 0:>               (0 + 0)/4] 
(0,CompactBuffer((0,A,B,C,D), (0,d,a,jkl,d), (0,d,g,C,D))) 
(1,CompactBuffer((1,A,B,C,D), (1,A,d,t,k), (1,d,c,C,D), (1,c,B,C,D))) 
+1

Как я хочу использовать это в мл .Pipeline и шаг вывода - DataFrame, «схема потеряна», например Мне нужно будет использовать сопоставление шаблонов? это верно? Но есть много колонок, есть способ «вывести» их несколько (частичный shcema? –

+0

Да, преобразование 'DF => RDD' вообще не использует схему (и я не думаю, что есть хороший способ заставить его использовать). Однако посмотрите на мой новый пример «Dataset»: нет необходимости использовать посредника «Dataframe», и похоже, что «DataSet» прекрасно описывает типы (в Spark 2.0 я думаю все, что вы могли бы сделать с DF, также можно сделать с помощью DS) –

+0

@GeorgHeiler (не уверен, что вы получили уведомление о ^^^^) –

Смежные вопросы