2016-01-27 2 views
1

настоящее время я использую Python для массивов данных загрузки CSV в таблицу HBase, и я в настоящее время возникают проблемы с написанием соответствующих HFiles использованием saveAsNewAPIHadoopFileСпарк Streaming - HBase Bulk Load

Мой код в настоящее время выглядит следующим образом:

def csv_to_key_value(row): 
    cols = row.split(",") 
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]), 
       (cols[0], [cols[0], "f2", "c2", cols[2]]), 
       (cols[0], [cols[0], "f3", "c3", cols[3]])) 
    return result 

def bulk_load(rdd): 
    conf = {#Ommitted to simplify} 

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\ 
        .flatMap(csv_to_key_value) 
    if not load_rdd.isEmpty(): 
     load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime, 
             "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2", 
             conf=conf, 
             keyConverter=keyConv, 
             valueConverter=valueConv) 
    else: 
     print("Nothing to process") 

Когда я запускаю этот код, я получаю следующее сообщение об ошибке:

java.io.IOException: Added a key not lexically larger than previous. Current cell = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0, lastCell = /f1:c1/1453891407212/Minimum/vlen=1/seqid=0 at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)

Поскольку ошибки указывает на то, что ключ является проблемой, я схватился элементы из моего РДУ и они следующим образом (отформатирован для удобства чтения)

[(u'1', [u'1', 'f1', 'c1', u'A']), 
(u'1', [u'1', 'f2', 'c2', u'1A']), 
(u'1', [u'1', 'f3', 'c3', u'10']), 
(u'2', [u'2', 'f1', 'c1', u'B']), 
(u'2', [u'2', 'f2', 'c2', u'2B']), 
(u'2', [u'2', 'f3', 'c3', u'9']), 

. , ,

(u'9', [u'9', 'f1', 'c1', u'I']), 
(u'9', [u'9', 'f2', 'c2', u'3C']), 
(u'9', [u'9', 'f3', 'c3', u'2']), 
(u'10', [u'10', 'f1', 'c1', u'J']), 
(u'10', [u'10', 'f2', 'c2', u'1A']), 
(u'10', [u'10', 'f3', 'c3', u'1'])] 

Это идеальное соответствие для моего CSV в правильном порядке. Насколько я понимаю, в HBase ключ определяется {row, family, timestamp}. Ряд и семья - это комбинация, уникальная и монотонно возрастающая для всех записей в моих данных, и я не контролирую метку времени (которая является единственной проблемой, которую я могу себе представить)

Может кто-нибудь посоветует мне, как избежать/предотвратить такие проблемы?

ответ

0

Ну, это была просто глупая ошибка с моей стороны, и я чувствую себя немного глупо. Лексикографически, порядок должен быть 1, 10, 2, 3 ... 8, 9. Самый простой способ гарантировать правильный порядок, прежде чем загрузка:

rdd.sortByKey(true); 

Я надеюсь, что смогу спасти по крайней мере один человек, головные боли, которые я имел ,

Смежные вопросы