2015-07-06 4 views
1

Я пытаюсь прочитать все записи из таблицы Hbase. Ниже приведен фрагмент кода.Извлечь все записи из HBase, используя SparkSQL

SparkContext sparkContext = new SparkContext(conf); 

    SQLContext sqlContext = new SQLContext(sparkContext); 

    Configuration hbaseConf = HBaseConfiguration.create(); 

    hbaseConf.set("hbase.master", "localhost:60000"); 
    hbaseConf.setInt("timeout", 120000); 
    hbaseConf.set("hbase.zookeeper.quorum", "localhost"); 
    hbaseConf.set("zookeeper.znode.parent", "/hbase-unsecure"); 
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "Test"); 

    DataFrame df = sqlContext.createDataFrame(sparkContext.newAPIHadoopRDD(hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class), TestBean.class); 

    df.registerTempTable("TempTest"); 
    df.show(); 

В df.show() я получаю ниже ошибок

java.lang.IllegalArgumentException: object is not an instance of declaring class at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

Всех указатели, почему я сталкиваюсь с этой проблемой.

+0

Трассировка стека поможет, но я подозреваю, что у вас есть несовпадающие версии библиотек где-то. –

ответ

0

Вы пытаетесь создать DataFrame из РДА, состоящий из пар:

org.apache.hadoop.hbase.io.ImmutableBytesWritable  
org.apache.hadoop.hbase.client.Result 

Вы должны прочитать свой hBaseRDD:

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
    classOf[org.apache.hadoop.hbase.client.Result]); 

Тогда преобразование (ImmutableBytesWritable, результат) кортежи в РДУ от результата:

val resultRDD = hBaseRDD.map(tuple => tuple._2) 

Затем преобразуйте в RDD строк, которые могут быть преобразованы int o DataFrame.

В качестве примера предположим, что у вас есть таблица Hbase с ключом, который содержит два значения «value1_value2», можно разобрать ключ (sperated на «_») с помощью:

val keyValueRDD = resultRDD.map(result =>  (Bytes.toString(result.getRow()).split("_")(0), Bytes.toString(result.getRow()).split("_")(1), Bytes.toFloat(result.value()))) 

Теперь вы можете создать dataframe со значениями в вашем «_» отделенного ключа:

import sqlContext.implicits._ 
    val df = keyValueRDD.toDF("value1", "value2"); 
    df.registerTempTable("Table") 
    sqlContext.sql("SELECT * FROM Table Limit 5").show() 

чтобы полностью карты таблицы Hbase к dataFrame вам нужно:

  1. Создать тематический класс: (за пределы вашего объекта)

    case class TestRow(rowkey: String, value1: String, value2: String, value3: Float, value4: Float) 
    
  2. Определите вашу семью столбца в байтах:

    final val cfTest = "te" 
    final val cfTestBytes = Bytes.toBytes(cfTest) 
    
  3. Разбирает Результаты:

    object TestRow { 
        def parseTestRow(result: Result): TestRow = { 
         val rowkey = Bytes.toString(result.getRow()) 
    
         val p0 = rowkey 
         val p1 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("currency"))) 
         val p2 = Bytes.toString(result.getValue(cfTestBytes, Bytes.toBytes("asat"))) 
         val p3 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_aed"))) 
         val p4 = Bytes.toFloat(result.getValue(cfTestBytes, Bytes.toBytes("m_usd"))) 
         TestRow(p0, p1, p2, p3, p4) 
        } 
        } 
    
  4. Создать Dataframe

    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
    classOf[org.apache.hadoop.hbase.client.Result]); 
    
    val resultRDD = hBaseRDD.map(tuple => tuple._2) 
    val testRDD = resultRDD.map(TestRow.parseTestRow) 
    import sqlContext.implicits._ 
    val testDF = testRDD.toDF() 
    testDF.registerTempTable("Test") 
    sqlContext.sql("SELECT count(*) FROM Test").show() 
    
Смежные вопросы