
Spark 2.0.1: Scala: temporary view query fails

I am running Spark 2.0.1 and only run into a problem at query time. The steps are:

  1. create a Row RDD from the original RDD
  2. create a schema for the RDD
  3. create a DataFrame
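
In sketch form (with stand-in sample values; the full code appears in the comments below), these steps look like:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate()

// stand-in for the original RDD of split fields (in my case it comes from textFile)
val originalRdd = sparkSession.sparkContext.parallelize(Seq(Array("RAJA", "Pynursla")))

// 1. Row RDD from the original RDD
val rowRdd = originalRdd.map(fields => Row(fields(0), fields(1)))

// 2. schema matching those rows
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("birthplace", StringType, nullable = true)))

// 3. DataFrame from the Row RDD and the schema
val df = sparkSession.createDataFrame(rowRdd, schema)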

I checked once more with the following:

case class Person(name: String, age: Long) 

import sparkSession.implicits._   // needed for toDF() below

val peopleDF = sparkSession.sparkContext 
    .textFile("/home/raja/scala_code/text2.dat") 
    .map(_.split("|")) 
    .map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF() 

peopleDF.createOrReplaceTempView("people") 

val teenagersDF = sparkSession.sql("SELECT * FROM people") 

teenagersDF.show() 

But when I fire sparkSession.sql("SELECT name FROM emp"), it gives the error below.

java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1 
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 0 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 1 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 2 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 3 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:185) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:192) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276) 
    ... 20 more 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1934) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2149) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 64 elided 
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1 
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#0 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 0 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) AS age#1 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 1 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, age) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) AS salary#2 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 2 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, salary) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) AS birthplace#3 
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
    :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt 
    : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
    : : +- input[0, org.apache.spark.sql.Row, true] 
    : +- 3 
    :- null 
    +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType), true) 
     +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace), StringType) 
     +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 3, birthplace) 
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) 
       +- input[0, org.apache.spark.sql.Row, true] 

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200) 
    at org.apache.spark.sql.Row$class.isNullAt(Row.scala:185) 
    at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:192) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_1$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276) 
    ... 20 more 

Please post the code that creates 'emp' - Spark evaluates views _lazily_, so the fact that the exception is thrown only when you query does not mean the error is in the query itself; it may come from the code that creates the view. –


I am adding it in two parts, one here and the other below:

val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate() 
val sc = sparkSession.sparkContext 
import sparkSession.implicits._ 
val emprdd = sc.textFile("/home/raja/scala_code/text2.dat").map(_.split("|")).map(p => Row(p(0), p(1))) 
val schemaString = "name birthplace" 
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) 
val schema = StructType(fields) – user3485352


val semprdd = emprdd.map(value => Row(value)) 
val empDF = sparkSession.createDataFrame(semprdd, schema) 
empDF.createOrReplaceTempView("emp") 
val results = sparkSession.sql("SELECT name FROM emp") 
results.show() – user3485352

Answer

First, split("|") does not split on the pipe character the way you expect, because split(s: String) takes a regular expression as its argument, and the pipe is a special character in regular expressions. See more details and a solution here: https://stackoverflow.com/a/40359414/5344058
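
To illustrate (a quick sketch; the sample record is borrowed from the comments below):

// split(String) treats its argument as a regex; a bare "|" is an empty
// alternation that matches between every character, so you get back
// the individual characters instead of the fields:
"RAJA|123|Pynursla".split("|")
// -> Array(R, A, J, A, |, 1, 2, 3, |, P, y, n, u, r, s, l, a)

// escaping the pipe splits on the literal character:
"RAJA|123|Pynursla".split("\\|")
// -> Array(RAJA, 123, Pynursla)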

If the problem persists after fixing that (your question does not include sample input data, so I cannot be sure), the exception (java.lang.ArrayIndexOutOfBoundsException: 1) is quite telling - your code assumes that split("|") produces an array of at least two items for every record:

.map(_.split("|")) 
.map(attributes => Person(attributes(0), attributes(1).trim.toInt)) 
//                                       ^ 
//                                       | 
// this will throw an exception if the input isn't valid 

If any record does not satisfy that assumption, you will see this exception.

To avoid this you can take several routes. If you simply want to skip the invalid rows, you can use collect instead of map, with a partial function that is only defined for arrays of at least two elements:

.map(_.split("\\|")) 
.collect { case Array(a1, a2, _*) => Person(a1, a2.trim.toInt) } 

This code will simply filter out every record for which split produces an array with fewer than two entries.
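
Putting both fixes together, a minimal end-to-end sketch (assuming the same file path and record layout as in your question) would be:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate()
import sparkSession.implicits._

case class Person(name: String, age: Long)

val peopleDF = sparkSession.sparkContext
  .textFile("/home/raja/scala_code/text2.dat")
  .map(_.split("\\|"))                                                   // escaped pipe: split on the literal character
  .collect { case Array(name, age, _*) => Person(name, age.trim.toInt) } // drop lines with fewer than two fields
  .toDF()

peopleDF.createOrReplaceTempView("people")
sparkSession.sql("SELECT * FROM people").show()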


Well, the code I suggested works for simple pipe-delimited data (I used 'RAJA | 123 | Pynursla'), so unless you provide some sample data for which it fails (by editing the question again), I cannot help. –


Yes, I changed the class definition with the correct type, and the mapping also had to be typed exactly, and now it works. It seems the type conversion takes quite a bit of work :) – user3485352
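
For completeness, a sketch of how the emp view from the comments above can be built so that the rows match the schema (assuming the first two pipe-delimited fields are name and birthplace; note that in the posted snippet, semprdd wraps each existing Row in another Row, producing single-field rows against a two-field schema, while the version below builds the rows directly):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate()

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("birthplace", StringType, nullable = true)))

val emprdd = sparkSession.sparkContext
  .textFile("/home/raja/scala_code/text2.dat")
  .map(_.split("\\|"))
  .collect { case Array(name, birthplace, _*) => Row(name.trim, birthplace.trim) } // one two-field Row per record, matching the schema

val empDF = sparkSession.createDataFrame(emprdd, schema)
empDF.createOrReplaceTempView("emp")
sparkSession.sql("SELECT name FROM emp").show()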
