Если предположить, что имена столбцов совпадают со значениями в currency
колонке:
import org.apache.spark.sql.functions.{lit, col, coalesce}
import org.apache.spark.sql.Column
// Dummy data
val df = sc.parallelize(Seq(
(49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
)).toDF("paid", "currency", "EUR", "USD", "GBP")
// A list of available currencies
val currencies: List[String] = List("EUR", "USD", "GBP")
// Select listed value
val listedPrice: Column = coalesce(
currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)
df.select($"*", (listedPrice - $"paid").alias("difference")).show
// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5| EUR| 99| 79| 69| 49.5|
// |100.0| GBP| 80|120| 50| -50.0|
// +-----+--------+---+---+---+----------+
с SQL эквивалентом listedPrice
выражения нечто вроде этого:
COALESCE(
CASE WHEN (currency = 'EUR') THEN EUR ELSE null,
CASE WHEN (currency = 'USD') THEN USD ELSE null,
CASE WHEN (currency = 'GBP') THEN GBP ELSE null
)
Альтернатива использованием foldLeft
:
import org.apache.spark.sql.functions.when
val listedPriceViaFold = currencies.foldLeft(
lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))
df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show
// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5| EUR| 99| 79| 69| 49.5|
// |100.0| GBP| 80|120| 50| -50.0|
// +-----+--------+---+---+---+----------+
, где listedPriceViaFold
переводит к следующему SQL:
CASE
WHEN (currency = 'GBP') THEN GBP
ELSE CASE
WHEN (currency = 'USD') THEN USD
ELSE CASE
WHEN (currency = 'EUR') THEN EUR
ELSE null
К сожалению, я не знаю ни встроенных функций, которые могли бы выразить непосредственно SQL, как этот
CASE currency
WHEN 'EUR' THEN EUR
WHEN 'USD' THEN USD
WHEN 'GBP' THEN GBP
ELSE null
END
, но вы можете использовать эту конструкцию в необработанном SQL.
Это мое предположение неверно, вы можете просто добавить сопоставление между именем столбца и значением в столбце currency
.
Edit:
Другой вариант, который может быть эффективным, если источник поддерживает предикат раскрывающееся вниз и эффективную обрезку столбцов, является подмножество от валюты и объединения:
currencies.map(
// for each currency filter and add difference
c => df.where($"currency" === c).withColumn("difference", $"paid" - col(c))
).reduce((df1, df2) => df1.unionAll(df2)) // Union
Это эквивалентно SQL как это:
SELECT *, EUR - paid AS difference FROM df WHERE currency = 'EUR'
UNION ALL
SELECT *, USD - paid AS difference FROM df WHERE currency = 'USD'
UNION ALL
SELECT *, GBP - paid AS difference FROM df WHERE currency = 'GBP'
Метод coalesce() Мне нравится и отлично работает, много работы для Spark, но все будет хорошо! Спасибо за это! – TomTom101
Добро пожаловать. Я добавил еще одно решение. – zero323
Этот второй неплохой, умный способ использования союза. – mehmetminanc