Я пишу проект для получения данных от Kafka и записи в таблицу Hbase. Поскольку я хочу знать дифференциал записей, мне нужно сначала записать запись с одной и той же rowkey в Hbase, а затем выполнить вычитание с принятой записью и, наконец, сохранить новые записи в таблице HBase.Чтение данных Hbase в Spark Streaming
В начале я попытался использовать newAPIHadoop
для получения данных из hbase. Вот моя попытка:
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
По этому пути, я могу получить значение записей с конкретной семьей столбца и именем столбца ТОЛЬКО РАЗ. Говоря только один раз, я имею в виду, что каждый раз, когда я запускаю свое искрообразующее приложение, этот фрагмент кода будет выполнен, и я могу получить значение, но оно больше не будет выполнено. Потому что я хочу читать свои записи из HBase с cf и column каждый раз, когда я получаю запись от Kafka, это не работает для меня.
Чтобы решить эту проблему, я переведу логику на foreachRDD()
, но, к сожалению, искробезопасность кажется не сериализуемой. Я получил ошибку, как task is not serialzable
.
Наконец, я нашел, что есть другой способ читать данные из hbase, используя hbase.clinet HTable. Так что это моя последняя работа:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set("hbase.zookeeper.quorum", "xxxxxx")
conf.set("hbase.master", "xxxx")
conf.set("hbase.zookeeper.property.clientPort", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "xx")
conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")
val testTable = new HTable(conf, "testTable")
val scan = new Scan
scan.addColumn("cf1".getBytes, "test".getBytes)
val rs = testTable.getScanner(scan)
var r = rs.next()
val res = new StringBuilder
while(r != null){
val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
res.append(tmp)
r= rs.next()
}
val res = res.toString
//do the following manipulations and return object (ImmutableBytesWritable, Put)
..............................
.......................
}
В основной метод я использую выше метод в foreachRDD и сохранить в HBase с помощью метода saveAsNewAPIHadoopDataset
streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
Сейчас это работает хорошо для меня, но у меня есть вопросы об этом процессе:
Таким образом, я думаю, для каждого раздела RDD будет создано соединение с HBase. Мне интересно, можно ли масштабировать мое приложение. Скажем, если у меня более 1000 записей за 1 секунду, похоже, что в моей искровой потоковой передаче будет установлено 1000 соединений.
Это правильный способ делать чтение данных с hbase? Какова наилучшая практика для чтения данных из HBase в sparkStreaming? Или искровой поток не должен читать какие-либо данные, он просто предназначен для записи данных потока в БД.
Заранее спасибо.
Спасибо, что ответили на мой вопрос. Я попробовал ваше решение, передав conf как параметр в методе transferToHBasePut. Но, как вы сказали, foreach выполняет каждый отдельный процесс jvm-исполнителей, singleton не может быть перенесен из драйвера в рабочего. Я думаю, что это потому, что конфигурация не может быть запрограммирована. Наконец, я обнаружил, что в RDD можно использовать метод, называемый foreachPartition. Этот метод пообещает, что соединение устанавливается только один раз для раздела RDD. – Frankie