2015-08-12 5 views
14

Я использую PySpark, и у меня есть фреймворк Spark с кучей числовых столбцов. Я хочу добавить столбец, который является суммой всех остальных столбцов.Добавить сумму столбца в качестве нового столбца в dataframe PySpark

Предположим, что у моей dataframe есть столбцы «a», «b» и «c». Я знаю, что могу это сделать:

df.withColumn('total_col', df.a + df.b + df.c) 

Проблема заключается в том, что я не хочу, чтобы напечатать каждую колонку по отдельности и добавить их, особенно, если у меня есть много столбцов. Я хочу иметь возможность сделать это автоматически или указав список имен столбцов, которые я хочу добавить. Есть ли другой способ сделать это?

+0

Это намного проще при использовании RDD, чем для данных, например. если данные представляют собой массив, представляющий строку, то вы можете сделать «RDD.map (данные лямбда: (данные, сумма (данные))). Основная причина, по которой это затруднительно с использованием блока данных искры, заключается в выяснении того, что разрешено в качестве выражения столбца в 'withColumn'. Это не очень хорошо документировано. – Paul

+0

Это не похоже на работу (PySpark 1.6.3): 'dftest.withColumn (" times ", sum ((dftest [c]> 2) .cast (" int ") для c в dftest.columns [1:])), а затем 'dftest.select ('a', 'b', 'c', 'd'). Rdd.map (lambda x: (x, sum (x))) ,take (2) ' Кажется не работает –

ответ

24

Это не было очевидно. Я не вижу суммы столбцов, определенных в искровом Dataframes API.

Version 2

Это может быть сделано в довольно простым способом:

newdf = df.withColumn('total', sum(df[col] for col in df.columns)) 

df.columns поставляется pyspark в виде списка строк, дающих все имена столбцов в Спарк Dataframe. Для другой суммы вы можете указать любой другой список имен столбцов.

Я не пробовал это как свое первое решение, потому что я не был уверен, как он себя ведет. Но это работает.

Version 1

Это слишком сложно, но работает хорошо.

Вы можете сделать это:

  1. использовать df.columns, чтобы получить список имен столбцов
  2. использует этот список имен, чтобы сделать список столбцов
  3. пасса этого список то, что будет вызывать перегруженную функцию добавления столбца в fold-type functional manner

с питона reduce, некоторые знания о том, как перегрузка операторов работы, и код pyspark для столбцов here, который становится:

def column_add(a,b): 
    return a.__add__(b) 

newdf = df.withColumn('total_col', 
     reduce(column_add, (df[col] for col in df.columns))) 

Обратите внимание, что это Python уменьшить, а не искры РДД уменьшает, а термин скобки в качестве второго параметра, чтобы уменьшить требует скобок, потому что этим выражение списка генератора.

Протестировано, работает!

$ pyspark 
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache() 
>>> df 
DataFrame[a: bigint, b: bigint, c: bigint] 
>>> df.columns 
['a', 'b', 'c'] 
>>> def column_add(a,b): 
...  return a.__add__(b) 
... 
>>> df.withColumn('total', reduce(column_add, (df[col] for col in df.columns))).collect() 
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)] 
+0

@Salmonerd Спасибо. Иногда это помогает помнить, что класс искроберических кадров неизменен, и поэтому для внесения любых изменений в данные вы должны называть то, что возвращает новый dataframe. – Paul

+3

Версия 2 не работает с Spark 1.5.0 и CDH-5.5.2. и Python версии 3.4. Это порождает ошибку: «AttributeError: объект« generator »не имеет атрибута« _get_object_id » – Hemant

+0

Оба ваших решения приятные и аккуратные. Мне интересно, почему вы не использовали для этого определенные пользователем функции? – ThePrincess

Смежные вопросы