3

Для метода выборочного метода Estimator transformSchema мне нужно иметь возможность сравнить схему фрейма входных данных с схемой, определенной в классе case. Обычно это может быть выполнено как Generate a Spark StructType/Schema from a case class, как описано ниже. Однако, используются неправильная допустимость пустой:Исправленная схема из класса case с правильной нулеутоляемостью

Реальная схема ФРА презюмирован на spark.read.csv().as[MyClass] может выглядеть следующим образом:

root 
|-- CUSTOMER_ID: integer (nullable = false) 

и класс корпуса:

case class MySchema(CUSTOMER_ID: Int) 

Для сравнения я использую:

val rawSchema = ScalaReflection.schemaFor[MySchema].dataType.asInstanceOf[StructType] 
    if (!rawSchema.equals(rawDf.schema)) 

К сожалению, это всегда дает false , Как новая схема вручную вывести из класса случае является установка обнуляемым для true (потому что JA java.Integer на самом деле может быть пустым)

root 
|-- CUSTOMER_ID: integer (nullable = true) 

Как я могу указать nullable = false при создании схемы?

ответ

3

Возможно, вы смешиваете вещи, которые на самом деле не принадлежат одному пространству. ML Pipelines по своей сути динамичны, и введение статически типизированных объектов на самом деле не меняет этого.

Кроме того схема для класса определяется как:

case class MySchema(CUSTOMER_ID: Int) 

будет не обнуляемым CUSTOMER_ID. scala.Int не то же самое, как java.lang.Integer:

scala> import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor 

scala> case class MySchema(CUSTOMER_ID: Int) 
defined class MySchema 

scala> schemaFor[MySchema].dataType 
res0: org.apache.spark.sql.types.DataType = StructType(StructField(CUSTOMER_ID,IntegerType,false)) 

Это, как говорится, если вы хотите nullable поля Option[Int]:

case class MySchema(CUSTOMER_ID: Option[Int]) 

и если вы хотите не NULLABLE Используйте Int, как описано выше.

Еще одна проблема, которая возникает у вас здесь, заключается в том, что для csv каждое поле может быть обнулено по определению, и это состояние «унаследовано» закодированным Dataset. Так что на практике:

spark.read.csv(...) 

всегда приведет:

root 
|-- CUSTOMER_ID: integer (nullable = true) 

, и именно поэтому вы получаете несоответствие схемы. К сожалению, невозможно переопределить поле nullable для источников, которые не применяют ограничения на неопределенность, например csv или json.

Если не имея обнуляемого схемы является жестким требованием, вы можете попробовать:

spark.createDataFrame(
    spark.read.csv(...).rdd, 
    schemaFor[MySchema].dataType.asInstanceOf[StructType] 
).as[MySchema] 

Этот подход применим только тогда, когда вы знаете, что данные на самом деле null бесплатно. Любое null Значение wiil приводит к исключению во время выполнения.

Смежные вопросы