2016-06-24 2 views
0

Мне нужно объединить данные из Google Datastore и Google BigTable, чтобы подготовить отчет. Мне нужно каждую минуту выполнять эту операцию. Можно ли выполнить с Google Cloud Dataflow (при условии, что сама обработка не займет много времени и/или может быть разделена на независимые параллельные задания)?Выполнение задания периодического потока данных

  1. Должен ли я иметь бесконечную петлю внутри «основного» создания и выполнения того же самого трубопровода снова и снова?

  2. Если большинство времени в таком сценарии принимается путем создания виртуальных машин, можно ли инструктировать Dataflow использовать виртуальные машины клиентов?

Спасибо,

+0

Сколько данных вы бы хотели добавить и где бы вы сохранили результаты? –

ответ

1

Если вы ожидаете, что ваша работа достаточно мал, чтобы завершить в течение 60 секунд, вы могли бы рассмотреть возможность использования Datastore и BigTable API-интерфейсы из внутри DoFn в работе Streaming. Ваш трубопровод может выглядеть примерно так:

PCollection<Long> impulse = p.apply( CountingInput.unbounded().withRate(1, Duration.standardMinutes(1))) PCollection<A> input1 = impulse.apply(ParDo.of(readFromDatastore)); PCollection<B> input2 = impulse.apply(ParDo.of(readFromBigTable)); ...

Это производит один вход каждую минуту, навсегда. Выполняя роль потокового конвейера, виртуальные машины будут продолжать работать.

После прочтения обоих API-интерфейсов вы можете при необходимости подключить окно/соединение.

+0

Не удалось найти метод UnboundCountingInput.withRate (https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/CountingInput.UnboundedCountingInput#class-countinginputunboundedcountinginput). Каков самый простой способ реализовать такую ​​функцию? Мне нужно расширить класс UnboundedSource или есть более простой способ? –

+0

Метод 'withRate' был добавлен в Apache Beam. Вы должны уметь выполнить эту реализацию. https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java#L195 –

+0

И это backported в этом PR: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/319#issuecomment-229185613 –

Смежные вопросы