2015-08-23 2 views
3

Я загружаю много версий файлов JSON, чтобы искриться DataFrame. некоторые из файлов имеет столбцы A, B и некоторых A, B, C или A, C ..Искра - установка null если столбец не существует в dataframe

Если я запускаю эту команду

from pyspark.sql import SQLContext 

sqlContext = SQLContext(sc) 

df = sqlContext.sql("SELECT A,B,C FROM table") 

после загрузки нескольких я получаю ошибку «столбец не существуют "Я загрузил только файлы, которые не содержат столбец C. Как установить это значение на null вместо получения ошибки?

ответ

2

DataFrameReader.json метод предоставляет необязательный аргумент схемы, который вы можете использовать здесь. Если ваша схема является сложной самым простым решением является повторным использованием одного выведенных из файла, который содержит все поля:

df_complete = spark.read.json("complete_file") 
schema = df_complete.schema 

df_with_missing = spark.read.json("df_with_missing", schema) 
# or 
# spark.read.schema(schema).("df_with_missing") 

Если вы знаете схему, но по какой-то причине вы не можете использовать выше, вы должны создать его с нуля.

schema = StructType([ 
    StructField("A", LongType(), True), ..., StructField("C", LongType(), True)]) 

Как всегда обязательно выполните некоторые проверки качества после загрузки ваших данных.

Пример (обратите внимание, что все поля являются nullable):

from pyspark.sql.types import * 

schema = StructType([ 
    StructField("x1", FloatType()), 
    StructField("x2", StructType([ 
     StructField("y1", DoubleType()), 
     StructField("y2", StructType([ 
      StructField("z1", StringType()), 
      StructField("z2", StringType()) 
     ])) 
    ])), 
    StructField("x3", StringType()), 
    StructField("x4", IntegerType()) 
]) 

spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema).printSchema() 
## root 
## |-- x1: float (nullable = true) 
## |-- x2: struct (nullable = true) 
## | |-- y1: double (nullable = true) 
## | |-- y2: struct (nullable = true) 
## | | |-- z1: string (nullable = true) 
## | | |-- z2: string (nullable = true) 
## |-- x3: string (nullable = true) 
## |-- x4: integer (nullable = true) 

spark.read.json(sc.parallelize(["""{"x4": 1}"""]), schema).first() 
## Row(x1=None, x2=None, x3=None, x4=1) 

spark.read.json(sc.parallelize(["""{"x3": "foo", "x1": 1.0}"""]), schema).first() 
## Row(x1=1.0, x2=None, x3='foo', x4=None) 

spark.read.json(sc.parallelize(["""{"x2": {"y2": {"z2": "bar"}}}"""]), schema).first() 
## Row(x1=None, x2=Row(y1=None, y2=Row(z1=None, z2='bar')), x3=None, x4=None) 

Важно:

Этот метод применим только к источнику JSON и зависят от деталей реализации. Не используйте его для таких источников, как паркет.

+1

read.json(), кажется, принимает только один аргумент. это сработало для меня: df_with_missing = sqlContext.read.schema (schema) .json ("df_with_missing") – jgaw

+0

Я не уверен, что это решение работает. Вы можете в значительной степени распечатать схему или показать ее без каких-либо проблем, но когда вы пытаетесь что-либо сделать с этими столбцами (например, проверьте, не являются ли они пустыми), тогда она перестает жаловаться на то, что [столбец] не находится в схеме (проверен с Spark 2.0.2). – marios

+0

@marios Как вы это проверите? Вы используете вход JSON? – zero323

Смежные вопросы