2016-02-08 2 views
6

Я пытаюсь выяснить, как мы «засекаем» состояние окна для некоторых наших потоковых заданий потока данных. Сценарий: у нас есть поток сообщений на форуме, мы хотим, чтобы на все время запускалось количество сообщений для каждой темы, поэтому у нас есть потоковое задание потока данных с глобальным окном и триггеры, чтобы испускать каждый раз, когда запись для темы приходит Все хорошо до сих пор. Но до источника потока у нас есть большой файл, который мы хотели бы обработать, чтобы получить наши исторические подсчеты, также потому, что темы живут вечно, нам нужен исторический счет, чтобы информировать о выходе из источника потока, нужна одна и та же логика для запуска над файлом, а затем запускается через источник потока, когда файл исчерпан, сохраняя состояние окна.Начальное состояние для задания потока данных

Современных идеи:

  • Написать обычай неограниченного источника, который делает именно это. Читает файл, пока он не исчерпан, а затем начинает чтение из потока. Не очень весело, потому что писать пользовательские источники не очень весело.
  • Запустите логику в пакетном режиме по файлу, и, как последний шаг, каким-то образом выпустите состояние в поточную раковину, затем введите поточную версию логического запуска, которая считывает как поток состояния, так и поток данных, и как-то сочетает их. Это, похоже, имеет смысл, но не уверен, как убедиться, что потоковое задание считывает все из источника состояния, чтобы инициализировать, прежде чем читать из потока данных.
  • Проведите исторические данные в поток, напишите задание, которое читается из обоих потоков. Те же проблемы, что и второе решение, не уверены, как убедиться, что один поток «потребляется» первым.

EDIT: последний вариант и то, с чем мы собираемся, состоит в том, чтобы написать расчетное задание, чтобы не иметь значения, в каком порядке происходят события, поэтому мы просто подталкиваем архив к паб/под тема, и все будет работать. Это работает в этом случае, но, очевидно, это влияет на нисходящего потребителя (необходимо либо поддерживать обновления, либо ретракции), поэтому мне было бы интересно узнать, какие другие решения люди имеют для посева своих оконных состояний.

ответ

2

Вы можете сделать то, что вы предложили в пункте 2 пули --- запустить два конвейера (в том же главном), с первым, который заполняет тему pubsub из большого файла. Это похоже на то, что делает пример StreamingWordExtract.

+0

Да, я вижу, что мы можем получить данные в потоковое задание через Pub/Sub, это больше вопрос о том, можно ли сначала прочитать задание с темы «Засыпка», а не пытаться читать как на в то же время и поэтому имеет очень разные временные метки событий. – bfabry

+0

Наряду с моим правлением выше, мы планируем, что вычисление будет агностиком заказа, в котором происходят события, и сделать так, чтобы наши выходные данные поддержки были такими, что это сработает. Приветствия. – bfabry

Смежные вопросы