2015-11-11 2 views
1

У меня есть таблица с именем «sample» в hbase. Мне нужно запросить таблицу, используя запрос Apache spark-sql. Есть ли способ читать данные hbase, используя запрос Apache spark-sql?Hbase using spark-sql

ответ

6

Спарк SQL является двигателем запроса в памяти, чтобы выполнить некоторые операции запроса с использованием искровой SQL поверх HBase таблицы нужно

  1. Fetch данных из HBase используя искру и создать искру RDD

    SparkConf sparkConf = new SparkConf(); 
    sparkConf.setAppName("SparkApp"); 
    sparkConf.setMaster("local[*]"); 
    
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 
    
    Configuration config = HBaseConfiguration.create(); 
    config.addResource(new Path("/etc/hbase/hbase-site.xml")); 
    config.addResource(new Path("/etc/hadoop/core-site.xml")); 
    config.set(TableInputFormat.INPUT_TABLE, "sample"); 
    
    JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); 
    
    JavaRDD<StudentBean> sampleRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable,Result>, StudentBean >() { 
        private static final long serialVersionUID = -2021713021648730786L; 
        public StudentBean call(Tuple2<ImmutableBytesWritable, Result> tuple) { 
         StudentBean bean = new StudentBean (); 
         Result result = tuple._2; 
         bean.setRowKey(rowKey); 
         bean.setFirstName(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("firstName")))); 
         bean.setLastName(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("lastName")))); 
         bean.setBranch(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("branch")))); 
         bean.setEmailId(Bytes.toString(result.getValue(Bytes.toBytes("details"), Bytes.toBytes("emailId")))); 
         return bean; 
        } 
    }); 
    
  2. Создать объект DataFrame с помощью этого RDD и зарегистрировать это с некоторым временным именем таблицы, а затем вы можете выполнить ваш запрос

    DataFrame schema = sqlContext.createDataFrame(sampleRDD, StudentBean.class); 
    schema.registerTempTable("spark_sql_temp_table"); 
    
    DataFrame schemaRDD = sqlContext.sql("YOUR_QUERY_GOES_HERE"); 
    
    JavaRDD<StudentBean> result = schemaRDD.toJavaRDD().map(new Function<Row, StudentBean>() { 
    
        private static final long serialVersionUID = -2558736294883522519L; 
    
        public StudentBean call(Row row) throws Exception { 
         StudentBean bean = new StudentBean(); 
         // Do the mapping stuff here 
         return bean; 
        } 
    });