2017-01-24 1 views
1

Я пытаюсь прочитать данные из файлов avro в RDD с помощью Kryo. Мой код компилируется нормально, но во время выполнения я получаю ClassCastException. Вот что мой код делает:Неверный тип исполнения в RDD при чтении из avro с пользовательским сериализатором

SparkConf conf = new SparkConf()... 
conf.set("spark.serializer", KryoSerializer.class.getCanonicalName()); 
conf.set("spark.kryo.registrator", MyKryoRegistrator.class.getName()); 
JavaSparkContext sc = new JavaSparkContext(conf); 

Где MyKryoRegistrator регистрирует Serializer для MyCustomClass:

public void registerClasses(Kryo kryo) { 
    kryo.register(MyCustomClass.class, new MyCustomClassSerializer()); 
} 

Затем я прочитал мой файл данных:

JavaPairRDD<MyCustomClass, NullWritable> records = 
       sc.newAPIHadoopFile("file:/path/to/datafile.avro", 
       AvroKeyInputFormat.class, MyCustomClass.class, NullWritable.class, 
       sc.hadoopConfiguration()); 
Tuple2<MyCustomClass, NullWritable> first = records.first(); 

Это, кажется, работает хорошо, но используя отладчик, я вижу, что, хотя у RDD есть kClassTag my.package.containing.MyCustomClass, переменная first содержит Tuple2<AvroKey, NullWritable>, а не Tuple2<MyCustomClass, NullWritable>! И действительно, когда следующая строка выполняет:

System.out.println("Got a result, custom field is: " + first._1.getSomeCustomField()); 

я получаю исключение:

java.lang.ClassCastException: org.apache.avro.mapred.AvroKey cannot be cast to my.package.containing.MyCustomClass 

я делаю что-то не так? И даже так, не следует ли мне получить ошибку компиляции, а не ошибку времени выполнения?

+0

Вы видели [этот] (http://stackoverflow.com/questions/34999783/read-avro-with-spark-in-java) вопрос? –

+0

@YuvalItzchakov да, но это в scala. Я изо всех сил старался перевести его в java, но не смог его компилировать: - /. Вы знаете, как сделать то же самое в java? – Nira

+0

@YuvalItzchakov Мне действительно удалось запустить это на Java, но я думаю, что это не работает с NullWritable. Я получаю исключение во время выполнения: 'org.apache.avro.AvroTypeException: Found Root, ожидающий org.apache.avro.mapreduce.KeyValuePair, отсутствует необходимый полевой ключ'. Я дал ему пустую схему, у NullWritable нет полей: 'SchemaBuilder.record (" NullWritable "). Namespace (" org.apache.hadoop.io "). EndRecord()' – Nira

ответ

0

************* EDIT **************

мне удалось загрузить пользовательские объекты из Avro файлов и создал GitHub repository с код. Однако, если avro lib не загружает данные в пользовательский класс, вместо этого он возвращает объекты GenericData $ Record. И в этом случае Spark Java API не проверяет назначение на пользовательский класс, поэтому вы получаете только ClassCastException при попытке получить доступ к базе данных AvroKey. Это является нарушением гарантии безопасности данных.

************* EDIT **************

Для кого-то пытается сделать это, у меня есть хак, чтобы получить вокруг этой проблемы, но это не может быть правильным решением: Я создал класс для чтения GenericData.Record из Avro файлов:

public class GenericRecordFileInputFormat extends FileInputFormat<GenericData.Record, NullWritable> { 
    private static final Logger LOG = LoggerFactory.getLogger(GenericRecordFileInputFormat.class); 

    /** 
    * {@inheritDoc} 
    */ 
    @Override 
    public RecordReader<GenericData.Record, NullWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration()); 
     if (null == readerSchema) { 
      LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired."); 
      LOG.info("Using a reader schema equal to the writer schema."); 
     } 
     return new GenericDataRecordReader(readerSchema); 
    } 


    public static class GenericDataRecordReader extends RecordReader<GenericData.Record, NullWritable> { 

     AvroKeyRecordReader<GenericData.Record> avroReader; 

     public GenericDataRecordReader(Schema readerSchema) { 
      super(); 
      avroReader = new AvroKeyRecordReader<>(readerSchema); 
     } 

     @Override 
     public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { 
      avroReader.initialize(inputSplit, taskAttemptContext); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      return avroReader.nextKeyValue(); 
     } 

     @Override 
     public GenericData.Record getCurrentKey() throws IOException, InterruptedException { 
      AvroKey<GenericData.Record> currentKey = avroReader.getCurrentKey(); 
      return currentKey.datum(); 
     } 

     @Override 
     public NullWritable getCurrentValue() throws IOException, InterruptedException { 
      return avroReader.getCurrentValue(); 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      return avroReader.getProgress(); 
     } 

     @Override 
     public void close() throws IOException { 
      avroReader.close(); 
     } 
    } 
} 

Затем я загружаю записи:

JavaRDD<GenericData.Record> records = sc.newAPIHadoopFile("file:/path/to/datafile.avro", 
       GenericRecordFileInputFormat.class, GenericData.Record.class, NullWritable.class, 
       sc.hadoopConfiguration()).keys(); 

Тогда я преобразовать записи в мой пользовательский класс u петь конструктор, который принимает GenericData.Record.

Снова - не красиво, но работает.

Смежные вопросы