
NullPointerException when writing parquet from AVRO in Spark 2.0

I am loading AVRO files from AWS S3 and writing them out as parquet.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AvroParquet").enableHiveSupport().getOrCreate()

in_path = "s3://my-bucket/avro-path/*.avro"
out_path = "s3://my-bucket/parquet-path/output.parquet"

# Read the avro files through the spark-avro data source
df = spark.read.format("com.databricks.spark.avro").load(in_path)

# Write them back out as parquet
df.write.save(out_path, format="parquet")

For some reason I get this NullPointerException when writing the parquet. It may actually be thrown while reading the avro; note java.io.ObjectInputStream.readObject0 in the trace below.

App > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: org.apache.spark.SparkException: Task failed while writing rows 
App > at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:270) 
App > at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
App > at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) 
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
App > at org.apache.spark.scheduler.Task.run(Task.scala:85) 
App > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
App > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
App > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
App > at java.lang.Thread.run(Thread.java:745) 
App > Caused by: java.io.IOException: java.lang.NullPointerException 
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1284) 
App > at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174) 
App > at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65) 
App > at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65) 
App > at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89) 
App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
App > at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1.apply(DefaultSource.scala:145) 
App > at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1.apply(DefaultSource.scala:143) 
App > at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279) 
App > at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263) 
App > at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:134) 
App > at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106) 
App > at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
App > at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
App > at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
App > at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:262) 
App > at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:261) 
App > at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:261) 
App > at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359) 
App > at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:267) 
App > ... 8 more 
App > Caused by: java.lang.NullPointerException 
App > at com.databricks.spark.avro.DefaultSource$SerializableConfiguration.tryOrIOException(DefaultSource.scala:217) 
App > at com.databricks.spark.avro.DefaultSource$SerializableConfiguration.readObject(DefaultSource.scala:207) 
App > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
App > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
App > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
App > at java.lang.reflect.Method.invoke(Method.java:606) 
App > at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
App > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
App > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
App > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
App > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
App > at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) 
App > at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$10.apply(TorrentBroadcast.scala:254) 
App > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1321) 
App > at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:255) 
App > at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:189) 
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1277) 
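
To confirm that the NPE comes from the avro read path (the broadcast deserialization visible in the trace) rather than from the parquet writer, one hypothetical check, not part of the original post, is to force a full scan before writing anything:

# Force a full read of the avro files; if the NPE appears here,
# the parquet writer is not the culprit.
df = spark.read.format("com.databricks.spark.avro").load(in_path)
print(df.count())

# Only reached if the read path deserializes cleanly.
df.write.save(out_path, format="parquet")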

I just moved from 1.6 to 2.0 and am not sure what the problem is. I am using the com.databricks:spark-avro_2.11:3.0.1 package.
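
For reference, a sketch of attaching the package when the session is built, assuming spark.jars.packages is used instead of the --packages flag on spark-submit (the property only takes effect if set before the driver JVM starts, so the command-line flag is the safer route):

from pyspark.sql import SparkSession

# Pull in the spark-avro data source at session startup
spark = SparkSession.builder \
    .appName("AvroParquet") \
    .config("spark.jars.packages", "com.databricks:spark-avro_2.11:3.0.1") \
    .enableHiveSupport() \
    .getOrCreate()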

This looks like https://github.com/databricks/spark-avro/issues/147, but that was supposed to be fixed in 3.0.1.

Answer


The problem turned out to be my Spark configuration settings, since I was able to write parquet both locally and with the stock EMR Spark settings. I filed an issue on GitHub: https://github.com/databricks/spark-avro/issues/188.
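
Since the root cause was a configuration difference, a minimal debugging sketch: dump the effective conf in both the working and the failing environment and diff the two outputs (the post does not name the offending key, so none is shown here):

# Print the effective Spark configuration, sorted for easy diffing
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    print(key, "=", value)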
