2016-08-09 7 views
1

У меня есть работа с потоком искрообразования, и в этом я немного собираюсь, теперь я хочу вставить эти записи в HBase, но это не типичная вставка. Я хочу делать UPSERT, если для rowkey доступно, чем в значениях столбцов sum (newvalue + oldvalue). Кто-нибудь делится псевдокодом в java, как я могу это достичь?Hbase Upsert with Spark

ответ

1

Что-то вроде этого ...

byte[] rowKey = null; // Provided 
Table table = null; // Provided 
long newValue = 1000; // Provided 
byte[] FAMILY = new byte[]{0}; // Defined 
byte[] QUALIFIER = new byte[]{1}; // Defined 

try { 
    Get get = new Get(rowKey); 
    Result result = table.get(get); 
    if (!result.isEmpty()) { 
     Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER); 
     newValue += Bytes.bytesToLong(cell.getValueArray(),cell.getValueOffset()); 
    } 
    Put put = new Put(rowKey); 
    put.addColumn(FAMILY,QUALIFIER,Bytes.toBytes(newValue)); 
    table.put(put); 
} catch (Exception e) { 
    // Handle Exceptions... 
} 

Мы (сращивания машины [Open Source]) имеют некоторые довольно прохладно уроки с использованием искровых Streaming для хранения данных в HBase.

Проверить it вне. может быть интересно.

+0

Кроме того, вы можете рассмотреть приращение HBase в зависимости от типа добавляемого ... –

+0

Привет Джон, Спасибо за ваш ответ его рабочий, но нашли время для 500 mb его взятие вокруг 1 часа в искре есть ли какой-либо способ, которым Вы знаете о массовом обновлении вида вещи? – ankitbeohar90

+0

Я изучаю машину для сращивания, также посетил вебинар «Лямбда-в-коробке» и отправил одно сообщение в адрес «Thomas Ryan» и ожидал ответа, я ценю работу Splice Machine, можете ли вы рассказать мне, какой учебник мне следует проверьте, потому что я проверил много, но не смог найти какой-либо конкретной. Снова спасибо тонне .... – ankitbeohar90

0

Я нашел способ ниже код псевдо: -

=========== Для UPSERT (Update и Insert) ===========

общественной недействительный HbaseUpsert (javaRDD < Строка> javaRDD) бросает IOException, ServiceException {

 JavaPairRDD < ImmutableBytesWritable, Put > hbasePuts1 = javaRDD.mapToPair(

     new PairFunction < Row, ImmutableBytesWritable, Put >() { 

     private static final long serialVersionUID = 1L; 
    public Tuple2 < ImmutableBytesWritable, Put > call(Row row) throws Exception { 

      if(HbaseConfigurationReader.getInstance()!=null) 
      { 
      HTable table = new HTable(HbaseConfigurationReader.getInstance().initializeHbaseConfiguration(), "TEST"); 

     try { 

      String Column1 = row.getString(1); 
      long Column2 = row.getLong(2); 
      Get get = new Get(Bytes.toBytes(row.getString(0))); 
       Result result = table.get(get); 
       if (!result.isEmpty()) { 
        Cell cell = result.getColumnLatestCell(Bytes.toBytes("cf1"), Bytes.toBytes("Column2")); 
        Column2 += Bytes.toLong(cell.getValueArray(),cell.getValueOffset()); 
       } 
      Put put = new Put(Bytes.toBytes(row.getString(0))); 
      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column1"), Bytes.toBytes(Column1)); 
      put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"), Bytes.toBytes(Column2)); 
      return new Tuple2 < ImmutableBytesWritable, Put > (new ImmutableBytesWritable(), put); 

     } catch (Exception e) { 

      e.printStackTrace(); 
     } 
     finally { 
      table.close(); 
     } 
      } 
     return null; 
     } 
     }); 

    hbasePuts1.saveAsNewAPIHadoopDataset(HbaseConfigurationReader.initializeHbaseConfiguration()); 

    } 

============== для конфигурации ========== ===== открытый класс HbaseConfigurationReader реализует Serializable {

static Job newAPIJobConfiguration1 =null; 
private static Configuration conf =null; 
private static HTable table= null; 
private static HbaseConfigurationReader instance= null; 

private static Log logger= LogFactory.getLog(HbaseConfigurationReader.class); 

HbaseConfigurationReader() бросает MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { initializeHbaseConfiguration(); }

общественности статической HbaseConfigurationReader деЫпзЬапсе() бросает MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {

if (instance == null) { 
    instance = new HbaseConfigurationReader(); 
} 

return instance; 

} общественности статической initializeHbaseConfiguration конфигурации() бросает MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { , если (CONF = = null) { conf = HBaseConfiguration.create(); conf.set ("hbase.zookeeper.quorum", "localhost"); conf.set ("hbase.zookeeper.property.clientPort", "2181"); HBaseAdmin.checkHBaseAvailable (conf); стол = новый HTable (conf, "TEST"); conf.set (org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "TEST"); try { новый APIJobConfiguration1 = Job.getInstance (conf); newAPIJobConfiguration1.getConfiguration(). Set (TableOutputFormat.OUTPUT_TABLE, "TEST"); newAPIJobConfiguration1.setOutputFormatClass (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); } catch (IOException e) { e.printStackTrace(); }

} 

else 
    logger.info("Configuration comes null"); 

return newAPIJobConfiguration1.getConfiguration(); 

}}