0

Я использую Python (2.7) и работаю в среде DataFlow Google, и, разумеется, Google еще не полностью очистил все, и документации пока недостаточно. Тем не менее, часть для записи из Dataflow в BigQuery приведена здесь BigQuery Sink.Почему мой Python BigQuery Dataflow не вставляет записи в базу данных?

Согласно документации, с тем чтобы определить схему, вам нужно ввести строку:

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING' 

Имя таблицы, ID проекта и набор данных ID, как это: «example_project_id: example_dataset_id.example_table_name»

Теперь все это работает. См. Код ниже, но из того, что я вижу, он успешно создает таблицу и поля. Примечание. Идентификатор проекта задается как часть аргументов функции.

bq_data | beam.io.Write(
    "Write to BQ", beam.io.BigQuerySink(
     'example_dataset_id.{}'.format(bq_table_name), 
     schema=schema, 
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED 
    ) 
) 

Теперь, похоже, что я могу получить все вставки с помощью этого:

bq_data = pipeline | beam.Create(
    [{ 
     'field_1': 'ExampleIdentifier', 
     'field_2': 'ExampleValue', 
     'field_3': 'ExampleFieldValue', 
     'created_at': '2016-12-26T05:50:39Z', 
     'updated_at': '2016-12-26T05:50:39Z', 
     'field_4': 'ExampleDataIdentifier', 
     'field_5: 'ExampleData' 
    }] 
) 

Но по какой-то причине, когда упаковка значения в PCollection, он говорит, что он вставляет в BigQuery, но когда Я запрашиваю таблицу, она ничего не показывает.

Почему он не вставляет? Я не вижу никаких ошибок, но ничего не вставляет в BigQuery.

Это то, что выглядит данные, как, содержащийся в PCollection, я близко к 1,100 строк для вставки:

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'} 

Примечание: Я проверил в форматирования даты и форматирование даты выше допускается для вставки BigQuery.

+0

Мы изучаем это. Вы используете DirectPipelineRunner (который по умолчанию)? Также можете ли вы дать более подробную информацию о своем конвейере и как вы подтвердили, что данные недоступны в BigQuery? Обязательно вызовите pipe.run() в конце для выполнения вашего конвейера. – chamikara

+0

Вот копия команды, используемой для запуска потока данных с информацией о образцах, помещенной в: python -m dataflow_sample --runner DirectPipelineRunner --setup_file ./setup.py --job_name sample-dataflow-run-1 -server dev - -worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3 – Jravict

+0

Насколько я проверял данные не в BigQuery, Я запускаю запрос к самой таблице, где должна быть информация. И да, я запускаю pipe.run() в конце функции runflow(). – Jravict

ответ

0

Я попробовал пример с вашей точной схемой и вводами, и это сработало для меня. Я должен был выполнить следующие исправления.

(1) Кажется, что вы не указываете проект в своих аргументах. Вы можете указать это в определении конвейера, так как вы не видите ошибку для этого. (2) В коде, указанном выше, есть опечатка. 'field_5: 'ExampleData' должен быть 'field_5': 'ExampleData' Но я предполагаю, что это только опечатка в этом вопросе не в исходном конвейере, так как вы не получаете ошибку для этого.

Вы используете последнюю версию Dataflow? Вы можете попробовать создать новую виртуальную среду и запустить «pip install google-cloud-dataflow» для установки последней версии.

Можно ли поделиться своим полным pipleine для меня, чтобы попробовать?

Трудно отлаживать это дистанционно, так как вы используете «DirectPipelineRunner». Можно ли попробовать использовать тот же конвейер, используя «DataflowPipelineRunner» (обратите внимание, что для этого вам понадобится проект GCP с включенным биллингом)? Я смогу просмотреть журналы, если вы можете запустить это с помощью «DataflowPipelineRunner» и предоставить идентификатор задания.

+0

Проект находится в args: -server, required = False -region --output, required = False ** - project **, required = False --bucket, required = False --job_name, required = False --staging_location, required = False --temp_location, required = False --runner, required = False --setup_file, required = False --devices, nargs = "*", required = False --start_date, required = True --end_date, required = True – Jravict

+0

Честно говоря, я предполагаю, что что-то получило исправлено на стороне Google, потому что я играл с вещами, и после того, как я увидел ваш ответ, я вернулся к тому, что у меня было, и он работал отлично. Я не знаю, в чем проблема, но похоже, что она вставляет сейчас. – Jravict

Смежные вопросы