2016-09-13 2 views
0

Я пытаюсь читать na Avro файл в искровой работе.
Моя искра версии 1.6.0 (искра-сердечник_2.10-1.6.0-cdh5.7.1).Как читать файл avro в искры с помощью newAPIHadoopFile?

Вот мой Java-код:

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("ReadAvro")); 
JavaPairRDD <NullWritable, Text> lines = sc.newAPIHadoopFile(args[0],AvroKeyValueInputFormat.class,AvroKey.class,AvroValue.class,new Configuration()); 

Но я получаю время компиляции исключение:

Метод newAPIHadoopFile (String, класс, класс, класс, Configuration) в типе JavaSparkContext не применим для аргументов (String, Class, Class, Класс, конфигурация)

Итак, каков правильный способ использования JavaSparkContext.newAPIHadoopFile() в Java?

ответ

2
public class Utils { 

    public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) { 
    JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration()); 
    return records.keys() 
     .map(x -> (GenericRecord) x.datum()) 
     .mapToPair(pair -> new Tuple2<>((String) pair.get("key"), (T)pair.get("value"))); 
    } 
} 

Используйте утилиту как:

JavaPairRDD<String, YourAvroClassName> records = Utils.<YourAvroClassName>loadAvroFile(sc, inputDir); 

Вы также должны использовать KryoSerializer и зарегистрировать пользовательские KryoRegistrator:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
sparkConf.set("spark.kryo.registrator", "com.test.avro.MyKryoRegistrator"); 

public class MyKryoRegistrator implements KryoRegistrator { 

    public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer { 
    Class<T> type; 
    public SpecificInstanceCollectionSerializer(Class<T> type) { 
     this.type = type; 
    } 

    @Override 
    protected Collection create(Kryo kryo, Input input, Class<Collection> type) { 
     return kryo.newInstance(this.type); 
    } 

    @Override 
    protected Collection createCopy(Kryo kryo, Collection original) { 
     return kryo.newInstance(this.type); 
    } 
    } 


    Logger logger = LoggerFactory.getLogger(this.getClass()); 

    @Override 
    public void registerClasses(Kryo kryo) { 
    // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type 
    // because Kryo is not able to serialize them properly, we use this serializer for them 
    kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer<>(ArrayList.class)); 
    kryo.register(YourAvroClassName.class); 
    } 
} 

Надеется, что это помогает ...

+0

Еще же исключение времени компиляции в классе Utils. –

Смежные вопросы