2015-08-10 2 views
4

Вот код, чтобы создать pyspark.sql DataFrameКак добавить numpy.array в качестве нового столбца в pyspark.SQL DataFrame?

import numpy as np 
import pandas as pd 
from pyspark import SparkContext 
from pyspark.sql import SQLContext 
df = pd.DataFrame(np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]), columns=['a','b','c']) 
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1) 

Так что sparkdf выглядит

a b c 
1 2 3 
4 5 6 
7 8 9 
10 11 12 

Теперь я хотел бы добавить новый столбец в Numpy массив (или даже список)

new_col = np.array([20,20,20,20]) 

но стандартным способом

sparkdf = sparkdf.withColumn('newcol', new_col) 

не работает. Вероятно, udf - это путь, но я не знаю, как создать udf, который присваивает одно значение в строке DataFrame, т. Е. Итерации через new_col. Я посмотрел на другие pyspark и pyspark.sql, но не смог найти решение. Также мне нужно оставаться внутри pyspark.sql, а не решением scala. Благодаря!

ответ

3

Предполагая, что кадр данных сортируются в соответствии порядок значений в массиве вы можете сжать РД и восстановить кадр данных следующим образом:

n = sparkdf.rdd.getNumPartitions() 

# Parallelize and cast to plain integer (np.int64 won't work) 
new_col = sc.parallelize(np.array([20,20,20,20]), n).map(int) 

def process(pair): 
    return dict(pair[0].asDict().items() + [("new_col", pair[1])]) 

rdd = (sparkdf 
    .rdd # Extract RDD 
    .zip(new_col) # Zip with new col 
    .map(process)) # Add new column 

sqlContext.createDataFrame(rdd) # Rebuild data frame 

Вы также можете использовать присоединяется:

new_col = sqlContext.createDataFrame(
    zip(range(1, 5), [20] * 4), 
    ("rn", "new_col")) 

sparkdf.registerTempTable("df") 

sparkdf_indexed = sqlContext.sql(
    # Make sure we have specific order and add row number 
    "SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df") 

(sparkdf_indexed 
    .join(new_col, new_col.rn == sparkdf_indexed.rn) 
    .drop(new_col.rn)) 

но компонент оконной функции не является масштабируемым и его следует избегать с помощью более крупных наборов данных.

Конечно, если все, что вам нужно, это столбец одного значения, вы можете просто использовать lit

import pyspark.sql.functions as f 
sparkdf.withColumn("new_col", f.lit(20)) 

, но я предполагаю, что это не так.

+0

Спасибо. Похоже, что (первое решение) вам нужно вернуться в RDD, а затем снова конвертировать в DataFrame, не оставаясь внутри pyspark.sql. Какое из двух решений обеспечивает лучшую производительность, т. Е. Быстрее? – rstreppa

+0

Ну, игнорируя Catalyst, кадры данных - это всего лишь слой абстракции над RDD. Разумеется, накладные расходы из 'sqlContext.createDataFrame', которые могут быть значительно уменьшены путем ручного предоставления схемы. 'zip' сам по себе намного проще, чем объединение, и пока сохраняется порядок, вам не нужна сортировка. Я думаю, что важный вопрос заключается в том, почему вам нужно добавить новый столбец. Если он исходит из другого источника данных, тогда загрузка его в виде таблицы и соединения является естественным выбором. – zero323

+0

На самом деле я получаю сообщение об ошибке: 'ValueError: может только zip с RDD, который имеет такое же количество разделов' – rstreppa

Смежные вопросы