Я использую следующий код для заполнения данных в Bigtable:Наполнение данных в Google Cloud Bigtable занимает много времени
CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
.withConfiguration("clusterId", options.getBigTableClusterId())
.withProjectId(options.getProject())
.withInstanceId(options.getBigTableInstanceId())
.withTableId(options.getOutputBTTable())
.build();
Pipeline p = Pipeline.create(options);
/**
* Read Data from Big Query
*/
CloudBigtableIO.initializeForWrite(p);
p.apply(BigQueryIO.Read.fromQuery(getQuery(options.getDate())))
.apply(ParDo.of(new DoFn<TableRow, Mutation>() {
public void processElement(ProcessContext c) {
Mutation output = convertDataToRow(c.element());
if (output != null) {
c.output(output);
};
}
}))
.apply(CloudBigtableIO.writeToTable(config));
p.run();
private static Mutation convertDataToRow(TableRow element) {
LOG.info("element: "+ element);
if(element.get("BASM_AID") != null){
Put obj = new Put(getRowKey(element).getBytes()).addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String)element.get("BAS_category")).getBytes());
obj.addColumn(USER_FAMILY, AID, ((String)element.get("BASM_AID")).getBytes());
if(element.get("BASM_segment_id") != null){
obj.addColumn(SEGMENT_FAMILY, SEGMENT_ID, ((String)element.get("BASM_segment_id")).getBytes());
}
if(element.get("BAS_sub_category") != null){
obj.addColumn(SEGMENT_FAMILY, SUB_CATEGORY, ((String)element.get("BAS_sub_category")).getBytes());
}
if(element.get("BAS_name") != null){
obj.addColumn(SEGMENT_FAMILY, NAME, ((String)element.get("BAS_name")).getBytes());
}
if(element.get("BAS_description") != null){
obj.addColumn(SEGMENT_FAMILY, DESCRIPTION, ((String)element.get("BAS_description")).getBytes());
}
if(element.get("BASM_krux_user_id") != null){
obj.addColumn(USER_FAMILY, KRUX_USER_ID, ((String)element.get("BASM_krux_user_id")).getBytes());
}
if(element.get("BAS_last_compute_day") != null){
obj.addColumn(SEGMENT_FAMILY, LAST_COMPUTE_DAY, ((String)element.get("BAS_last_compute_day")).getBytes());
}
if(element.get("BAS_type") != null){
obj.addColumn(SEGMENT_FAMILY, TYPE, ((String)element.get("BAS_type")).getBytes());
}
if(element.get("BASM_REGID") != null){
obj.addColumn(USER_FAMILY, REGID, ((String)element.get("BASM_REGID")).getBytes());
}
return obj;
}else{
return null;
}
}
У нас есть 30 Bigtable Узлов и моя работа потока данных, с которыми работает 100 человек, в целом процесс должен обрабатывать около 10 миллиардов рядов данных, причем указанная выше конфигурация моей работы занимает более одного дня, чтобы завершить, что не идеально.
Любые предложения на уровне кода, с помощью которых мы можем работать быстрее, я знаю, что увеличение количества узлов Bigtable является одним из вариантов, но в настоящее время я ищу другие варианты, в которых нам не нужно увеличивать узлы.