2016-12-22 2 views
5

Мне нужно создать новый столбец Spark DF MapType на основе существующих столбцов, где имя столбца - это ключ, а значение - это значение.pyspark: Создать столбцы MapType из существующих столбцов

Как пример - я это DF:

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6), 
         ('d23d', 1.5, 2.0, 2.2), 
         ('as3d', 2.2, 4.3, 9.0) 
          ]) 
schema = StructType([StructField('key', StringType(), True), 
        StructField('metric1', FloatType(), True), 
        StructField('metric2', FloatType(), True), 
        StructField('metric3', FloatType(), True)]) 
df = sqlContext.createDataFrame(rdd, schema) 

+----+-------+-------+-------+ 
| key|metric1|metric2|metric3| 
+----+-------+-------+-------+ 
|123k| 1.3| 6.3| 7.6| 
|d23d| 1.5| 2.0| 2.2| 
|as3d| 2.2| 4.3| 9.0| 
+----+-------+-------+-------+ 

Я уже так далеко, что я могу создать structType из этого:

nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric") 
df2 = df.select("key", nameCol) 

+----+-------------+ 
| key|  metric| 
+----+-------------+ 
|123k|[1.3,6.3,7.6]| 
|d23d|[1.5,2.0,2.2]| 
|as3d|[2.2,4.3,9.0]| 
+----+-------------+ 

Но что мне нужно это Метрика столбец с am MapType, где ключ - это имя столбца:

+----+-------------------------+ 
| key|     metric| 
+----+-------------------------+ 
|123k|Map(metric1 -> 1.3, me...| 
|d23d|Map(metric1 -> 1.5, me...| 
|as3d|Map(metric1 -> 2.2, me...| 
+----+-------------------------+ 

Любые подсказки, как я могу преобразовать данные?

Спасибо!

ответ

8

В Spark 2.0 или новее вы можете использовать create_map. Первые несколько импорта:

from pyspark.sql.functions import lit, col, create_map 
from itertools import chain 

create_map ожидает перемежающуюся последовательность keys и values, которые могут быть созданы, например, так:

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name 
)))).alias("metric") 

и используются с select:

df.select("key", metric) 

С примером данных результат:

+----+---------------------------------------------------------+ 
|key |metric             | 
+----+---------------------------------------------------------+ 
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)  | 
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)  | 
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)  | 
+----+---------------------------------------------------------+ 

Если вы используете более раннюю версию Спарк вам придется использовать UDF:

from pyspark.sql import Column 
from pyspark.sql.functions import struct 
from pyspark.sql.types import DataType 

def as_map(*cols: str, key_type: DataType=DoubleType()) -> Column: 
    args = [struct(lit(name), col(name)) for name in cols] 
    as_map_ = udf(
     lambda *args: dict(args), 
     MapType(StringType(), key_type) 
    ) 
    return as_map_(*args) 

, которые могут быть использованы следующим образом:

df.select("key", 
    as_map(*[name for name in df.columns if "metric" in name]).alias("metric")) 
+0

ваши решения выглядит хорошо, она может быть использована ответить: https://stackoverflow.com/questions/45445077/pyspark-spark-dataframe-aggregate-columns-in-map-type? –

Смежные вопросы