2016-05-19 3 views
9

Я пытаюсь объединить два PySpark dataframes с некоторыми колоннами, которые только на каждом из них:Объединить два PySpark dataframes

from pyspark.sql.functions import randn, rand 

df_1 = sqlContext.range(0, 10) 

+--+ 
|id| 
+--+ 
| 0| 
| 1| 
| 2| 
| 3| 
| 4| 
| 5| 
| 6| 
| 7| 
| 8| 
| 9| 
+--+ 

df_2 = sqlContext.range(11, 20) 

+--+ 
|id| 
+--+ 
| 10| 
| 11| 
| 12| 
| 13| 
| 14| 
| 15| 
| 16| 
| 17| 
| 18| 
| 19| 
+--+ 

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")) 
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2")) 

и теперь я хочу, чтобы создать третью dataframe. Я хотел бы что-то вроде панд concat:

df_1.show() 
+---+--------------------+--------------------+ 
| id|    uniform|    normal| 
+---+--------------------+--------------------+ 
| 0| 0.8122802274304282| 1.2423430583597714| 
| 1| 0.8642043127063618| 0.3900018344856156| 
| 2| 0.8292577771850476| 1.8077401259195247| 
| 3| 0.198558705368724| -0.4270585782850261| 
| 4|0.012661361966674889| 0.702634599720141| 
| 5| 0.8535692890157796|-0.42355804115129153| 
| 6| 0.3723296190171911| 1.3789648582622995| 
| 7| 0.9529794127670571| 0.16238718777444605| 
| 8| 0.9746632635918108| 0.02448061333761742| 
| 9| 0.513622008243935| 0.7626741803250845| 
+---+--------------------+--------------------+ 

df_2.show() 
+---+--------------------+--------------------+ 
| id|    uniform|   normal_2| 
+---+--------------------+--------------------+ 
| 11| 0.3221262660507942| 1.0269298899109824| 
| 12| 0.4030672316912547| 1.285648175568798| 
| 13| 0.9690555459609131|-0.22986601831364423| 
| 14|0.011913836266515876| -0.678915153834693| 
| 15| 0.9359607054250594|-0.16557488664743034| 
| 16| 0.45680471157575453| -0.3885563551710555| 
| 17| 0.6411908952297819| 0.9161177183227823| 
| 18| 0.5669232696934479| 0.7270125277020573| 
| 19| 0.513622008243935| 0.7626741803250845| 
+---+--------------------+--------------------+ 

#do some concatenation here, how? 

df_concat.show() 

| id|    uniform|    normal| normal_2 | 
+---+--------------------+--------------------+------------+ 
| 0| 0.8122802274304282| 1.2423430583597714| None  | 
| 1| 0.8642043127063618| 0.3900018344856156| None  | 
| 2| 0.8292577771850476| 1.8077401259195247| None  | 
| 3| 0.198558705368724| -0.4270585782850261| None  | 
| 4|0.012661361966674889| 0.702634599720141| None  | 
| 5| 0.8535692890157796|-0.42355804115129153| None  | 
| 6| 0.3723296190171911| 1.3789648582622995| None  | 
| 7| 0.9529794127670571| 0.16238718777444605| None  | 
| 8| 0.9746632635918108| 0.02448061333761742| None  | 
| 9| 0.513622008243935| 0.7626741803250845| None  | 
| 11| 0.3221262660507942| None    | 0.123  | 
| 12| 0.4030672316912547| None    |0.12323  | 
| 13| 0.9690555459609131| None    |0.123  | 
| 14|0.011913836266515876| None    |0.18923  | 
| 15| 0.9359607054250594| None    |0.99123  | 
| 16| 0.45680471157575453| None    |0.123  | 
| 17| 0.6411908952297819| None    |1.123  | 
| 18| 0.5669232696934479| None    |0.10023  | 
| 19| 0.513622008243935| None    |0.916332123 | 
+---+--------------------+--------------------+------------+ 

возможно ли это?

ответ

10
df_concat = df_1.union(df_2) 

В dataframes, возможно, должны иметь одинаковые столбцы, и в этом случае вы можете использовать withColumn() для создания normal_1 и normal_2

+0

Спасибо. Проблема, как я сказал выше, заключается в том, что столбцы не идентичны между двумя файлами данных. – Ivan

15

Может быть, вы можете попробовать создать столбцы unexisting и вызова union (unionAll для Спарк 1.6 или ниже):

cols = ['id', 'uniform', 'normal', 'normal_2']  

df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols) 
df_2_new = df_2.withColumn("normal", lit(None)).select(cols) 

result = df_1_new.union(df_2_new) 
+1

'unionAll()' устарел от искры 2.0. используйте 'union()' вместо –

+0

вы можете переименовать, используя 'withColumnRenamed' https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=selectexpr#pyspark.sql.DataFrame.withColumnRenamed – Hunle

3

Вот один из способов сделать это, в случае, если он по-прежнему полезно: Я побежал это в pyspark оболочке Python версии 2.7.12 и мой Спарк установки была версия 2.0.1.

PS: Я думаю, вы хотели использовать разные семена для df_1 df_2, а приведенный ниже код отражает это.

from pyspark.sql.types import FloatType 
from pyspark.sql.functions import randn, rand 
import pyspark.sql.functions as F 

df_1 = sqlContext.range(0, 10) 
df_2 = sqlContext.range(11, 20) 
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")) 
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2")) 

def get_uniform(df1_uniform, df2_uniform): 
    if df1_uniform: 
     return df1_uniform 
    if df2_uniform: 
     return df2_uniform 

u_get_uniform = F.udf(get_uniform, FloatType()) 

df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id")) 

Вот выходы я получаю:

df_1.show() 
+---+-------------------+--------------------+ 
| id|   uniform|    normal| 
+---+-------------------+--------------------+ 
| 0|0.41371264720975787| 0.5888539012978773| 
| 1| 0.7311719281896606| 0.8645537008427937| 
| 2| 0.1982919638208397| 0.06157382353970104| 
| 3|0.12714181165849525| 0.3623040918178586| 
| 4| 0.7604318153406678|-0.49575204523675975| 
| 5|0.12030715258495939| 1.0854146699817222| 
| 6|0.12131363910425985| -0.5284523629183004| 
| 7|0.44292918521277047| -0.4798519469521663| 
| 8| 0.8898784253886249| -0.8820294772950535| 
| 9|0.03650707717266999| -2.1591956435415334| 
+---+-------------------+--------------------+ 

df_2.show() 
+---+-------------------+--------------------+ 
| id|   uniform|   normal_2| 
+---+-------------------+--------------------+ 
| 11| 0.1982919638208397| 0.06157382353970104| 
| 12|0.12714181165849525| 0.3623040918178586| 
| 13|0.12030715258495939| 1.0854146699817222| 
| 14|0.12131363910425985| -0.5284523629183004| 
| 15|0.44292918521277047| -0.4798519469521663| 
| 16| 0.8898784253886249| -0.8820294772950535| 
| 17| 0.2731073068483362|-0.15116027592854422| 
| 18| 0.7784518091224375| -0.3785563841011868| 
| 19|0.43776394586845413| 0.47700719174464357| 
+---+-------------------+--------------------+ 

df_3.show() 
+---+-----------+--------------------+--------------------+      
| id| uniform|    normal|   normal_2| 
+---+-----------+--------------------+--------------------+ 
| 0| 0.41371265| 0.5888539012978773|    null| 
| 1| 0.7311719| 0.8645537008427937|    null| 
| 2| 0.19829196| 0.06157382353970104|    null| 
| 3| 0.12714182| 0.3623040918178586|    null| 
| 4| 0.7604318|-0.49575204523675975|    null| 
| 5|0.120307155| 1.0854146699817222|    null| 
| 6| 0.12131364| -0.5284523629183004|    null| 
| 7| 0.44292918| -0.4798519469521663|    null| 
| 8| 0.88987845| -0.8820294772950535|    null| 
| 9|0.036507078| -2.1591956435415334|    null| 
| 11| 0.19829196|    null| 0.06157382353970104| 
| 12| 0.12714182|    null| 0.3623040918178586| 
| 13|0.120307155|    null| 1.0854146699817222| 
| 14| 0.12131364|    null| -0.5284523629183004| 
| 15| 0.44292918|    null| -0.4798519469521663| 
| 16| 0.88987845|    null| -0.8820294772950535| 
| 17| 0.27310732|    null|-0.15116027592854422| 
| 18| 0.7784518|    null| -0.3785563841011868| 
| 19| 0.43776396|    null| 0.47700719174464357| 
+---+-----------+--------------------+--------------------+ 
0

Это должно сделать это для вас ...

from pyspark.sql.types import FloatType 
from pyspark.sql.functions import randn, rand, lit, coalesce, col 
import pyspark.sql.functions as F 

df_1 = sqlContext.range(0, 6) 
df_2 = sqlContext.range(3, 10) 
df_1 = df_1.select("id", lit("old").alias("source")) 
df_2 = df_2.select("id") 

df_1.show() 
df_2.show() 
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\ 
    .select(\ 
    [coalesce(df_1.id, df_2.id).alias("id")] +\ 
    [col("df_1." + c) for c in df_1.columns if c != "id"])\ 
    .sort("id") 
df_3.show() 
0

Над ответами очень элегантны. Я написал эту функцию долго назад, где я также пытался объединить два кадра данных с разными столбцами.

Предположим, у вас есть dataframe sdf1 и sdf2

from pyspark.sql import functions as F 
from pyspark.sql.types import * 

def unequal_union_sdf(sdf1, sdf2): 
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema) 
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema) 

    for i,j in s_df2_schema.difference(s_df1_schema): 
     sdf1 = sdf1.withColumn(i,F.lit(None).cast(j)) 

    for i,j in s_df1_schema.difference(s_df2_schema): 
     sdf2 = sdf2.withColumn(i,F.lit(None).cast(j)) 

    common_schema_colnames = sdf1.columns 
    sdk = \ 
     sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames)) 
    return sdk 

sdf_concat = unequal_union_sdf(sdf1, sdf2)