2016-03-02 3 views
1

В PySpark я хочу сделать полное внешнее соединение двух RDD с парами значений ключа, где ключи могут быть None. Например:Присоединиться к соединениям PySpark Нет значений

rdd1 = sc.parallelize([(None, "a"), (None, "b")]) 
rdd2 = sc.parallelize([(None, "c"), (None, "d")]) 
join_rdd = rdd1.join(rdd2) 

Похоже, что PySpark присоединяется к записи, где ключи не None:

print(rdd1.join(rdd2).take(10)) 
>>> [(None, ('a', 'c')), (None, ('a', 'd')), (None, ('b', 'c')), (None, ('b', 'd'))] 

Однако в SQL, когда я соединить две таблицы:

Table1: Table2: 
key val key val 
NULL a  NULL c 
NULL b  NULL d 

SELECT * FROM Table1 JOIN Table2 ON Table1.key = Table2.key 

У меня есть пустой набор результатов.

Я полагаю, что это связано с тем, что в Python None == None является истинным, а в SQL NULL = NULL является ложным.

У меня есть два вопроса:

  1. Есть ли способ, чтобы эмулировать поведение SQL и силы PySpark не вступать на ноны?

  2. Является ли ошибка или функция? Как пользователь SQL, я ожидал, что присоединение с помощью пустых ключей ничего не возвращает. Я новичок в PySpark и ничего не нашел в документации о joinig Nones. Может быть, стоит обратить внимание на Руководство по программированию Spark?

Или я где-то ошибаюсь?

Спасибо!

ответ

1

Ваши ожидания неверны. API RDD не соответствует семантике SQL и никогда не планировался. RDD.join - это просто ссылка на хэш, связанная с portable_hash, которая предназначена для обеспечения значимого хэширования None.

Если вы хотите SQL-подобный семантику вы должны использовать Спарк SQL/Frames данных:

schema = StructType([ 
    StructField("_1", IntegerType(), True), StructField("_2", StringType(), False) 
]) 

df1 = sqlContext.createDataFrame(rdd1, schema) 
df2 = sqlContext.createDataFrame(rdd2, schema) 
df1.join(df2, ["_1"]) 

Если вы хотите, чтобы достичь подобного результата на РДУ вы отфильтровывать None ключи перед тем join:

rdd1.filter(lambda x: x[0] is not None).join(rdd2)