4

Получение этой нулевой ошибки в искре Dataset.filterСпарк исключение 2 Dataset Нулевое значение

Input CSV:

name,age,stat 
abc,22,m 
xyz,,s 

Работа Код:

case class Person(name: String, age: Long, stat: String) 

val peopleDS = spark.read.option("inferSchema","true") 
    .option("header", "true").option("delimiter", ",") 
    .csv("./people.csv").as[Person] 
peopleDS.show() 
peopleDS.createOrReplaceTempView("people") 
spark.sql("select * from people where age > 30").show() 

При отсутствии кода (Добавление следующие строки возврата погрешность):

val filteredDS = peopleDS.filter(_.age > 30) 
filteredDS.show() 

Возвращает нулевой ошибки

java.lang.RuntimeException: Null value appeared in non-nullable field: 
- field (class: "scala.Long", name: "age") 
- root class: "com.gcp.model.Person" 
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int). 

ответ

9

Exception вы получите должны объяснить все, но давайте идти шаг за шагом:

  • При загрузке данных с использованием csv источника данных все поля отмечены как nullable:

    val path: String = ??? 
    
    val peopleDF = spark.read 
        .option("inferSchema","true") 
        .option("header", "true") 
        .option("delimiter", ",") 
        .csv(path) 
    
    peopleDF.printSchema 
    
    root 
    |-- name: string (nullable = true) 
    |-- age: integer (nullable = true) 
    |-- stat: string (nullable = true) 
    
  • недостающего поля представляются в виде SQL NULL

    peopleDF.where($"age".isNull).show 
    
    +----+----+----+ 
    |name| age|stat| 
    +----+----+----+ 
    | xyz|null| s| 
    +----+----+----+ 
    
  • Далее конвертировать Dataset[Row] в Dataset[Person], который использует Long для кодирования age поля. Long в Scala не может быть null. Поскольку вход схемы является nullable, выход схемы остается nullable, несмотря на это:

    val peopleDS = peopleDF.as[Person] 
    
    peopleDS.printSchema 
    
    root 
    |-- name: string (nullable = true) 
    |-- age: integer (nullable = true) 
    |-- stat: string (nullable = true) 
    

    Обратите внимание, что это as[T] не влияет на схему вообще.

  • При запросе Dataset с использованием SQL (на зарегистрированной таблице) или DataFrame API Spark не будет десериализовать объект. Поскольку схема еще nullable мы можем выполнить:

    peopleDS.where($"age" > 30).show 
    
    +----+---+----+ 
    |name|age|stat| 
    +----+---+----+ 
    +----+---+----+ 
    

    без каких-либо проблем. Это просто обычная логика SQL, и NULL является допустимым значением.

  • Когда мы используем статически типизированных Dataset API:

    peopleDS.filter(_.age > 30) 
    

    Спарк десериализации объекта. Поскольку Long не может быть null (SQL NULL), он не работает, если вы его видели.

    Если бы вы не получили NPE.

  • Correct статически типизированное представление данных должны использовать Optional типов:

    case class Person(name: String, age: Option[Long], stat: String) 
    

    с исправленной функцией фильтра:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false)) 
    
    +----+---+----+ 
    |name|age|stat| 
    +----+---+----+ 
    +----+---+----+ 
    

    При желании вы можете использовать поиск по шаблону:

    peopleDS.filter { 
        case Some(age) => age > 30 
        case _   => false  // or case None => false 
    } 
    

    Обратите внимание, что вам необязательно (но было бы рекомендовано в любом случае) использовать необязательные типы для name и stat. Потому что Scala String - это просто Java String это может быть null. Конечно, если вы пойдете с таким подходом, вам нужно явно проверить, имеют ли доступные значения null или нет.

Связанные Spark 2.0 Dataset vs DataFrame

Смежные вопросы