2017-02-17 2 views
3

Потерпите меня, так как я только начал использовать Airflow, и то, что я пытаюсь сделать, это собрать код возврата из задачи BashOperator и сохранить его в локальной переменной, а затем на основе что код возврата разветвляется на другую задачу. У меня проблема заключается в том, как заставить BashOperator что-то возвращать. Ниже мой сегмент кода:Воздушный поток BashOperator собирает код возврата

dag = DAG(dag_id='dag_1', 
     default_args=default_args, 
     schedule_interval='0 2 * * *', 
     user_defined_macros=user_def_macros, 
     dagrun_timeout=timedelta(minutes=60) 
    ) 
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag) 
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag) 
t2.set_upstream(oodas) 

Я пробуя xcom_push, но честно не знаю, как это работает .. Является ли это правильный путь, чтобы собрать результат? В журналах последняя строка: Команда вышла с кодом возврата 0.

ответ

0

Можете ли вы разместить всю DAG. Я думаю, что вы имеете вопрос в интерпретации, как Airflow работает

От TASK1 (если это оператор баш), вы можете сделать:

t1 = BashOperator(task_id='t1', bash_command='echo "{{ ti.xcom_push("t1") }}"', dag=dag) 

И в TASK2:

t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag) 

где ТИ переменная task_instance и {{}} нотация используется для доступа к разделу Variables

+0

Я только что обновил код. Поэтому я считаю, что вы правы, я не использую его правильно. Вот мой экземпляр задачи «ti» только «oodas»? Я не уверен, что ключом является то, что BashOperator использует для xcom –

Смежные вопросы