13

Я пытаюсь использовать API Spark Dataset, но у меня возникают некоторые проблемы, делающие простое соединение.API-интерфейс Spark Dataset - присоединяется

Скажем, у меня есть два набора данных с полями: date | value, то в случае DataFrame моей присоединиться будет выглядеть так:

val dfA : DataFrame 
val dfB : DataFrame 

dfA.join(dfB, dfB("date") === dfA("date")) 

Однако для Dataset есть метод .joinWith, но тот же подход не работает :

val dfA : Dataset 
val dfB : Dataset 

dfA.joinWith(dfB, ?) 

Каков аргумент, который требуется .joinWith?

ответ

19

Чтобы использовать joinWith, сначала вам нужно создать DataSet, и, скорее всего, два из них. Чтобы создать DataSet, вам необходимо создать класс case, который соответствует вашей схеме, и вызвать DataFrame.as[T], где T - ваш класс case. Итак:

case class KeyValue(key: Int, value: String) 
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value") 
val ds = df.as[KeyValue] 
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string] 

Вы также можете пропустить класс случая и использовать кортеж:

val tupDs = df.as[(Int,String)] 
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string] 

Тогда, если у вас другой случай класс/DF, так сказать:

case class Nums(key: Int, num1: Double, num2: Long) 
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2") 
val ds2 = df2.as[Nums] 
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint] 

Тогда , тогда как синтаксис join и joinWith аналогичны, результаты разные:

df.join(df2, df.col("key") === df2.col("key")).show 
// +---+-----+---+----+----+ 
// |key|value|key|num1|num2| 
// +---+-----+---+----+----+ 
// | 1| asdf| 1| 7.7| 101| 
// | 2|34234| 2| 1.2| 10| 
// +---+-----+---+----+----+ 

ds.joinWith(ds2, df.col("key") === df2.col("key")).show 
// +---------+-----------+ 
// |  _1|   _2| 
// +---------+-----------+ 
// | [1,asdf]|[1,7.7,101]| 
// |[2,34234]| [2,1.2,10]| 
// +---------+-----------+ 

Как вы можете видеть, joinWith оставляет объекты неповрежденными как части кортежа, а join выравнивает столбцы в единое пространство имен. (. Что может вызвать проблемы в приведенном выше случае, так как «ключ» имя столбца повторяется)

Как ни странно, я должен использовать df.col("key") и df2.col("key"), чтобы создать условия для присоединения ds и ds2 - если вы используете только col("key") с обеих сторон он не работает, а ds.col(...) не существует. Однако использование оригинала df.col("key") делает трюк.

+3

подробное объяснение. Только одна путаница. Есть ли лучший способ написать типизированное условие соединения. например. df.col («ключ») может ли мы иметь что-то более безопасное типа, которое может разрешить правильность «ключа» во время компиляции. –

+5

Я полностью согласен, на основе этого синтаксиса нет смысла создавать Dataset, так где же преимущество? Я не могу понять, что нет типичной альтернативы. Такая жалость! – Sparky

2

В приведенном выше примере вы можете попробовать ниже вариант -

  • Определить класс случая для выхода

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Соедините два набора данных с помощью «Seq (« ключ »)», это поможет избежать двух повторяющихся столбцов ключа на выходе.Которая поможет применить класс случая или получать данные в следующем шаге

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+

+0

вы конкретно не отвечаете на вопрос, но подсказка Seq («key») помогла мне – ImDarrenG

Смежные вопросы