2016-10-13 6 views
0

Я пишу топологию Storm для чтения данных из HBase с использованием DRPC. По сути, это выполняет проверку для получения данных, обогащает данные и возвращает их.Болт Storm-hbase не работает над DRPC

Я могу легко получить базовый пример DRPC, работающий (на основе http://storm.apache.org/releases/current/Distributed-RPC.html). Однако, когда я вставляю код для сканирования, процесс занимает очень много времени. Через минуту, я получаю следующее сообщение об ошибке:

backtype.storm.generated.DRPCExecutionException

в backtype.storm.daemon.drpc $ service_handler $ reify__8688.failRequest (drpc.clj: 136) ~ [Шторм core-0.10.0.2.4.2.0-258.jar: 0.10.0.2.4.2.0-258]

at backtype.storm.drpc.DRPCSpout.fail (DRPCSpout.java:241) ~ [storm-core -0.10.0.2.4.2.0-258.jar: 0.10.0.2.4.2.0-258

Через некоторое время я получаю org.apache.hadoop.hbase.client.RetriesExhaustedException. Это не всегда происходит, но очень распространено. Мое предположение, основанное на этом, является одной из двух возможностей:

Отсканированное время. Однако выполнение сканирования через HBase Shell или REST возвращает менее чем за секунду Таблица несовместима, в результате чего определенная область отсутствует. Я запустил hbase hbck, и он показывает 0 несоответствий. Я знаю, что соединение с HBase в порядке: я добавил результат отладки, и болт получает результаты. Однако из-за исключения DRPCExecutionException эти результаты никогда не возвращаются по DRPC.

Я, хотя проблема была таймаутом DRPC, однако я увеличил тайм-аут DRPC много, и я получаю тот же результат за такое же количество времени. После Googling я нашел кого-то другого с той же проблемой ([Storm][DRPC] Request failed), но нет никаких указаний, как это исправить.

Для справки я добавляю мой код ниже:

try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI"))) 
    { 
     List<Filter> filters = new ArrayList<>(); 
     String startRowString = "start"; 
     String endRowString = "end"; 
     RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes())); 
     filters.add(startRow); 
     RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes())); 
     filters.add(endRow); 
     FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters); 

     Scan scan = new Scan(); 
     scan.addFamily("f1".getBytes()); 
     scan.setFilter(filterList); 

     ResultScanner scanner = table.getScanner(scan); 
     for (Result result : scanner) 
     { 
      hbaseValues.add(result); 
     } 
    } 
} 

Заранее спасибо за помощь.

ответ

0

Хорошо, так что я не знаю, почему возникла исходная задача, но я исправить это сильно упрощает мой сканер:

Scan scan = new Scan(startRowString.getBytes(), endRowString.getBytes()); 
scan.addFamily("f1".getBytes()); 
ResultScanner scanner = table.getScanner(scan);    
for (Result r : scanner) 
{...} 

Основываясь на этом, проблема, кажется, на стороне HBase, а чем Шторм.

Смежные вопросы