Я пишу топологию Storm для чтения данных из HBase с использованием DRPC. По сути, это выполняет проверку для получения данных, обогащает данные и возвращает их.Болт Storm-hbase не работает над DRPC
Я могу легко получить базовый пример DRPC, работающий (на основе http://storm.apache.org/releases/current/Distributed-RPC.html). Однако, когда я вставляю код для сканирования, процесс занимает очень много времени. Через минуту, я получаю следующее сообщение об ошибке:
backtype.storm.generated.DRPCExecutionException
в backtype.storm.daemon.drpc $ service_handler $ reify__8688.failRequest (drpc.clj: 136) ~ [Шторм core-0.10.0.2.4.2.0-258.jar: 0.10.0.2.4.2.0-258]
at backtype.storm.drpc.DRPCSpout.fail (DRPCSpout.java:241) ~ [storm-core -0.10.0.2.4.2.0-258.jar: 0.10.0.2.4.2.0-258
Через некоторое время я получаю org.apache.hadoop.hbase.client.RetriesExhaustedException. Это не всегда происходит, но очень распространено. Мое предположение, основанное на этом, является одной из двух возможностей:
Отсканированное время. Однако выполнение сканирования через HBase Shell или REST возвращает менее чем за секунду Таблица несовместима, в результате чего определенная область отсутствует. Я запустил hbase hbck, и он показывает 0 несоответствий. Я знаю, что соединение с HBase в порядке: я добавил результат отладки, и болт получает результаты. Однако из-за исключения DRPCExecutionException эти результаты никогда не возвращаются по DRPC.
Я, хотя проблема была таймаутом DRPC, однако я увеличил тайм-аут DRPC много, и я получаю тот же результат за такое же количество времени. После Googling я нашел кого-то другого с той же проблемой ([Storm][DRPC] Request failed), но нет никаких указаний, как это исправить.
Для справки я добавляю мой код ниже:
try (Table table = HbaseClient.connection().getTable(TableName.valueOf("EPG_URI")))
{
List<Filter> filters = new ArrayList<>();
String startRowString = "start";
String endRowString = "end";
RowFilter startRow = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryPrefixComparator(startRowString.getBytes()));
filters.add(startRow);
RowFilter endRow = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryPrefixComparator(endRowString.getBytes()));
filters.add(endRow);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filters);
Scan scan = new Scan();
scan.addFamily("f1".getBytes());
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner)
{
hbaseValues.add(result);
}
}
}
Заранее спасибо за помощь.