2017-01-18 4 views
16

Я хотел бы включить null значения в Apache Spark join. Spark не включает строки с нулевым значением по умолчанию.Включая нулевые значения в Apache Spark Join

Это поведение по умолчанию Spark.

val numbersDf = Seq(
    ("123"), 
    ("456"), 
    (null), 
    ("") 
).toDF("numbers") 

val lettersDf = Seq(
    ("123", "abc"), 
    ("456", "def"), 
    (null, "zzz"), 
    ("", "hhh") 
).toDF("numbers", "letters") 

val joinedDf = numbersDf.join(lettersDf, Seq("numbers")) 

Вот выход joinedDf.show():

+-------+-------+ 
|numbers|letters| 
+-------+-------+ 
| 123| abc| 
| 456| def| 
|  | hhh| 
+-------+-------+ 

Это выход я хотел бы:

+-------+-------+ 
|numbers|letters| 
+-------+-------+ 
| 123| abc| 
| 456| def| 
|  | hhh| 
| null| zzz| 
+-------+-------+ 

ответ

26

Scala предоставляет специальный NULL безопасный оператор равенства:

numbersDf 
    .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers")) 
    .drop(lettersDf("numbers")) 
+-------+-------+ 
|numbers|letters| 
+-------+-------+ 
| 123| abc| 
| 456| def| 
| null| zzz| 
|  | hhh| 
+-------+-------+ 

Будьте осторожны, чтобы не использовать его с Spark 1.5 или ранее. До Spark 1.6 требовалось декартово произведение (SPARK-11111 - Быстрое нулевое безопасное соединение).

В Спарк 2.3.0 или более поздней версии вы можете использовать Column.eqNullSafe в PySpark:

numbers_df = sc.parallelize([ 
    ("123",), ("456",), (None,), ("",) 
]).toDF(["numbers"]) 

letters_df = sc.parallelize([ 
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh") 
]).toDF(["numbers", "letters"]) 

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers)) 

и %<=>% в SparkR:

numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, ""))) 
letters_df <- createDataFrame(data.frame(
    numbers = c("123", "456", NA, ""), 
    letters = c("abc", "def", "zzz", "hhh") 
)) 

head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers)) 
numbers numbers letters 
1  456  456  def 
2 <NA> <NA>  zzz 
3      hhh 
4  123  123  abc 

С SQL (Спарк 2.2.0+), вы можете использовать IS NOT DISTINCT FROM:

SELECT * FROM numbers JOIN letters 
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers 

Это может использоваться с DataFrame API, а также:

numbersDf.alias("numbers") 
    .join(lettersDf.alias("letters")) 
    .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers") 
+2

Спасибо. [Это еще один хороший ответ] (http://stackoverflow.com/questions/31240148/spark-specify-multiple-column-conditions-for-dataframe-join), который использует оператор '<=>'. Если вы выполняете объединение с несколькими столбцами, условия могут быть скованы с помощью оператора '&&'. – Powers

5
val numbers2 = numbersDf.withColumnRenamed("numbers","num1") //rename columns so that we can disambiguate them in the join 
val letters2 = lettersDf.withColumnRenamed("numbers","num2") 
val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull && $"num2".isNull) ,"outer") 
joinedDf.select("num1","letters").withColumnRenamed("num1","numbers").show //rename the columns back to the original names 
Смежные вопросы