7

Пусть мы имеем DataFrame df, состоящий из следующих столбцов:DataFrame/Dataset группеПо поведение/оптимизация

Имя, Фамилия, размер, ширина, длина, взвешивают

Теперь мы хотим, чтобы выполнить пара операций, например, мы хотим создать пару DataFrames, содержащих данные о размере и ширине.

val df1 = df.groupBy("surname").agg(sum("size")) 
val df2 = df.groupBy("surname").agg(sum("width")) 

, как вы можете заметить, другие столбцы, такие как длина, нигде не используются. Является ли Spark достаточно умным, чтобы сбросить избыточные столбцы перед фазой перетасовки или они переносятся? Wil работает:

val dfBasic = df.select("surname", "size", "width") 

до группировки как-то влияет на производительность?

+1

Спарк выбирает столбцы, он попросил его группе на. Вы можете использовать объяснение, чтобы получить физический план вашего запроса – eliasah

ответ

22

Да, это «достаточно умный». groupBy, выполненный на DataFrame, - это не та же операция, что и groupBy, выполненная на равнине RDD. В описанном вами сценарии нет необходимости переводить необработанные данные вообще. Давайте создадим небольшой пример, чтобы проиллюстрировать, что:

val df = sc.parallelize(Seq(
    ("a", "foo", 1), ("a", "foo", 3), ("b", "bar", 5), ("b", "bar", 1) 
)).toDF("x", "y", "z") 

df.groupBy("x").agg(sum($"z")).explain 

// == Physical Plan == 
// *HashAggregate(keys=[x#148], functions=[sum(cast(z#150 as bigint))]) 
// +- Exchange hashpartitioning(x#148, 200) 
// +- *HashAggregate(keys=[x#148], functions=[partial_sum(cast(z#150 as bigint))]) 
//  +- *Project [_1#144 AS x#148, _3#146 AS z#150] 
//   +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#144, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#145, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#146] 
//    +- Scan ExternalRDDScan[obj#143] 

Как вы первая фаза является проекция, в которой сохраняются только необходимые столбцы. Следующие данные агрегируются локально и, наконец, передаются и агрегируются по всему миру. Вы получите немного другой ответ, если используете Spark < = 1.4, но общая структура должна быть точно такой же.

Наконец визуализация ДАГ, показывающая, что описание выше описывает фактическую работу:

group by and agg DAG

Аналогично, Dataset.groupByKey с последующим reduceGroups, содержит как карту на стороне (ObjectHashAggregate с partial_reduceaggregator) и уменьшить бортовые (ObjectHashAggregate с reduceaggregator уменьшение):

case class Foo(x: String, y: String, z: Int) 

val ds = df.as[Foo] 
ds.groupByKey(_.x).reduceGroups((x, y) => x.copy(z = x.z + y.z)).explain 

// == Physical Plan == 
// ObjectHashAggregate(keys=[value#126], functions=[reduceaggregator([email protected], Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)]) 
// +- Exchange hashpartitioning(value#126, 200) 
// +- ObjectHashAggregate(keys=[value#126], functions=[partial_reduceaggregator([email protected], Some(newInstance(class $line40.$read$$iw$$iw$Foo)), Some(class $line40.$read$$iw$$iw$Foo), Some(StructType(StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#128, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(x, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).x, true, false) AS x#25, y, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).y, true, false) AS y#26, z, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).z AS z#27) AS _2#129, newInstance(class scala.Tuple2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).x, true, false) AS x#25, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).y, true, false) AS y#26, assertnotnull(assertnotnull(input[0, $line40.$read$$iw$$iw$Foo, true])).z AS z#27, StructField(x,StringType,true), StructField(y,StringType,true), StructField(z,IntegerType,false), true, 0, 0)]) 
//  +- AppendColumns <function1>, newInstance(class $line40.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#126] 
//   +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10] 
//    +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6] 
//    +- Scan ExternalRDDScan[obj#3] 

groupByKey + reduceGroups

Однако другие методы KeyValueGroupedDataset могут работать аналогично RDD.groupByKey. Например, mapGroups (или flatMapGroups) не использует частичную агрегацию.

ds.groupByKey(_.x) 
    .mapGroups((_, iter) => iter.reduce((x, y) => x.copy(z = x.z + y.z))) 
    .explain 

//== Physical Plan == 
//*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).x, true, false) AS x#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).y, true, false) AS y#38, assertnotnull(input[0, $line15.$read$$iw$$iw$Foo, true]).z AS z#39] 
//+- MapGroups <function2>, value#32.toString, newInstance(class $line15.$read$$iw$$iw$Foo), [value#32], [x#8, y#9, z#10], obj#36: $line15.$read$$iw$$iw$Foo 
// +- *Sort [value#32 ASC NULLS FIRST], false, 0 
//  +- Exchange hashpartitioning(value#32, 200) 
//   +- AppendColumns <function1>, newInstance(class $line15.$read$$iw$$iw$Foo), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#32] 
//   +- *Project [_1#4 AS x#8, _2#5 AS y#9, _3#6 AS z#10] 
//    +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._2, true, false) AS _2#5, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#6] 
//     +- Scan ExternalRDDScan[obj#3] 

groupByKey + mapGroups

+2

@Niemand я предлагаю прочитать [эту статью] (https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls -catalyst-optimizer.html) относительно катализатора – eliasah

+0

@AB Хорошо, как сказано в ответе, нет! Эта группа не работает так же, как группа по функциям на уровне RDD. – eliasah

+0

@eliasah спасибо за информацию, я попытался найти и прочитать любой источник, объясняющий перетасовку между узлами производительности и распределения этих операций DataFrame (особенно) и RDD над узлами, но может найти, все, что дано, является примером и выводами.вы можете руководствоваться любым курсом, который преподает такие концепции (например, groupbyKey in rdd стоит дорого, а groupby в DF - нет) –

Смежные вопросы