2014-09-25 4 views
3

В моей работе MapReduce я использую AvroParquetOutputFormat для записи в файлы Parquet с использованием схемы Avro.Как установить несколько схем Avro с AvroParquetOutputFormat?

Логика приложения требует создания нескольких типов файлов, созданных редуктором, и каждый файл имеет свою схему Avro.

Класс AvroParquetOutputFormat имеет статический метод setSchema() для установки схемы Avro вывода. Рассматривая код, AvroParquetOutputFormat использует AvroWriteSupport.setSchema(), который снова является статической реализацией.

Без расширения AvroWriteSupport и взлома логики существует ли более простой способ получить несколько схем схемы Avro из AvroParquetOutputFormat в одной задаче MR?

Любые указатели/входы высоко оценены.

Благодаря & уважением

MK

ответ

1

Это может быть довольно поздно, чтобы ответить, но я также столкнулся с этим вопросом и пришел к решению.

Во-первых, нет поддержки как «MultipleAvroParquetOutputFormat» встроенный в parquet-mr. Но для достижения подобного поведения я использовал MultipleOutputs.

Для отображения только вида работы, поставить картограф так:

public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord>{ 

    protected KafkaAvroDecoder deserializer; 
    protected String outputPath = ""; 

    // Using MultipleOutputs to write custom named files 
    protected MultipleOutputs<Void, GenericRecord> mos; 

    public void setup(Context context) throws IOException, InterruptedException { 
     super.setup(context); 
     Configuration conf = context.getConfiguration();   
     outputPath = conf.get(FileOutputFormat.OUTDIR); 
     mos = new MultipleOutputs<Void, GenericRecord>(context); 
    } 

    public void map(LongWritable ln, BytesWritable value, Context context){ 

     try { 
      GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes()); 
      AvroWriteSupport.setSchema(context.getConfiguration(), record.getSchema()); 
      Schema schema = record.getSchema(); 
      String mergeEventsPath = outputPath + "/" + schema.getName(); // Adding '/' will do no harm 
      mos.write((Void) null, record, mergeEventsPath); 

     } catch (IOException e) { 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void cleanup(Context context) throws IOException, InterruptedException { 
     mos.close(); 
    } 

} 

Это создаст новый RecordWriter для каждой схемы и создает новый паркет файл, прилагаемый с именем схемы, например, , schema1-r-0000.parquet.

Это также создаст файлы part-r-0000x.parquet по умолчанию на основе схемы, установленной в драйвере. Чтобы избежать этого, используйте LazyOutputFormat как:

LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class); 

Надеется, что это помогает.

Смежные вопросы