2015-12-28 3 views
3

У меня есть массив [DataFrame], и ​​я хочу проверить для каждой строки каждого фрейма данных, если есть какое-либо изменение значений по столбцу. Скажем, у меня есть первый ряд из трех кадров данных, например:Как объединить (объединить) информацию по массиву [DataFrame]

(0,1.0,0.4,0.1) 
(0,3.0,0.2,0.1) 
(0,5.0,0.4,0.1) 

Первая колонка это идентификатор, и мой идеальный выход для этого ID будет:

(0, 1, 1, 0) 

что означает, что второй и третий столбцы изменились, а третий - нет. я придаю здесь немногих данных для репликации моих настроек

val rdd = sc.parallelize(Array((0,1.0,0.4,0.1), 
           (1,0.9,0.3,0.3), 
           (2,0.2,0.9,0.2), 
           (3,0.9,0.2,0.2), 
           (4,0.3,0.5,0.5))) 
val rdd2 = sc.parallelize(Array((0,3.0,0.2,0.1), 
           (1,0.9,0.3,0.3), 
           (2,0.2,0.5,0.2), 
           (3,0.8,0.1,0.1), 
           (4,0.3,0.5,0.5))) 
val rdd3 = sc.parallelize(Array((0,5.0,0.4,0.1), 
           (1,0.5,0.3,0.3), 
           (2,0.3,0.3,0.5), 
           (3,0.3,0.3,0.1), 
           (4,0.3,0.5,0.5))) 
val df = rdd.toDF("id", "prop1", "prop2", "prop3") 
val df2 = rdd2.toDF("id", "prop1", "prop2", "prop3") 
val df3 = rdd3.toDF("id", "prop1", "prop2", "prop3") 
val result:Array[DataFrame] = new Array[DataFrame](3) 
result.update(0, df) 
result.update(1,df2) 
result.update(2,df3) 

Как можно сопоставить по массиву и получить свой выход?

ответ

2

Вы можете использовать countDistinct с groupBy:

import org.apache.spark.sql.functions.{countDistinct} 

val exprs = Seq("prop1", "prop2", "prop3") 
    .map(c => (countDistinct(c) > 1).cast("integer").alias(c)) 

val combined = result.reduce(_ unionAll _) 

val aggregatedViaGroupBy = combined 
    .groupBy($"id") 
    .agg(exprs.head, exprs.tail: _*) 

aggregatedViaGroupBy.show 
// +---+-----+-----+-----+ 
// | id|prop1|prop2|prop3| 
// +---+-----+-----+-----+ 
// | 0| 1| 1| 0| 
// | 1| 1| 0| 0| 
// | 2| 1| 1| 1| 
// | 3| 1| 1| 1| 
// | 4| 0| 0| 0| 
// +---+-----+-----+-----+ 
+0

Я выбрал этот ответ, так как он играет лучше с моими фактическими данными, но я не знаю, является ли один ответ более эффективным, чем другой, что лучше подходит для хорошего ответа. – user299791

+0

Вы всегда можете обогнать и ждать, пока не будете измерять производительность. Вообще говоря, оба требуют большого сетевого трафика. Лично я, вероятно, переписал бы ответ, предоставленный @marios, с использованием операторов SQL, не отбрасывая на RDD. – zero323

1

Сначала нам нужно объединить все DataFrames.

val combined = result.reduceLeft((a,b) => a.join(b,"id")) 

Для сравнения всех столбцов одной метки (например, «prod1»), я обнаружил, что проще (по крайней мере для меня), чтобы работать на уровне RDD. Мы преобразуем данные в (id, Seq[Double]).

val finalResults = combined.rdd.map{ 
    x => 
    (x.getInt(0), x.toSeq.tail.map(_.asInstanceOf[Double])) 
}.map{ 
    case(i,d) => 
    def checkAllEqual(l: Seq[Double]) = if(l.toSet.size == 1) 0 else 1 
    val g = d.grouped(3).toList 
    val g1 = checkAllEqual(g.map(x => x(0))) 
    val g2 = checkAllEqual(g.map(x => x(1))) 
    val g3 = checkAllEqual(g.map(x => x(2))) 
    (i, g1,g2,g3) 
}.toDF("id", "prod1", "prod2", "prod3") 

finalResults.show() 

Это будет печатать:

+---+-----+-----+-----+ 
| id|prod1|prod2|prod3| 
+---+-----+-----+-----+ 
| 0| 1| 1| 0| 
| 1| 1| 0| 0| 
| 2| 1| 1| 1| 
| 3| 1| 1| 1| 
| 4| 0| 0| 0| 
+---+-----+-----+-----+ 
+0

Вы можете просто использовать 'x.reduceLeft' вместо' x.tail.foldLeft (х .Head) '. Он также будет работать и с массивами размером 1. –

+0

хорошо пункт! позвольте мне добавить исправление. – marios

Смежные вопросы