53

У меня есть Spark DataFrame (с использованием PySpark 1.5.1) и хотел бы добавить новый столбец.Как добавить новый столбец в Spark DataFrame (используя PySpark)?

Я попытался следующие без успеха:

type(randomed_hours) # => list 

# Create in Python and transform to RDD 

new_col = pd.DataFrame(randomed_hours, columns=['new_col']) 

spark_new_col = sqlContext.createDataFrame(new_col) 

my_df_spark.withColumn("hours", spark_new_col["new_col"]) 

получил также ошибку с помощью этого:

my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 

Так как же я могу добавить новый столбец (на основе вектора Python) в существующий DataFrame с PySpark?

ответ

101

Вы не можете добавить произвольный столбец в DataFrame в Spark. Новые столбцы могут быть созданы только с помощью литералов (другие литеральных типов описаны в How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit 

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) 

df_with_x4 = df.withColumn("x4", lit(0)) 
df_with_x4.show() 

## +---+---+-----+---+ 
## | x1| x2| x3| x4| 
## +---+---+-----+---+ 
## | 1| a| 23.0| 0| 
## | 3| B|-23.0| 0| 
## +---+---+-----+---+ 

преобразования существующего столбца:

from pyspark.sql.functions import exp 

df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) 
df_with_x5.show() 

## +---+---+-----+---+--------------------+ 
## | x1| x2| x3| x4|     x5| 
## +---+---+-----+---+--------------------+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| 
## | 3| B|-23.0| 0|1.026187963170189...| 
## +---+---+-----+---+--------------------+ 

включали использование join:

from pyspark.sql.functions import exp 

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) 
df_with_x6 = (df_with_x5 
    .join(lookup, col("x1") == col("k"), "leftouter") 
    .drop("k") 
    .withColumnRenamed("v", "x6")) 

## +---+---+-----+---+--------------------+----+ 
## | x1| x2| x3| x4|     x5| x6| 
## +---+---+-----+---+--------------------+----+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| foo| 
## | 3| B|-23.0| 0|1.026187963170189...|null| 
## +---+---+-----+---+--------------------+----+ 

или сгенерированы с функцией/udf:

from pyspark.sql.functions import rand 

df_with_x7 = df_with_x6.withColumn("x7", rand()) 
df_with_x7.show() 

## +---+---+-----+---+--------------------+----+-------------------+ 
## | x1| x2| x3| x4|     x5| x6|     x7| 
## +---+---+-----+---+--------------------+----+-------------------+ 
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| 
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| 
## +---+---+-----+---+--------------------+----+-------------------+ 

Функциональные встроенные функции (pyspark.sql.functions), которые соответствуют выражению Catalyst, обычно предпочтительнее, чем функции, определенные пользователем Python.

Если вы хотите добавить содержимое произвольного РДУ в качестве колонки можно

  • добавить row numbers to existing data frame
  • вызов zipWithIndex на РДУ и преобразовать его в кадр данных
  • присоединиться как с использованием индекса в качестве соединения ключ
+0

«Новые столбцы могут быть созданы только с использованием литералов». Что именно означают литералы в этом контексте? – timbram

35

Для добавления колонки с помощью UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) 

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

def valueToCategory(value): 
    if value == 1: return 'cat1' 
    elif value == 2: return 'cat2' 
    ... 
    else: return 'n/a' 

# NOTE: it seems that calls to udf() must be after SparkContext() is called 
udfValueToCategory = udf(valueToCategory, StringType()) 
df_with_cat = df.withColumn("category", udfValueToCategory("x1")) 
df_with_cat.show() 

## +---+---+-----+---------+ 
## | x1| x2| x3| category| 
## +---+---+-----+---------+ 
## | 1| a| 23.0|  cat1| 
## | 3| B|-23.0|  n/a| 
## +---+---+-----+---------+ 
13

Для Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen')) 
+1

Нужно быть df.select ('*', (df.age + 10) .alias ('agePlusTen')) –

+0

Спасибо, и если вы введете 'df = df.select ('*', (df.age + 10) .alias ('agePlusTen')) 'вы эффективно _adding произвольный столбец, поскольку @ zero323, предупреждающий нас выше, был невозможным, если только что-то не так с этим в Spark, в Pandas это стандартный способ. – cardamom

+0

Есть ли версия этого для pySpark? – Tagar

-1

Вы можете определить новый udf при добавлении column_name:

u_f = F.udf(lambda :yourstring,StringType()) 
a.select(u_f().alias('column_name') 
-1
from pyspark.sql.functions import udf 
from pyspark.sql.types import * 
func_name = udf(
    lambda val: val, # do sth to val 
    StringType() 
) 
df.withColumn('new_col', func_name(df.old_col)) 
+0

Вам нужно вызвать 'StringType()'. – gberger

0

Я хотел бы предложить обобщенный пример для очень похожего случая использования:

Вариант использования: У меня есть csv, состоящий из:

First|Third|Fifth 
data|data|data 
data|data|data 
...billion more lines 

Мне нужно выполнить некоторые преобразования и конечный CSV должен выглядеть

First|Second|Third|Fourth|Fifth 
data|null|data|null|data 
data|null|data|null|data 
...billion more lines 

мне нужно сделать это, потому что это схема определяется некоторой моделью, и мне нужно, чтобы мои окончательные данные будут совместимы с SQL Bulk Inserts и т. д.

так:

1) Я прочитал исходный файл CSV с помощью spark.read и назовите его "ДФ".

2) Я что-то делаю с данными.

3) добавить нулевые столбцы, используя этот скрипт:

outcols = [] 
for column in MY_COLUMN_LIST: 
    if column in df.columns: 
     outcols.append(column) 
    else: 
     outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column))) 

df = df.select(outcols) 

Таким образом, вы можете структурировать вашу схему после загрузки CSV (также будет работать для изменения порядка столбцов, если вы должны сделать это для многих таблицы).