2016-11-22 2 views
3

Мне нужен совет о том, как перезапустить все службы воздушного потока при развертывании, не убивая рабочих в середине задачи.Процедура развертывания для новых пропусков

Я написал процедуру развертывания для своих групп DAG, которые устанавливают воздушный поток и любые другие зависимости pip в virtualenv. После того, как мой каталог релиз готов, я:

  1. остановка воздушного потока цветок, воздушный поток-рабочий, воздушный поток-планировщик, и воздушный поток, веб-сервер
  2. Update «текущий» SimLink, чтобы указать на мой новый релиз
  3. Начать воздушный поток, цветок, воздушный поток, планировщик воздушного потока и воздушный поток.

Проблема с этой процедурой развертывания заключается в том, что рабочие немедленно уничтожаются. Я хотел бы добавить какой-то мониторинг к сценарию, чтобы приостановить все группы DAG, дождаться, пока рабочие простаивают, а затем перезапустите службы, но CLI воздушного потока не имеет возможности узнать, какие блокировки включены, и не работают ли рабочие.

Я понимаю, что многие службы воздушного потока могут автоматически обнаруживать изменения в папке dags, но я хочу, чтобы каждое развертывание имело свой собственный virtualenv. Если я не перезапущу все службы, то новое развертывание не заберет новую строку в файле requirements.txt.

+0

Я схожу с такой же проблемой, вы нашли способ или вам пришлось реализовать все упомянутое в принятом ответе? – s7anley

+0

Я действительно заметил, что работники изящно уйдут с SIGINT. Я обновил свой сценарий systemctl соответственно –

ответ

1

Работники воздушного потока изящно уходят с SIGINT. Обновите монитор процесса, чтобы завершить работу с SIGINT, а не по умолчанию. Если вы используете systemctl, то это будет выглядеть примерно так:

... 
[Service] 
EnvironmentFile=/etc/sysconfig/airflow 
User=airflow 
Group=airflow 
Type=simple 
ExecStart=... 
KillSignal=SIGINT 
Restart=on-failure 
RestartSec=10s 

... 
+2

На самом деле это не ответ на исходный вопрос.Это прекрасный ответ для изящного закрытия работника Airflow с файлом unitd. – DetDev

3

У вас есть доступ к базе данных Airflow, поэтому рассмотрите разработку сценария развертывания, который выполняет этот процесс для вас.

  • Обновление таблицы DAG, чтобы приостановить все группы DAG
  • Прочитайте таблицу TASK_INSTANCE подождать, пока все состояние выполнения задач комплектных
  • Restart воздушных потоков услуг.
  • Обновите таблицу DAG, чтобы отключить DAG.
+0

Немного круглая, но умная. Было бы хорошо, если бы эта функциональность была запечена в воздушном потоке. (Скажем, глобальная пауза/отказ от CLI + wait_for рабочих на холостом ходу) –

+0

рассмотреть вопрос о повышении запроса функции - https://issues.apache.org/jira/browse/AIRFLOW/?selectedTab=com.atlassian.jira.jira-projects -plugin: issues-panel – kvb

+0

Сделаю. Кроме того, этот ответ дал мне 90% пути, но я столкнулся с проблемами на этом пути. Планировщик воздушного потока должен быть остановлен, иначе рабочие продолжают выполнять задачи, которые уже преуспели. Я также должен был использовать API сельдерея, чтобы запросить рабочих, а не доверять данным в таблице task_instance. –

Смежные вопросы