2016-12-09 4 views
5

У меня есть большой вложенный NDJ (новый файл с разделителями строк JSON), который мне нужно прочитать в единый блок данных искры и сохранить паркет. В попытке оказать схему я использую эту функцию:Чтение массивных файлов JSON в Spark Dataframe

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { 
     schema.fields.flatMap(f => { 
      val colName = if (prefix == null) f.name else (prefix + "." + f.name) 
      f.dataType match { 
      case st: StructType => flattenSchema(st, colName) 
      case _ => Array(col(colName)) 
      } 
     }) 
    } 

на dataframe, возвращенного считывании

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

Я также переключился это val df = spark.read.json(path) так, что это работает только с NDJ, а не с несколькими линиями JSON - та же ошибка.

Это вызывает ошибку в памяти для рабочих java.lang.OutOfMemoryError: Java heap space.

Я изменил параметры JVM памяти и параметры искровой исполнитель/водитель но безрезультатно

Есть ли способ, чтобы поток файла, придавить схемы, и добавить к dataframe пошагово? Некоторые строки JSON содержат новые поля из предыдущего, поэтому они должны быть заполнены позже.

ответ

0

Вы можете достичь этого несколькими способами.

Во-первых, во время чтения вы можете предоставить схему для dataframe для чтения json или вы можете позволить искры вывести схему сама по себе.

Как только json находится в кадре данных, вы можете выполнить следующие способы его сглаживания.

a. Использование explode() в dataframe - сгладить его. b. Использование spark sql и доступ к вложенным полям. оператор. Вы можете найти примеры here

Наконец, если вы хотите добавить новые столбцы в dataframe a. Первый вариант, используя withColumn(), - это один подход. Однако это будет сделано для каждого добавленного столбца и для всего набора данных. b. Использование sql для генерации нового фрейма данных из существующих - это может быть проще всего c. Наконец, используя карту, затем обращаясь к элементам, получив старую схему, добавьте новые значения, создайте новую схему и, наконец, получите новую df - как показано ниже

One withColumn будет работать на весь rdd. Поэтому обычно не рекомендуется использовать метод для каждого столбца, который вы хотите добавить. Существует способ, которым вы работаете со столбцами и их данными внутри функции карты. Поскольку здесь используется одна функция карты, код для добавления нового столбца и его данных будет выполняться параллельно.

a. вы можете собирать новые значения на основе расчетов

b. Добавьте эти новые значения столбцов в основной RDD, как показано ниже

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

Вот ряд, является ссылкой на строку в методе карты

гр. Создать новую схему, указанную ниже

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d. Добавить в старую схему

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e.Создать новый фрейм данных с новыми колонками

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

Как он обращается к 'java.lang.OutOfMemoryError' в результате' wholeTextFiles'? –

+0

Я обращался к «Есть ли способ потокового файла, сгладить схему и постепенно добавлять к фреймворку данных? Некоторые строки JSON содержат новые поля из предыдущего, поэтому они должны быть заполнены позже. ». Я не вижу вопроса о разрешении проблемы с памятью. Поэтому он дал ему несколько подходов. – Ramzy

+0

Если NDJ является JSONL, то OP не должен использовать wholeTextFiles. Если это не так, это не поможет. –

2

Работа не работает. Проблема заключалась в ограничении объекта JVM. Я закончил использование scala json parser и построил dataframe вручную.

Смежные вопросы