2 votes

Tâche après BranchPythonOperator : la tâche est ignorée.

J'ai créé un BranchPythonOperator qui appelle 2 tâches en fonction de la condition comme :

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_create_table = PythonOperator(
    task_id='typicon_create_table',
    python_callable=CreateTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_load_data = PythonOperator(
    task_id='typicon_load_data',
    python_callable=LoadData(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)

C'est le CheckTable classe appelable :

class CheckTable:
    """
    DAG task to check if table exists or not.
    """

    def __call__(self, **kwargs) -> None:
        pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
        query = "SELECT EXISTS ( \
            SELECT 1 FROM information_schema.tables \
            WHERE table_schema = 'public' \
            AND table_name = 'users');"

        table_exists = pg_hook.get_records(query)[0][0]
        if table_exists:
            return "typicon_load_data"
        return "typicon_create_table"

Le problème est que les deux tâches sont ignorées lorsque l'application typicon_check_table est exécutée.

Comment résoudre ce problème ?

enter image description here

2voto

Saleem Shaiks Points 11

J'ai travaillé avec le même scénario, cela fonctionne bien avec moi pour le code ci-dessous.

BranchPythonOperator(task_id='slot_population_on_is_y_or_n', python_callable=DAGConditionalValidation('Y'),
                         trigger_rule='one_success')
slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y')
slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n')
slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n]

class DAGConditionalValidation:

    def __init__(self, conditional_param_key):
        self.conditional_param_key = conditional_param_key

    def __call__(self, **kwargs):
        if (conditional_param_key == 'Y'):
            return slot_population_on_is_y
        return slot_population_on_is_n

Tout votre code semble correct, mais il vous manque la règle de déclenchement, veuillez définir la règle de déclenchement comme suit trigger_rule='one_success' .
Cela devrait fonctionner pour vous aussi.

0 votes

Oui c'est vrai, je suis un nouveau contributeur donc je dois apprendre les techniques de postage :)

1voto

La tâche typicon_load_data a typicon_create_table comme parent et le trigger_rule par défaut est all_success Je ne suis donc pas surpris par ce comportement.

Deux cas possibles ici :

  1. CheckTable() renvoie à typicon_load_data alors typicon_create_table est ignoré, mais typicon_load_data en aval est également ignorée.
  2. CheckTable() renvoie à typicon_create_table qui est exécuté et qui déclenche typicon_load_data qui est ignorée parce qu'elle était la branche exclue.

Je suppose que votre capture d'écran est celle du cas 1 ?

0voto

Ajoutez une règle trigger_rule="all_done" à la table typicon_check_table comme suit

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    trigger_rule="all_done",
    dag=typicon_task_dag)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X