У меня есть работа с потоком искрообразования, и в этом я немного собираюсь, теперь я хочу вставить эти записи в HBase, но это не типичная вставка. Я хочу делать UPSERT, если для rowkey доступно, чем в значениях столбцов sum (newvalue + oldvalue). Кто-нибудь делится псевдокодом в java, как я могу это достичь?Hbase Upsert with Spark
ответ
Что-то вроде этого ...
byte[] rowKey = null; // Provided
Table table = null; // Provided
long newValue = 1000; // Provided
byte[] FAMILY = new byte[]{0}; // Defined
byte[] QUALIFIER = new byte[]{1}; // Defined
try {
Get get = new Get(rowKey);
Result result = table.get(get);
if (!result.isEmpty()) {
Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER);
newValue += Bytes.bytesToLong(cell.getValueArray(),cell.getValueOffset());
}
Put put = new Put(rowKey);
put.addColumn(FAMILY,QUALIFIER,Bytes.toBytes(newValue));
table.put(put);
} catch (Exception e) {
// Handle Exceptions...
}
Мы (сращивания машины [Open Source]) имеют некоторые довольно прохладно уроки с использованием искровых Streaming для хранения данных в HBase.
Проверить it вне. может быть интересно.
Я нашел способ ниже код псевдо: -
=========== Для UPSERT (Update и Insert) ===========
общественной недействительный HbaseUpsert (javaRDD < Строка> javaRDD) бросает IOException, ServiceException {
JavaPairRDD < ImmutableBytesWritable, Put > hbasePuts1 = javaRDD.mapToPair(
new PairFunction < Row, ImmutableBytesWritable, Put >() {
private static final long serialVersionUID = 1L;
public Tuple2 < ImmutableBytesWritable, Put > call(Row row) throws Exception {
if(HbaseConfigurationReader.getInstance()!=null)
{
HTable table = new HTable(HbaseConfigurationReader.getInstance().initializeHbaseConfiguration(), "TEST");
try {
String Column1 = row.getString(1);
long Column2 = row.getLong(2);
Get get = new Get(Bytes.toBytes(row.getString(0)));
Result result = table.get(get);
if (!result.isEmpty()) {
Cell cell = result.getColumnLatestCell(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"));
Column2 += Bytes.toLong(cell.getValueArray(),cell.getValueOffset());
}
Put put = new Put(Bytes.toBytes(row.getString(0)));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column1"), Bytes.toBytes(Column1));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"), Bytes.toBytes(Column2));
return new Tuple2 < ImmutableBytesWritable, Put > (new ImmutableBytesWritable(), put);
} catch (Exception e) {
e.printStackTrace();
}
finally {
table.close();
}
}
return null;
}
});
hbasePuts1.saveAsNewAPIHadoopDataset(HbaseConfigurationReader.initializeHbaseConfiguration());
}
============== для конфигурации ========== ===== открытый класс HbaseConfigurationReader реализует Serializable {
static Job newAPIJobConfiguration1 =null;
private static Configuration conf =null;
private static HTable table= null;
private static HbaseConfigurationReader instance= null;
private static Log logger= LogFactory.getLog(HbaseConfigurationReader.class);
HbaseConfigurationReader() бросает MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { initializeHbaseConfiguration(); }
общественности статической HbaseConfigurationReader деЫпзЬапсе() бросает MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
if (instance == null) {
instance = new HbaseConfigurationReader();
}
return instance;
} общественности статической initializeHbaseConfiguration конфигурации() бросает MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { , если (CONF = = null) { conf = HBaseConfiguration.create(); conf.set ("hbase.zookeeper.quorum", "localhost"); conf.set ("hbase.zookeeper.property.clientPort", "2181"); HBaseAdmin.checkHBaseAvailable (conf); стол = новый HTable (conf, "TEST"); conf.set (org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "TEST"); try { новый APIJobConfiguration1 = Job.getInstance (conf); newAPIJobConfiguration1.getConfiguration(). Set (TableOutputFormat.OUTPUT_TABLE, "TEST"); newAPIJobConfiguration1.setOutputFormatClass (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); } catch (IOException e) { e.printStackTrace(); }
}
else
logger.info("Configuration comes null");
return newAPIJobConfiguration1.getConfiguration();
}}
- 1. Meteor upsert with mongo
- 2. Upsert with $ in
- 3. Bulk upsert with SQLAlchemy
- 4. Upsert with .net sqladapter
- 5. Spark Closures with Array
- 6. Spark Streaming с Hbase
- 7. Spark Streaming: source HBase
- 8. Hbase using spark-sql
- 9. Снимки Spark и HBase
- 10. Hbase Spark Connector
- 11. Формат timestamp для phoenix upsert в hbase?
- 12. MongoDB C# Upsert with Guid
- 13. Spark to Hbase using Oozie
- 14. Проблема с соединением Spark Hbase
- 15. Hbase Spark RDD JSON Column
- 16. Совместимость версий Spark и HBase
- 17. Spark insert to HBase slow
- 18. Загрузка таблицы Hbase на Spark -
- 19. Открыть несколько таблиц Hbase Spark
- 20. Spark Dataframes UPSERT to Postgres Таблица
- 21. C# Mongdb.Dynamic Method Missing Exception with upsert()
- 22. Meteor 1.0: Upsert with Mongo Selector
- 23. Upsert with Mongoskin (node.js and mongodb)
- 24. Lily with Morphline и HBase
- 25. NoClassDefFoundError with spark
- 26. Apache spark with python
- 27. IllegalArgumentException with Spark 1.6
- 28. Spark with Cython
- 29. Mongodb with Spark
- 30. Spark workflow with jar
Кроме того, вы можете рассмотреть приращение HBase в зависимости от типа добавляемого ... –
Привет Джон, Спасибо за ваш ответ его рабочий, но нашли время для 500 mb его взятие вокруг 1 часа в искре есть ли какой-либо способ, которым Вы знаете о массовом обновлении вида вещи? – ankitbeohar90
Я изучаю машину для сращивания, также посетил вебинар «Лямбда-в-коробке» и отправил одно сообщение в адрес «Thomas Ryan» и ожидал ответа, я ценю работу Splice Machine, можете ли вы рассказать мне, какой учебник мне следует проверьте, потому что я проверил много, но не смог найти какой-либо конкретной. Снова спасибо тонне .... – ankitbeohar90