2015-09-10 1 views
1

Мне дано пару файлов Avro, которые генерируются из объектов класса java A. Используя искровую оболочку (Spark 1.4.0), я могу читать эти файлы в dataframes с использованием spark-avro (версия 2.0.1) иКак читать файлы Avro (созданные из класса Java) с использованием оболочки Spark при загрузке исходного класса Java?

val df = sqlContext.read.avro("file.avro") 

, который работает отлично, до тех пор, как искра оболочка не знает о классе А. Если добавить баночку, в том числе класса а к раковине и выдает ту же команду, я получаю следующее исключение:

A cannot be cast to org.apache.avro.generic.IndexedRecord 
    at org.apache.avro.generic.GenericData.setField(GenericData.java:569) 
    at org.apache.avro.generic.GenericData.setField(GenericData.java:586) 
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) 
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:66) 
    at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32) 
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248) 
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:256) 
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157) 
    at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:127) 
    at com.databricks.spark.avro.AvroRelation$$anonfun$buildScan$1$$anonfun$4.apply(AvroRelation.scala:126) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Я думаю, это связано с тем, что Avro пытается быть умным и использовать класс A, если он может его найти.

Есть ли способ удержать Avro от этого и просто использовать общие записи напрямую?

+1

Я подал ошибку с искровым авором, чтобы мы убедились в этом (я являюсь автором этого проекта): https://github.com/databricks/spark-avro/issues/90 –

+0

Та же проблема возникает при использовании sc.hadoopFile (path, classOf [AvroInputFormat [GenericRecord]], classOf [AvroWrapper [GenericRecord]], classOf [NullWritable]), который работает только в том случае, если класс генерации не находится в пути к классам. – Philosophus42

ответ

1

Чтобы дать предварительный ответ (более рода обходной путь) на мой вопрос:

Если один не нуждается в dataframe и РДД достаточно хорошо, я в настоящее время с помощью этой функции:

import org.apache.avro.file.DataFileStream 
import org.apache.avro.reflect.{ReflectDatumReader, ReflectData} 
import org.apache.avro.io.DatumReader 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import scala.reflect.runtime.universe._ 
import scala.collection.JavaConverters._ 

def avroFileToRDD[T](sc: SparkContext, path: String) (implicit ctag: reflect.ClassTag[T]): RDD[T] = { 
    sc.binaryFiles(path).flatMap{ case(fileName, stream) => 
     val schema = ReflectData.get().getSchema(ctag.runtimeClass.asInstanceOf[Class[T]]) 
     val reader: DatumReader[T] = new ReflectDatumReader[T](schema) 
     val dfs = new DataFileStream[T](stream.open, reader) 
     val result = dfs.iterator().asScala.toArray // the toArray is needed to make sure that the entire stream is processed before closing it 
     stream.close 
     result.toIterator 
    } 
    } 

Эта функция, по крайней мере, не обращает внимания на вещи, которые можно найти на пути к классам, и, похоже, работает довольно надежно. Недостаток заключается в том, что входной поток должен быть закрыт, прежде чем покинуть flatMap, и поэтому каждый раздел должен быть полностью проанализирован.

+0

Есть ли обновление к этому? Кажется слабым, что API DataFrame работает только для _some_ .avro файлов. –

Смежные вопросы