Наш трубопровод выглядит следующим образом:Как использовать 'придавить' правильно DataFlow
ГКС (GZ сжатых файлов) -> Pardo -> BigQuery
Я хочу использовать 'придавить' совок в нескольких файлах из GCS в качестве ввода в мой конвейер. Но он продолжает borking с ошибкой:
Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error:
Message: Load configuration must specify at least one source URI
HTTP Code: 400
Код:
PCollection<String> file1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz").withCompressionType(TextIO.CompressionType.GZIP));
PCollection<String> file2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz").withCompressionType(TextIO.CompressionType.GZIP));
PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2);
PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections());
inputRead.apply(ParDo.of(transformation)
.named(String.format("%s-CPT-transform", type))
.withSideInputs(views))
.apply(Write.to(getOutputTable(type))
.withCreateDisposition(CREATE_IF_NEEDED)
.withWriteDisposition(WRITE_APPEND)
.withSchema(schema)
.named(String.format("%s-BQ-write", type)));
Пример задания ID: 2015-05-12_19_54_06-10158770219525037626
Что я делаю неправильно?
Свести выглядит отлично. «Импортное задание» сообщения об ошибке используется для передачи данных в BQ. –
Да, я уверен, что код сглаживания в порядке. Похож на ошибку в шаге записи BigQuery. –