2015-11-10 4 views
2

У меня есть dataframe с сделок с присоединяемой прейскурантом:Свечи dataframes: Извлечь столбец на основе значения другого столбца

+----------+----------+------+-------+-------+ 
| paid | currency | EUR | USD | GBP | 
+----------+----------+------+-------+-------+ 
| 49.5 | EUR | 99 | 79 | 69 | 
+----------+----------+------+-------+-------+ 

Клиент заплатил 49,5 в евро, как показано в «валюте» колонка. Теперь я хочу сравнить эту оплаченную цену с ценой из прейскуранта.

Therefor Мне нужно получить доступ к правильному колонку, основанный на стоимости «валюта», как так:

df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid")) 

, который я надеялся стать

df.withColumn("saved", df.col("EUR") - df.col("paid")) 

Это не удается, однако. Я пробовал все, что мог, включая и UDF, и никуда не денусь.

Наверное, есть несколько элегантных решений для этого? Может кто-нибудь помочь здесь?

ответ

3

Если предположить, что имена столбцов совпадают со значениями в currency колонке:

import org.apache.spark.sql.functions.{lit, col, coalesce} 
import org.apache.spark.sql.Column 

// Dummy data 
val df = sc.parallelize(Seq(
    (49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50) 
)).toDF("paid", "currency", "EUR", "USD", "GBP") 

// A list of available currencies 
val currencies: List[String] = List("EUR", "USD", "GBP") 

// Select listed value 
val listedPrice: Column = coalesce(
    currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*) 

df.select($"*", (listedPrice - $"paid").alias("difference")).show 

// +-----+--------+---+---+---+----------+ 
// | paid|currency|EUR|USD|GBP|difference| 
// +-----+--------+---+---+---+----------+ 
// | 49.5|  EUR| 99| 79| 69|  49.5| 
// |100.0|  GBP| 80|120| 50|  -50.0| 
// +-----+--------+---+---+---+----------+ 

с SQL эквивалентом listedPrice выражения нечто вроде этого:

COALESCE(
    CASE WHEN (currency = 'EUR') THEN EUR ELSE null, 
    CASE WHEN (currency = 'USD') THEN USD ELSE null, 
    CASE WHEN (currency = 'GBP') THEN GBP ELSE null 
) 

Альтернатива использованием foldLeft:

import org.apache.spark.sql.functions.when 

val listedPriceViaFold = currencies.foldLeft(
    lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc)) 

df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show 

// +-----+--------+---+---+---+----------+ 
// | paid|currency|EUR|USD|GBP|difference| 
// +-----+--------+---+---+---+----------+ 
// | 49.5|  EUR| 99| 79| 69|  49.5| 
// |100.0|  GBP| 80|120| 50|  -50.0| 
// +-----+--------+---+---+---+----------+ 

, где listedPriceViaFold переводит к следующему SQL:

CASE 
    WHEN (currency = 'GBP') THEN GBP 
    ELSE CASE 
    WHEN (currency = 'USD') THEN USD 
    ELSE CASE 
     WHEN (currency = 'EUR') THEN EUR 
     ELSE null 

К сожалению, я не знаю ни встроенных функций, которые могли бы выразить непосредственно SQL, как этот

CASE currency 
    WHEN 'EUR' THEN EUR 
    WHEN 'USD' THEN USD 
    WHEN 'GBP' THEN GBP 
    ELSE null 
END 

, но вы можете использовать эту конструкцию в необработанном SQL.

Это мое предположение неверно, вы можете просто добавить сопоставление между именем столбца и значением в столбце currency.

Edit:

Другой вариант, который может быть эффективным, если источник поддерживает предикат раскрывающееся вниз и эффективную обрезку столбцов, является подмножество от валюты и объединения:

currencies.map(
    // for each currency filter and add difference 
    c => df.where($"currency" === c).withColumn("difference", $"paid" - col(c)) 
).reduce((df1, df2) => df1.unionAll(df2)) // Union 

Это эквивалентно SQL как это:

SELECT *, EUR - paid AS difference FROM df WHERE currency = 'EUR' 
UNION ALL 
SELECT *, USD - paid AS difference FROM df WHERE currency = 'USD' 
UNION ALL 
SELECT *, GBP - paid AS difference FROM df WHERE currency = 'GBP' 
+1

Метод coalesce() Мне нравится и отлично работает, много работы для Spark, но все будет хорошо! Спасибо за это! – TomTom101

+0

Добро пожаловать. Я добавил еще одно решение. – zero323

+0

Этот второй неплохой, умный способ использования союза. – mehmetminanc

0

Я не могу придумать, как это делают с DataFrame с, и я сомневаюсь, что есть простой способ, но если взять эту таблицу в RDD:

// On top of my head, warn if wrong. 
// Would be more elegant with match .. case 
def d(l: (Int, String, Int, Int, Int)): Int = { 
    if(l._2 == "EUR") 
    l._3 - l._1 
    else if (l._2 == "USD") 
    l._4 - l._1 
    else 
    l._5 -l._1 
} 
val rdd = df.rdd 
val diff = rdd.map(r => (r, r(d))) 

Будет ли ошибки типа, скорее всего, поднимают , Я надеюсь, что вы можете ориентироваться вокруг них.

+0

Спасибо! Есть еще несколько валют, поэтому я хотел избежать if/else или вложенных, когда(). Else() конструирует. – TomTom101

+0

Говоря о том, что моя проблема, кажется, я не могу получить буквальное значение этой колонки $ "currency", и мне было интересно, как работает ($ "column", [then]). Мне было интересно, есть ли часть кода [там] (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala) поможет мне ?! 'lit (condition) .expr' – TomTom101

Смежные вопросы