У меня есть данные моего журнала в hbase в следующем формате.HBase обновить существующую строку
источник HBase таблица
---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
Dest стол (В таблице назначения, слово добавляется в качестве ключа и суммы подсчета как column.data после запуска MapReduce на 2013/09/25)
------------------
word(table key) count
------------------
apple 7
oranges 6
mangoes 6
Данные будут добавлены в исходную таблицу каждый день. Но я не хочу делать уменьшение карты для всех исходных данных таблицы. поэтому я попытался сделать сокращение карты только для данных, добавленных в этот день.
исходная таблица с новыми данными, добавленными в 2013/09/26.
---------------------
date(table key) word count
---------------------
2013/09/25 apple 5
2013/09/25 mangoes 2
2013/09/25 oranges 6
2013/09/25 apple 2
2013/09/25 mangoes 3
2013/09/25 mangoes 1
2013/09/26 apple 10
2013/09/26 oranges 20
, когда я делаю mapreduce только для данных 2013/09/26. Я получаю следующее в таблице dest.
Dest таблица с новыми данными (так как ключи одинаковы, счетчик для яблок и апельсинов обновляются с данными 2013/09/26 data.old до 2013/09/25 ушел):
------------------
word(table key) count
------------------
apple 10
oranges 10
mangoes 6
ожидается Dest таблица:
------------------
word(table key) count
------------------
apple 17
oranges 16
mangoes 6
Могу ли я карту уменьшить частичные данные и добавить счетчик в колонке подсчета целевой таблицы или мне нужно отобразить свести все данные каждый раз?
, если я смогу отобразить сокращенные частичные данные и обновить счет, как я могу это сделать. Где-то моя функция уменьшения карты.
функция Карта:
public void map(ImmutableBytesWritable row,Result value,Context context) throws IOException {
ImmutableBytesWritable key = new ImmutableBytesWritable(row.get());
String cf = "data";
String column1 = "word";
String column2 = "count";
String word = new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column1)));
Text t = new Text(word);
context.write(t,value);
}
Снижение функции:
public void reduce(Text key,Iterable<Result> values,Context context) throws IOException,InterruptedException {
int count=0;
String cf = "data";
String column = "count";
for(Result val :values) {
int d = Integer.parseInt(new String(result.getValue(Bytes.toBytes(cf),Bytes.toBytes(column))))
count += d;
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(cf.getBytes(), column.getBytes(), String.valueOf(count).getBytes());
context.write(null, put);
}