2016-04-06 2 views
2

Как эффективно объединить/объединить несколько Spark DataFrames (Scala)? Я хочу присоединиться к столбцу, который является общим для всех таблиц, «Дата» ниже и получить (вроде) разреженный массив в результате.Как объединить объединение нескольких DataFrames в Spark Scala Эффективное полное внешнее соединение

Data Set A: 
Date Col A1 Col A2 
----------------------- 
1/1/16 A11  A21 
1/2/16 A12  A22 
1/3/16 A13  A23 
1/4/16 A14  A24 
1/5/16 A15  A25 

Data Set B: 
Date Col B1 Col B2 
----------------------- 
1/1/16 B11  B21 
1/3/16 B13  B23 
1/5/16 B15  B25 

Data Set C: 
Date Col C1 Col C2 
----------------------- 
1/2/16 C12  C22 
1/3/16 C13  C23 
1/4/16 C14  C24 
1/5/16 C15  C25 

Expected Result Set: 
Date Col A1 Col A2 Col B1 Col B2 Col C1 Col C2 
--------------------------------------------------------- 
1/1/16 A11  A21  B11  B12 
1/2/16 A12  A22      C12  C22 
1/3/16 A13  A23  B13  B23  C13  C23 
1/4/16 A14  A24      C14  C24 
1/5/16 A15  A25  B15  B25  C15  C25 

Это похоже на полное внешнее соединение на нескольких столах, но я не уверен. Есть ли более простой/эффективный способ получить этот разреженный массив без метода Join на DataFrames?

ответ

2

Это старый пост, поэтому я не уверен, что OP все еще настроен. В любом случае, простой способ достижения желаемого результата - через cogroup(). Поверните каждый RDD в [K,V] RDD с указанием даты, а затем используйте cogroup. Вот пример:

def mergeFrames(sc: SparkContext, sqlContext: SQLContext) = { 

import sqlContext.implicits._ 

// Create three dataframes. All string types assumed. 
val dfa = sc.parallelize(Seq(A("1/1/16", "A11", "A21"), 
    A("1/2/16", "A12", "A22"), 
    A("1/3/16", "A13", "A23"), 
    A("1/4/16", "A14", "A24"), 
    A("1/5/16", "A15", "A25"))).toDF() 

val dfb = sc.parallelize(Seq(
    B("1/1/16", "B11", "B21"), 
    B("1/3/16", "B13", "B23"), 
    B("1/5/16", "B15", "B25"))).toDF() 

val dfc = sc.parallelize(Seq(
    C("1/2/16", "C12", "C22"), 
    C("1/3/16", "C13", "C23"), 
    C("1/4/16", "C14", "C24"), 
    C("1/5/16", "C15", "C25"))).toDF() 

val rdda = dfa.rdd.map(row => row(0) -> row.toSeq.drop(1)) 
val rddb = dfb.rdd.map(row => row(0) -> row.toSeq.drop(1)) 
val rddc = dfc.rdd.map(row => row(0) -> row.toSeq.drop(1)) 

val schema = StructType("date a1 a2 b1 b2 c1 c2".split(" ").map(fieldName => StructField(fieldName, StringType))) 

// Form cogroups. `date` is assumed to be a key so there's at most one row for each date in an rdd/df 
val cg: RDD[Row] = rdda.cogroup(rddb, rddc).map { case (k, (v1, v2, v3)) => 
    val cols = Seq(k) ++ 
    (if (v1.nonEmpty) v1.head else Seq(null, null)) ++ 
    (if (v2.nonEmpty) v2.head else Seq(null, null)) ++ 
    (if (v3.nonEmpty) v3.head else Seq(null, null)) 
    Row.fromSeq(cols) 
} 

// Turn RDD back to DataFrame 
val cgdf = sqlContext.createDataFrame(cg, schema).sort("date") 

cgdf.show } 
+0

Не могли бы вы рассказать о том, как это будет работать? – banncee

+0

Я отредактировал свой ответ и добавил пример кода. Надеюсь, это поможет. –

Смежные вопросы