2015-05-13 2 views
4

Наш трубопровод выглядит следующим образом:Как использовать 'придавить' правильно DataFlow

ГКС (GZ сжатых файлов) -> Pardo -> BigQuery

Я хочу использовать 'придавить' совок в нескольких файлах из GCS в качестве ввода в мой конвейер. Но он продолжает borking с ошибкой:

Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error: 
Message: Load configuration must specify at least one source URI 
HTTP Code: 400 

Код:

PCollection<String> file1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz").withCompressionType(TextIO.CompressionType.GZIP)); 
     PCollection<String> file2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz").withCompressionType(TextIO.CompressionType.GZIP)); 
     PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2); 
     PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections()); 
inputRead.apply(ParDo.of(transformation) 
       .named(String.format("%s-CPT-transform", type)) 
       .withSideInputs(views)) 
       .apply(Write.to(getOutputTable(type)) 
         .withCreateDisposition(CREATE_IF_NEEDED) 
         .withWriteDisposition(WRITE_APPEND) 
         .withSchema(schema) 
         .named(String.format("%s-BQ-write", type))); 

Пример задания ID: 2015-05-12_19_54_06-10158770219525037626

Что я делаю неправильно?

+0

Свести выглядит отлично. «Импортное задание» сообщения об ошибке используется для передачи данных в BQ. –

+0

Да, я уверен, что код сглаживания в порядке. Похож на ошибку в шаге записи BigQuery. –

ответ

2

вместо хака предлагаемого, который на самом деле довольно угловатый, я вместо того, чтобы написать один пустую строку в методе finishBundle(). Это будет писать 1 пустую строку на один пучок, но мы можем жить с этим до тех пор, пока исправление не будет развернуто. Установка «id» упрощает фильтрацию этих строк позже.

Кроме того, этот способ/хак намного проще реализовать:

@Override 
public void finishBundle(Context c) throws Exception { 
    TableRow workaroundRow = new TableRow(); 
    workaroundRow.set("id", "workaround_row"); 
    c.output(workaroundRow); //Workaround to http://goo.gl/CpBxEf 
} 
1

На нашем конце появляется ошибка при записи пустого PCollection в BigQuery. У меня есть вопрос о проблеме - мы исправим ее как можно скорее и последуем за ней.

Если вам нужно создать результат PCollection, который может быть пустым, и вы можете обрабатывать одну пустую строку, добавляемую в таблицу BigQuery, в случае пустого результата, вы можете временно обойти проблему для Теперь с помощью этого хака:

// Temporary hack around a temporary bug writing empty PCollections to BigQuery by 
// creating a single empty row if a PCollection<TableRow> is empty. 
static class AddEmptyRowIfEmpty 
     extends PTransform<PCollection<TableRow>, PCollection<TableRow>> { 

    @Override 
    public PCollection<TableRow> apply(PCollection<TableRow> maybeEmpty) { 

     // Build a PCollection that contains no elements if 'maybeEmpty' has elements, or 
     // exactly one empty TableRow if 'maybeEmpty' is empty. 
     final PCollectionView<Iterable<TableRow>> maybeEmptyView = maybeEmpty.apply(
       View.<TableRow>asIterable()); 
     PCollection<TableRow> singleRowIfMaybeEmptyIsEmpty = 
       maybeEmpty.getPipeline() 
        .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) 
        .apply(ParDo.of(
         new DoFn<Void, TableRow>() { 
          @Override 
          public void processElement(ProcessContext c) { 
          Iterator<TableRow> rows = c.sideInput(maybeEmptyView).iterator(); 
          if (!rows.hasNext()) { 
           c.output(new TableRow()); 
          } 
          } 
         }).withSideInputs(maybeEmptyView)); 

     // Return a PCollection with at least one element. 
     return PCollectionList.of(singleRowIfMaybeEmptyIsEmpty).and(maybeEmpty) 
       .apply(Flatten.<TableRow>pCollections()); 

    } 
} 

// Then in your pipeline: 
... 
.apply(new AddEmptyRowIfEmpty()) 
.apply(BigQueryIO.Write(...)) 
+0

У вас есть общая идея, когда исправление будет выпущено? –

+0

У вас есть приблизительная идея, когда исправление будет выпущено? –

Смежные вопросы