2013-06-28 3 views
2

У меня сложная запись Avro (вложенные записи, объединенные типы), которую я храню как значение в HBase. Я прочитал в Avro файл данных, используя схему, которая в настоящее время соответствует писателю схема файла и записи, которые находятся в HBase:Чтение Avro записывается из HBase в Java

Schema schema = new Schema.Parser().parse(schema_file); 
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema); 
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(avro_file, datumReader); 
GenericRecord record = null; 
record = dataFileReader.next(record); 

Затем я проверить HBase, чтобы увидеть, если запись с таким же RowKey уже существует. Вал вернулся моими получают представление байтового массива из Avro записи:

Configuration conf = HBaseConfiguration.create(); 
HTable table = new HTable(conf, "table"); 
String pk = new String(record.get("x").toString()+record.get("y").toString()); 
Get get = new Get(Bytes.toBytes(pk)); 
Result result = table.get(get); 
byte[] val = result.getValue(Bytes.toBytes("c"),Bytes.toBytes("c")); 

Если нет записи в HBase с тем же RowKey, я поставил рекорд:

if (val == null) { 
    System.out.println("pk: "+pk+" does not exist"); 
    Put put = new Put(Bytes.toBytes(pk)); 
    put.add(Bytes.toBytes("c"), Bytes.toBytes("c"), Bytes.toBytes(record.toString())); 
    try { 
    table.put(put); 
    } catch (Exception e) { 
    System.err.println("Can't put to table: " + e); 
    } 
} 
else { 
    System.out.println("pk: "+pk+" does exist"); 
    //help me! 
} 

Если есть запись в HBase с той же ключевой строкой, я хочу преобразовать массив байтов результата HBase обратно в схему Avro, а затем сравнить пару полей, чтобы увидеть, какая запись «лучше». Я бы поставил «лучшую» запись в HBase, но я застрял. Как преобразовать массив байтов из HBase в GenericRecord, чтобы я мог сравнивать поля между моей файловой записью и HBase?

ответ

4

Я понял. Мне нужно было записать свою запись в HBase в виде сериализованного байтового массива вместо строки, преобразованной в массив байтов.

Put становится:

ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(writer); 
dfw.create(schema, baos); 
dfw.append(record); 
dfw.close(); 
Put put = new Put(Bytes.toBytes(pk)); 
put.add(Bytes.toBytes("c"), Bytes.toBytes("c"), baos.toByteArray()); 

И это, чтобы получить:

GenericRecord hrecord = null; 
    ByteArrayInputStream bais = new ByteArrayInputStream(val); 
    DataFileStream<GenericRecord> dfs = new DataFileStream<GenericRecord>(bais, datumReader); 
    hrecord = dfs.next(hrecord);