2015-12-14 4 views
1

Есть ли способ сгладить произвольно вложенный Spark Dataframe? Большая часть работы, которую я вижу, написана для конкретной схемы, и я хотел бы иметь возможность обобщать Dataframe с различными вложенными типами (например, StructType, ArrayType, MapType и т. Д.).Сгладить вложенные свечи Dataframe

Скажем, у меня есть схема, как:

StructType(List(StructField(field1,...), StructField(field2,...), ArrayType(StructType(List(StructField(nested_field1,...), StructField(nested_field2,...)),nested_array,...))) 

Looking адаптировать это в плоский стол со структурой типа:

field1 
field2 
nested_array.nested_field1 
nested_array.nested_field2 

FYI, ищет предложения для Pyspark, но и другие ароматы Искра также оценена.

+3

Что вы подразумеваете под «flatten Dataframe с различными вложенными типами (например, StructType, ArrayType, MapType "? Я не настолько наивна, чтобы ожидать увидеть какой-то код, но может быть полезно как минимум хорошее описание проблемы с примерами. – zero323

ответ

1

Вот мой последний подход:

1) Карта строки в dataframe к РДУ в Словаре. Найдите подходящий код python для выравнивания dict.

flat_rdd = nested_df.map(lambda x : flatten(x)) 

где

def flatten(x): 
    x_dict = x.asDict() 
    ...some flattening code... 
    return x_dict 

2) Преобразование RDD [Dict] назад к dataframe

flat_df = sqlContext.createDataFrame(flat_rdd) 
0

Использование взрываются от от pyspark.sql.functions

+0

Explode создает новую строку для каждого элемента в коллекции, что не является –

+0

не отвечает на вопрос. – Gary

4

Эта проблема может быть немного старым, но для тех, кто там все еще ищет решение, вы можете сгладить сложные типы данных inline, используя select *:

первый давайте создадим вложенную dataframe:

from pyspark.sql import HiveContext 
hc = HiveContext(sc) 
nested_df = hc.read.json(sc.parallelize([""" 
{ 
    "field1": 1, 
    "field2": 2, 
    "nested_array":{ 
    "nested_field1": 3, 
    "nested_field2": 4 
    } 
} 
"""])) 

теперь сплющить его:

flat_df = nested_df.select("field1", "field2", "nested_array.*") 

вы найдете полезные примеры здесь https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

Если у вас есть слишком много вложенных массивов, вы можете использовать:

flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'] 
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'] 
flat_df = nested_df.select(*flat_cols, *[c + ".*" for c in nested_cols]) 
Смежные вопросы