2016-09-20 2 views
2

У меня возникли проблемы с подключением к двум фреймам данных с использованием Spark Data Frames на python. У меня есть два кадра данных, которые я должен был изменить имя столбцов, чтобы сделать их уникальными для каждого кадра данных, поэтому позже я мог бы указать, в каком столбце есть. Я сделал это, чтобы переименовать столбцы (firstDf и secondDf являются свечи DataFrames, созданные с помощью функции createDataFrame):Pyspark DataFrame - Как использовать переменные для соединения?

oldColumns = firstDf.schema.names 
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns)) 
firstDf = firstDf.toDF(*newColumns) 

Я повторил это на второй DataFrame. Затем я попытался присоединиться к ним, используя следующий код:

from pyspark.sql.functions import * 

firstColumn = 'firstDf.firstColumn' 
secondColumn = 'secondDf.firstColumn' 
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner') 

С его помощью, как это я получаю следующее сообщение об ошибке:

AnalysisException "cannot resolve 'firstDf.firstColumn' given input columns: [firstDf.firstColumn, ...];"

Это было только для иллюстрации, что столбец существует в массиве входных столбцов ,

Если я не переименовывать DataFrames столбцов Я могу присоединиться к ним, используя этот кусок кода:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner') 

Но это дает мне DataFrame с неоднозначными именами столбцов.

Любые идеи о том, как подойти к этому?

ответ

0

Вообще говоря, не используйте точки в именах. Они имеют особое значение (могут использоваться либо для определения таблицы, либо для доступа к полям struct) и требуют некоторой дополнительной работы для правильного распознавания.

Для оборудования д соединяет все, что вам нужно, это имя столбца:

from pyspark.sql.functions import col 

firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn")) 
secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn")) 

column = 'firstColumn' 
firstDf.join(secondDf, [column], 'inner') 

## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string] 

Для сложных случаев использования псевдонимов таблиц:

firstColumn = 'firstDf.firstColumn' 
secondColumn = 'secondDf.firstColumn' 

firstDf.alias("firstDf").join(
    secondDf.alias("secondDf"), 
    # After alias prefix resolves to table name 
    col(firstColumn) == col(secondColumn), 
    "inner" 
) 

## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string] 

Вы также могли бы использовать родительские кадры непосредственно:

column = 'firstColumn' 

firstDf.join(secondDf, firstDf[column] == secondDf[column]) 
+0

Спасибо за ответ, специально для подсказки о том, чтобы не использовать точки в именах. Первый подход работает, но мне нужно, чтобы объединенный DataFrame имел уникальные имена столбцов для каждого столбца двух связанных DataFrames. Использование табличных псевдонимов, как было предложено, дает мне ту же ошибку AnalysisException, что и в вопросе. –

+0

Он должен работать нормально. Я добавил определения таблиц для полностью воспроизводимого примера. – zero323

+0

Простите, я только понял, что изменение точки заставило ее работать. Спасибо за ответ еще раз! –

Смежные вопросы