68 votes

Comment créer une tâche conditionnelle dans Airflow

Je voudrais créer une tâche conditionnelle dans le flux d'Air comme décrit dans le schéma ci-dessous. Le scénario prévu est le suivant:

  • Tâche 1 exécute les
  • Si la Tâche 1 à réussir, puis exécuter la Tâche 2a
  • Sinon Si la Tâche 1 échoue, puis exécuter la Tâche 2b
  • Enfin, exécution de la Tâche 3

Conditional Task Toutes les tâches ci-dessus sont SSHExecuteOperator. Je devine que je devrais être à l'aide de la ShortCircuitOperator et / ou XCom pour gérer la maladie, mais je ne suis pas clair sur la façon de l'appliquer. Pourriez-vous décrire la solution?

80voto

villasv Points 1915

La circulation de l'air a un BranchPythonOperator qui peut être utilisé pour exprimer la ramification de la dépendance plus directement.

Les docs décrire son utilisation:

Le BranchPythonOperator est un peu comme le PythonOperator sauf qu'il s'attend à un python_callable qui renvoie un task_id. Le task_id retourné est suivi, et tous les autres chemins sont ignorés. Le task_id retourné par la fonction Python a pour faire référence à une tâche directement en aval de la BranchPythonOperator tâche.

...

Si vous souhaitez ignorer certaines tâches, gardez à l'esprit que vous ne pouvez pas avoir un vide chemin, si donc faire un mannequin de la tâche.

Exemple De Code

def dummy_test():
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task 
branch_task >> B_task

EDIT:

Si vous êtes à l'installation d'un débit d'Air version >=1.10.3, vous pouvez également retourner une liste d'id de tâche, vous permettant de sauter plusieurs aval des chemins d'accès dans un seul Opérateur et ne pas utiliser un mannequin de la tâche avant de rejoindre.

57voto

Jean S Points 456

Vous devez utiliser des règles de déclenchement du flux d'air

Tous les opérateurs ont un argument trigger_rule qui définit la règle selon laquelle la tâche générée est déclenchée.

Les possibilités de la règle de déclenchement:

 ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
 

Voici l'idée pour résoudre votre problème:

 from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X