2016-08-04 3 views
0

Раньше имел PCollection formattedResults; и я использую ниже код для вставки строк в большом запросе:Вставить данные в BigQuery из Dataflow

    // OPTION 1 
PCollection<TableRow> formattedResults = .... 
formattedResults.apply(BigQueryIO.Write.named("Write").to(tableName) 
          .withSchema(tableSchema) 
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

И все строки непосредственно вставлены в BigQuery, все хорошо, пока здесь. Но теперь я начал динамически определить имя таблицы и ее строки так творю PCollection, как показано ниже: (String будет имя таблицы, а затем ее строка в качестве значения)

PCollection<KV<String, TableRow>> tableRowMap // OPTION 2 

Кроме того, я создаю группу строк который будет идти в той же таблице, как:

PCollection<KV<String, Iterable<TableRow>>> groupedRows //OPTION 3 

где ключ (String) является имя таблицы BQ и значением является список строк, которые будут вставлены в BQ.

С опцией 1 я смог легко вставлять строки в BQ, используя код, показанный выше, но тот же код не может использоваться с OPTION 2 или OPTION 3, потому что в этом случае мое имя таблицы является ключевым в карте. Есть ли способ вставить строки в таблицу с помощью OPTION 2 или OPTION 3. Любая ссылка или образец кода будет большой помощью.

ответ

1

Ближайшая вещь, которую Dataflow записывает в таблицу для каждого окна (и вы можете создать свой собственный подкласс BoundedWindow и WindowFn для включения любых данных, которые вы хотите в окне). Для этого используйте

to(SerializableFunction<BoundedWindow,String> tableSpecFunction) 

на BigQueryIO.Write.

Обратите внимание, что эта функция использует функцию потоковой загрузки BigQuery, которая ограничена 100 МБ/с на таблицу. Кроме того, загрузка не является атомарной, поэтому неудачное пакетное задание может загружать только часть вывода.

-1

У вас также есть возможность создать собственный DoFn, который напрямую вставляет данные в bigquery, вместо того, чтобы полагаться на BigQueryIO.Write. Технически вам нужно создать BigQueryTableInserter, вы можете использовать insertAll(TableReference ref, List<TableRow> rowList), чтобы вставить материал в нужный вам стол.

Вы можете создать TableReference используя что-то вроде: new TableReference().setProjectId("projectfoo").setDatasetId("datasetfoo").setTableId("tablefoo")

Это не 100%, как рекомендуемое BigQueryIO делает некоторые хороший материал, чтобы разделить строки, которые нужно вставляя, чтобы максимизировать пропускную способность и обрабатывает попыток правильно.

Смежные вопросы