Я использую Python (2.7) и работаю в среде DataFlow Google, и, разумеется, Google еще не полностью очистил все, и документации пока недостаточно. Тем не менее, часть для записи из Dataflow в BigQuery приведена здесь BigQuery Sink.Почему мой Python BigQuery Dataflow не вставляет записи в базу данных?
Согласно документации, с тем чтобы определить схему, вам нужно ввести строку:
schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'
Имя таблицы, ID проекта и набор данных ID, как это: «example_project_id: example_dataset_id.example_table_name»
Теперь все это работает. См. Код ниже, но из того, что я вижу, он успешно создает таблицу и поля. Примечание. Идентификатор проекта задается как часть аргументов функции.
bq_data | beam.io.Write(
"Write to BQ", beam.io.BigQuerySink(
'example_dataset_id.{}'.format(bq_table_name),
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
Теперь, похоже, что я могу получить все вставки с помощью этого:
bq_data = pipeline | beam.Create(
[{
'field_1': 'ExampleIdentifier',
'field_2': 'ExampleValue',
'field_3': 'ExampleFieldValue',
'created_at': '2016-12-26T05:50:39Z',
'updated_at': '2016-12-26T05:50:39Z',
'field_4': 'ExampleDataIdentifier',
'field_5: 'ExampleData'
}]
)
Но по какой-то причине, когда упаковка значения в PCollection, он говорит, что он вставляет в BigQuery, но когда Я запрашиваю таблицу, она ничего не показывает.
Почему он не вставляет? Я не вижу никаких ошибок, но ничего не вставляет в BigQuery.
Это то, что выглядит данные, как, содержащийся в PCollection, я близко к 1,100 строк для вставки:
{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}
Примечание: Я проверил в форматирования даты и форматирование даты выше допускается для вставки BigQuery.
Мы изучаем это. Вы используете DirectPipelineRunner (который по умолчанию)? Также можете ли вы дать более подробную информацию о своем конвейере и как вы подтвердили, что данные недоступны в BigQuery? Обязательно вызовите pipe.run() в конце для выполнения вашего конвейера. – chamikara
Вот копия команды, используемой для запуска потока данных с информацией о образцах, помещенной в: python -m dataflow_sample --runner DirectPipelineRunner --setup_file ./setup.py --job_name sample-dataflow-run-1 -server dev - -worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3 – Jravict
Насколько я проверял данные не в BigQuery, Я запускаю запрос к самой таблице, где должна быть информация. И да, я запускаю pipe.run() в конце функции runflow(). – Jravict