2016-11-16 1 views
1

Я создал задачи в воздушном потоке, который я по расписанию запускать ежечасно и start_date установлен значением 2016-11-16установленного расписание воздуха интервал

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2016, 11, 16), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 

dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly") 

Я стартовал воздушный поток в текущий момент времени, который 10:00 AM, и я мог видеть воздушный поток работает его от 00:00 AM, то 01:00 AM и так далее:

INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T00:00:00 --local -sd DAGS_FOLDER/test_airflow.py 
........ 
........ 
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T01:00:00 --local -sd DAGS_FOLDER/test_airflow.py 
....... 
....... 

Как настроить поток воздуха, чтобы начать говорить с текущего времени и запустить ежечасно идти вперед, вместо того, начиная с 00:00?

ответ

2

В вашем вопросе вы написали Словарь: default_args

In this there is Key: 'start_date': datetime(2016, 11, 16) 

На самом деле здесь DateTime объект создается, что, вход YYYY/MM/ДД, мы не обеспечиваем ввод времени и поэтому имеет по умолчанию 00:00 , так что ваш скрипт работает на время 00:00 вы можете проверить так: в питона

от даты и времени импорта DateTime

DateTime (2016, 11, 16)

#That Datetime object is generated with 00:00 Time 

#datetime (2016, 11, 16, 0, 0)

#If you need Current date and time to start process you can set value as: 
'start_date': datetime.now() 
#if you want only current time with respective date then you can use as fallows: 

current_date = datetime.now() 
default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2016, 11, 16, current_date.hour, current_date.minute), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly") 
0
  1. Установить воздушный поток в виртуальной среде Python.
  2. Активируйте среду.
  3. Сброс load_examples = False в ~/airflow/airflow.cfg
  4. Начать воздушный поток. $ airflow webserver -p <port>
  5. Скопируйте ниже даг в ~/airflow/dags
  6. Запустите планировщик $ airflow scheduler

Теперь интервал графика см ниже код.

Попробуйте это:

'start_date': datetime.now() 
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *") 

или

'start_date': datetime(2015, 6, 1), 
dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly") 

Полный код

""" 
Code that goes along with the Airflow tutorial located at: 
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py 
""" 
from airflow import DAG 
from airflow.operators.bash_operator import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    #'start_date': datetime(2015, 6, 1), 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    #'retries': 1, 
    #'retry_delay': timedelta(minutes=5), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    # 'end_date': datetime(2016, 1, 1), 
} 

dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *") // For minute 
#dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly") 
# 

# t1, t2 and t3 are examples of tasks created by instantiating operators 
t1 = BashOperator(
    task_id='print_date', 
    bash_command='date', 
    dag=dag) 

t2 = BashOperator(
    task_id='sleep', 
    bash_command='sleep 5', 
    retries=3, 
    dag=dag) 

templated_command = """ 
    {% for i in range(5) %} 
     echo "{{ ds }}" 
     echo "{{ macros.ds_add(ds, 7)}}" 
     echo "{{ params.my_param }}" 
    {% endfor %} 
""" 

t3 = BashOperator(
    task_id='templated', 
    bash_command=templated_command, 
    params={'my_param': 'Parameter I passed in'}, 
    dag=dag) 

t2.set_upstream(t1) 
t3.set_upstream(t1) 
0

Airflow обеспечивает драгоценный камень оператора называется LatestOnlyOperator пропустить задачи, которые не выполняются в течение последний запланированный запуск для DAG. LastOnlyOperator пропускает все немедленные нисходящие задачи и сам, если время прямо сейчас не между его исполнением и очередным запланированным исполнением_time. Этот оператор уменьшает количество циклов процессора.

default_args = { 
'owner': 'airflow', 
'depends_on_past': False, 
'start_date': datetime(2016, 11, 16), 
'email': ['[email protected]'], 
'email_on_failure': False, 
'email_on_retry': False, 
'retries': 1, 
'retry_delay': timedelta(minutes=5) 
} 

dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly") 

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) 

task1 = DummyOperator(task_id='task1', dag=dag) 

latest_only >> task 

Latest_only всегда должен находиться в верхней части задачи, которую вы хотите пропустить. Преимущество оператора last_only заключается в том, что всякий раз, когда вы перезапускаете dag, он пропускает задачи за все предыдущие времена и запускает текущий dag.

Также лучше не записывать время начала. Вместо этого:

from datetime import datetime, timedelta 

START_DATE = datetime.combine(datetime.today() - timedelta(1), datetime.min.time()) 
Смежные вопросы