5 votes

Comment définir la priorité entre différents DAGs dans Airflow

Imaginons que nous ayons deux DAG, dag1 et dag2, ils répondent à des exigences commerciales différentes. ils sont complètement indépendants. mais dag1 est plus important d'être terminé le plus tôt possible.
Pour simplifier, ils ont chacun une tâche et s'exécutent quotidiennement.

Dans un scénario où dag1 est en retard de 2 ou 3 jours, je veux m'assurer que dag1 s'exécute et termine ses exécutions de dag en premier, c'est-à-dire que dag1 est à jour avant que dag2 puisse procéder.

J'ai essayé priority_weight mais cela ne fonctionne pas entre différents DAG.

J'ai besoin d'une manière de mettre ces tâches des deux DAG différents dans la même file d'attente et d'obtenir une priorisation au niveau du DAG.

2voto

Meghdeep Ray Points 2503

À partir de la documentation officielle du Capteur de Tâche Externe:

Attend qu'une tâche différente d'un DAG ou une tâche dans un DAG différent se termine
pour une execution_date spécifique.

    :param external_dag_id: L'ID du DAG qui contient la tâche que vous souhaitez
        attendre
    :type external_dag_id: str
    :param external_task_id:L'ID de la tâche qui contient la tâche que vous souhaitez
        attendre. Si "None", le capteur attend le DAG
    :type external_task_id:str
    :param allowed_states: liste des états autorisés, par défaut c'est ``['success']``
    :type allowed_states: list
    :param execution_delta: différence de temps avec l'exécution précédente à
        examiner, par défaut c'est la même execution_date que la tâche ou le DAG actuel.
        Pour hier, utilisez [positive!] datetime.timedelta(days=1). Soit
        execution_delta soit execution_date_fn peut être passé à
        ExternalTaskSensor, mais pas les deux.
    :type execution_delta: datetime.timedelta
    :param execution_date_fn: fonction qui reçoit la date d'exécution actuelle
        et renvoie les dates d'exécution désirées à interroger. Soit execution_delta
        soit execution_date_fn peut être passé à ExternalTaskSensor, mais pas les deux.
    :type execution_date_fn: callable
    :param check_existence:Défini à `True` pour vérifier si la tâche externe existe (quand
        external_task_id n'est pas None) ou vérifier si le DAG pour lequel il faut attendre existe (quand
        external_task_id est None), et arrêter immédiatement d'attendre si la tâche externe
        ou le DAG n'existe pas (valeur par défaut: False).
    :type check_existence: bool

Les deux DAGs devraient avoir la depends_on_past Règle de Déclenchement définie sur True afin que les nouveaux DAGs planifiés ne s'exécutent que si les exécutions précédentes ont été complétées avec succès.

Ensuite, ajoutez le Capteur de Tâche Externe au début de Dag 2 (celui qui s'exécute plus tard).

Alternativement, vous pourriez créer votre propre capteur personnalisé et l'utiliser via les Modules d'Extensions d'Airflow afin de vérifier la métadatabase pour le statut des Exécutions de DAG.

Vous pourriez également construire des capteurs personnalisés qui utilisent soit des Airflow XCOMs soit des Airflow Variables pour transmettre les temps d'exécution ou tout autre Macro d'Airflow à un Capteur dans le DAG 2.

0voto

Fadi Bakoura Points 114

Je trouve une solution ad hoc où je pourrais simplement envelopper les deux dags dans une couche de verrouillage.

Je veux dire par là, nous ajoutons une tâche simple au début qui verrouille une ligne spécifique dans une base de données, puis à la fin du dag, nous ajoutons également une tâche simple qui déverrouille la ligne verrouillée.
Par conséquent, lorsqu'un des deux dags est en cours d'exécution et que l'autre veut démarrer, il est simplement bloqué car il ne peut pas verrouiller la ligne spécifique connue pour les deux dags.

Voici une description simple de la couche de verrouillage
dag1: lock_operator, task1, unlock_operator.
dag2: lock_operator, task1, unlock_operator.

Bien sûr, nous pourrions laisser le lock_operator échouer s'il ne peut pas verrouiller la ligne, et définir retries_count comme très élevé pour garantir qu'il continuera à tenter de verrouiller jusqu'à ce qu'il le puisse.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X