200 votes

Méthode appropriée pour créer des flux de travail dynamiques dans Airflow

Problème

Est-il possible dans la circulation de l'Air pour créer un flux de travail tels que le nombre de tâches B.* est inconnue jusqu'à la fin de la Tâche? J'ai regardé subdags mais on dirait qu'il ne peut travailler qu'avec un ensemble statique de tâches qui doivent être déterminées lors de la création d'un Dag.

Serait dag déclencheurs de travail? Et si oui, pourriez-vous donner un exemple.

J'ai une question où il est impossible de connaître le nombre de tâche B qui seront nécessaires pour calculer la Tâche C jusqu'à ce que la Tâche a été achevée. Chaque Tâche B.* prendra plusieurs heures de calcul et ne peuvent être combinées.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idée #1

Je n'aime pas cette solution car j'ai créer un blocage ExternalTaskSensor et toutes les la Tâche B.* va prendre entre 2 à 24 heures. Donc, je ne la considère pas comme une solution viable. Il y a sûrement un moyen plus facile? Ou était-débit d'Air n'est pas conçu pour cela?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

Dès à présent cette question ne dispose pas encore d'une grande réponse. J'ai été contacté par plusieurs personnes à la recherche d'une solution.

61voto

Oleg Yamin Points 516

Voici comment je l'ai fait avec une requête similaire sans aucun sous-tag:

Commencez par créer une méthode qui renvoie les valeurs souhaitées.

 def values_function():
     return values
 

Prochaine méthode create qui générera les travaux de manière dynamique:

 def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)
 

Et puis combinez-les:

 push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
 

15voto

Christopher Beck Points 273

J'ai trouvé un moyen de créer des flux de travail basés sur le résultat des tâches précédentes.
Fondamentalement, ce que vous voulez faire est d'avoir deux subdags avec les éléments suivants:

  1. Xcom pousser une liste (ou ce que jamais vous avez besoin pour créer la dynamique de flux de travail plus tard) dans le subdag qui est exécutée en premier (cf. test1.py def return_list())
  2. Passer le principal dag objet en tant que paramètre à votre deuxième subdag
  3. Maintenant, si vous avez le principal objet dag, vous pouvez l'utiliser pour obtenir une liste de ses tâches instances. À partir de cette liste de tâche cas, vous pouvez filtrer une tâche d'exécution en cours en utilisant parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]), on pourrait sans doute ajouter d'autres filtres ici.
  4. Avec cette instance de tâche, vous pouvez utiliser xcom pull pour récupérer la valeur que vous avez besoin en spécifiant le dag_id à celui de la première subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Utilisation de la liste/valeur pour créer vos tâches de façon dynamique

Maintenant, j'ai testé dans mon local de la circulation de l'air installation et il fonctionne très bien. Je ne sais pas si la xcom tirer partie aura aucun problème si il n'y a plus d'une instance de la dag cours d'exécution dans le même temps, mais alors il faudrait probablement utiliser une clé unique ou quelque chose comme ça à identifier de manière unique le xcom valeur que vous souhaitez. On pourrait sans doute d'optimiser le 3. étape pour être sûr à 100% pour obtenir une tâche spécifique de l'actuel de dag, mais pour mon utilisation elle effectue assez bien, je pense que l'on a seulement besoin d'un task_instance objet à utiliser xcom_pull.

Aussi, j'ai nettoyer le xcoms pour la première subdag avant chaque exécution, juste pour m'assurer de ne pas accidentellement toute valeur incorrecte.

Je suis assez mal à l'expliquer, alors j'espère que le code suivant va rendre les choses claires:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

et le flux de travail principal:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2

12voto

Ena Points 797

OA: "Est-il un chemin dans la circulation de l'Air pour créer un flux de travail tels que le nombre de tâches B.* est inconnue jusqu'à la fin de la Tâche?"

Réponse courte est non. La circulation de l'air permettra de construire la DAG flux avant de commencer à l'exécuter.

Cela dit nous sommes arrivés à une conclusion simple, c'est que nous n'avons pas besoin. Quand vous voulez pour paralléliser peu de travail, vous devez évaluer les ressources disponibles et non pas le nombre d'éléments à traiter.

Nous l'avons fait: nous générer dynamiquement un nombre fixe de tâches, disons, 10, qui va diviser le travail. Par exemple, si nous avons besoin pour traiter 100 fichiers pour chaque tâche du processus de 10 d'entre eux. Je vais poster le code plus tard aujourd'hui.

Mise à jour

Voici le code, désolé pour le retard.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['myemail@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Explication du Code:

Ici, nous avons un seul démarrer la tâche et une seule fin de tâche (à la fois factice).

Puis, à partir du début de la tâche avec la boucle de nous créer des 10 tâches avec les mêmes python appelable. Les tâches sont créés dans la fonction create_dynamic_task.

À chaque python appelable nous passer comme arguments le nombre total de tâches en parallèle et la tâche actuelle de l'indice.

Supposons que vous avez 1000 points à préciser: la première tâche sera de recevoir en entrée qu'il faut élaborer le premier morceau de 10 morceaux. Il va diviser les 1000 points en 10 morceaux et de l'élaboration de la première.

3voto

flinz Points 36

Je pense avoir trouvé une meilleure solution à ce à https://github.com/mastak/airflow_multi_dagrunqui utilise de simples replacement de DagRuns par le déclenchement de plusieurs dagruns, semblable à TriggerDagRuns. La plupart des crédits d'aller à https://github.com/mastak, bien que j'ai eu de patch quelques détails pour le faire fonctionner avec la plus récente des flux d'air.

La solution utilise un custom opérateur qui déclenche plusieurs DagRuns:

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

Vous pouvez ensuite soumettre plusieurs dagruns de la fonction appelable dans votre PythonOperator, par exemple:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

J'ai créé une fourchette avec le code à https://github.com/flinz/airflow_multi_dagrun

-2voto

Mark Matthews Points 29

J'ai trouvé ce Moyen post qui est très similaire à cette question. Cependant, il est plein de fautes de frappe, et ne fonctionne pas quand j'ai essayé de la mettre en œuvre.

Ma réponse ci-dessus est comme suit:

Si vous êtes à la création de tâches de manière dynamique, vous devez le faire par itération sur quelque chose qui n'est pas créé en un en amont de la tâche, ou peut être défini indépendamment de cette tâche. J'ai appris que vous ne pouvez pas passer d'exécution de dates ou d'autres flux d'air variables à quelque chose d'extérieur à un modèle (par exemple, une tâche) comme beaucoup d'autres l'ont fait avant. Voir aussi ce post.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X