Я пытаюсь прочитать данные из файлов avro в RDD с помощью Kryo. Мой код компилируется нормально, но во время выполнения я получаю ClassCastException
. Вот что мой код делает:Неверный тип исполнения в RDD при чтении из avro с пользовательским сериализатором
SparkConf conf = new SparkConf()...
conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
conf.set("spark.kryo.registrator", MyKryoRegistrator.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);
Где MyKryoRegistrator
регистрирует Serializer для MyCustomClass
:
public void registerClasses(Kryo kryo) {
kryo.register(MyCustomClass.class, new MyCustomClassSerializer());
}
Затем я прочитал мой файл данных:
JavaPairRDD<MyCustomClass, NullWritable> records =
sc.newAPIHadoopFile("file:/path/to/datafile.avro",
AvroKeyInputFormat.class, MyCustomClass.class, NullWritable.class,
sc.hadoopConfiguration());
Tuple2<MyCustomClass, NullWritable> first = records.first();
Это, кажется, работает хорошо, но используя отладчик, я вижу, что, хотя у RDD есть kClassTag my.package.containing.MyCustomClass, переменная first
содержит Tuple2<AvroKey, NullWritable>
, а не Tuple2<MyCustomClass, NullWritable>
! И действительно, когда следующая строка выполняет:
System.out.println("Got a result, custom field is: " + first._1.getSomeCustomField());
я получаю исключение:
java.lang.ClassCastException: org.apache.avro.mapred.AvroKey cannot be cast to my.package.containing.MyCustomClass
я делаю что-то не так? И даже так, не следует ли мне получить ошибку компиляции, а не ошибку времени выполнения?
Вы видели [этот] (http://stackoverflow.com/questions/34999783/read-avro-with-spark-in-java) вопрос? –
@YuvalItzchakov да, но это в scala. Я изо всех сил старался перевести его в java, но не смог его компилировать: - /. Вы знаете, как сделать то же самое в java? – Nira
@YuvalItzchakov Мне действительно удалось запустить это на Java, но я думаю, что это не работает с NullWritable. Я получаю исключение во время выполнения: 'org.apache.avro.AvroTypeException: Found Root, ожидающий org.apache.avro.mapreduce.KeyValuePair, отсутствует необходимый полевой ключ'. Я дал ему пустую схему, у NullWritable нет полей: 'SchemaBuilder.record (" NullWritable "). Namespace (" org.apache.hadoop.io "). EndRecord()' – Nira