2016-09-29 3 views
3

Я пишу проект для получения данных от Kafka и записи в таблицу Hbase. Поскольку я хочу знать дифференциал записей, мне нужно сначала записать запись с одной и той же rowkey в Hbase, а затем выполнить вычитание с принятой записью и, наконец, сохранить новые записи в таблице HBase.Чтение данных Hbase в Spark Streaming

В начале я попытался использовать newAPIHadoop для получения данных из hbase. Вот моя попытка:

val conf = HBaseConfiguration.create() 
conf.set("zookeeper.znode.parent", "/hbase-secure") 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
conf.set("hbase.zookeeper.quorum", zkQuorum) 
conf.set("hbase.master", masterAddr) 
conf.set("hbase.zookeeper.property.clientPort", portNum) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName) 

val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf, 
     classOf[TableInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

По этому пути, я могу получить значение записей с конкретной семьей столбца и именем столбца ТОЛЬКО РАЗ. Говоря только один раз, я имею в виду, что каждый раз, когда я запускаю свое искрообразующее приложение, этот фрагмент кода будет выполнен, и я могу получить значение, но оно больше не будет выполнено. Потому что я хочу читать свои записи из HBase с cf и column каждый раз, когда я получаю запись от Kafka, это не работает для меня.

Чтобы решить эту проблему, я переведу логику на foreachRDD(), но, к сожалению, искробезопасность кажется не сериализуемой. Я получил ошибку, как task is not serialzable.

Наконец, я нашел, что есть другой способ читать данные из hbase, используя hbase.clinet HTable. Так что это моя последняя работа:

def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = { 
    val conf = HBaseConfiguration.create() 
    conf.set("zookeeper.znode.parent", "/hbase-secure") 
    conf.set("hbase.zookeeper.quorum", "xxxxxx") 
    conf.set("hbase.master", "xxxx") 
    conf.set("hbase.zookeeper.property.clientPort", "xxx") 
    conf.set(TableInputFormat.INPUT_TABLE, "xx") 
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx") 

    val testTable = new HTable(conf, "testTable") 
    val scan = new Scan 
    scan.addColumn("cf1".getBytes, "test".getBytes) 
    val rs = testTable.getScanner(scan) 

    var r = rs.next() 
    val res = new StringBuilder 
    while(r != null){ 
     val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes)) 

     res.append(tmp) 
     r= rs.next() 
    } 
val res = res.toString 

//do the following manipulations and return object (ImmutableBytesWritable, Put) 
     .............................. 
     ....................... 
      } 

В основной метод я использую выше метод в foreachRDD и сохранить в HBase с помощью метода saveAsNewAPIHadoopDataset

streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)) 

Сейчас это работает хорошо для меня, но у меня есть вопросы об этом процессе:

Таким образом, я думаю, для каждого раздела RDD будет создано соединение с HBase. Мне интересно, можно ли масштабировать мое приложение. Скажем, если у меня более 1000 записей за 1 секунду, похоже, что в моей искровой потоковой передаче будет установлено 1000 соединений.

Это правильный способ делать чтение данных с hbase? Какова наилучшая практика для чтения данных из HBase в sparkStreaming? Или искровой поток не должен читать какие-либо данные, он просто предназначен для записи данных потока в БД.

Заранее спасибо.

ответ

0

foreachRDD выполняет на отдельных исполнителей процесс jvm. По крайней мере, вы можете получить экземпляр singleton conf (означает наличие нулевой проверки перед использованием существующего набора conf процесса jvm или new conf) в методе transferToHBasePut. Таким образом, это уменьшит количество подключений Hbase к числу исполнителей, порожденных в вашем Spark-кластере.

Надеется, что это помогает ...

+0

Спасибо, что ответили на мой вопрос. Я попробовал ваше решение, передав conf как параметр в методе transferToHBasePut. Но, как вы сказали, foreach выполняет каждый отдельный процесс jvm-исполнителей, singleton не может быть перенесен из драйвера в рабочего. Я думаю, что это потому, что конфигурация не может быть запрограммирована. Наконец, я обнаружил, что в RDD можно использовать метод, называемый foreachPartition. Этот метод пообещает, что соединение устанавливается только один раз для раздела RDD. – Frankie

3

После некоторого обучения, создать конфигурацию для каждого раздела РДА. Проверьте шаблон проектирования для foreachRDD по адресу: Spark Streaming official website. На самом деле конфигурация не является соединением, поэтому я не знаю, как получить соединение из существующего пула соединений, чтобы получить и поместить запись для Hbase.

+0

Вы достигли чтения из HBase с искровым потоком? Я могу только прочитать его, открыв соединение для каждого из данных. Каков способ сделать это? – zorkaya

Смежные вопросы