2016-08-29 1 views
1

Для DataFrame легко создать новый столбец с некоторой операцией, используя udf с df.withColumn("newCol", myUDF("someCol")). Для того, чтобы сделать что-то подобное в Dataset, я думаю, я бы использовать функцию map:Функция карты на наборах данных, оптимизированных для операций над одним столбцом?

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] 

Вы должны пройти весь корпус класса T в качестве входных данных для функции. Если у Dataset[T] много полей/столбцов, было бы очень неэффективно передавать всю строку, если вы просто хотели сделать один дополнительный столбец, работая на одном из многих столбцов T. Мой вопрос в том, является ли Catalyst достаточно умным, чтобы иметь возможность оптимизировать это?

ответ

0

Я попытался понять себя, так как не мог найти ответа нигде.

Давайте есть набор данных, который содержит классы чехол с несколькими полями:

scala> case class A(x: Int, y: Int) 
scala> val dfA = spark.createDataset[A](Seq(A(1, 2))) 
scala> val dfX = dfA.map(_.x) 

Теперь, если мы проверяем оптимизированного плана мы получаем следующее:

scala> val plan = dfX.queryExecution.optimizedPlan 

SerializeFromObject [input[0, int, true] AS value#8] 
    +- MapElements <function1>, obj#7: int 
     +- DeserializeToObject newInstance(class A), obj#6: A 
      +- LocalRelation [x#2, y#3]  

Согласно более многословным plan.toJSONDeserializeToObject шаг предполагает как x, так и y присутствовать.

Как вы доказали, например, следующий фрагмент, который использует отражение вместо прямого касания полей A, которые все еще работают.

val dfX = dfA.map(
    _.getClass.getMethods.find(_.getName == "x").get.invoke(x).asInstanceOf[Int] 
) 
0

Является ли Catalyst достаточно умны, чтобы быть в состоянии оптимизировать это?

TL; др № см SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions.

В настоящее время нет оптимизатора Catalyst Optimizer от Spark SQL, который знает, что вы делаете в своем коде Scala.

Цитирование SPARK-14083:

Одно большое преимущество Dataset API является безопасность типа, за счет производительности из-за сильной зависимости от определяемых пользователем укупорочных/лямбды. Эти закрытия обычно медленнее, чем выражения, потому что у нас больше гибкости для оптимизации выражений (известные типы данных, вызовы виртуальных функций и т. Д.). Во многих случаях на самом деле не очень сложно заглянуть в байтовый код этих закрытий и выяснить, что они пытаются сделать. Если мы сможем их понять, то мы можем превратить их непосредственно в выражения Catalyst для более оптимизированных исполнений.

И даже ваш упомянутый случай:

df.map(_.name) // эквивалентно выражению col("name")

Как вы можете видеть, что это по-прежнему открыта, и я сомневаюсь, что кто-то работает над этим в настоящее время.

Смежные вопросы