J'ai trouvé un moyen de créer des flux de travail basés sur le résultat des tâches précédentes.
Fondamentalement, ce que vous voulez faire est d'avoir deux subdags avec les éléments suivants:
- Xcom pousser une liste (ou ce que jamais vous avez besoin pour créer la dynamique de flux de travail plus tard) dans le subdag qui est exécutée en premier (cf. test1.py
def return_list()
)
- Passer le principal dag objet en tant que paramètre à votre deuxième subdag
- Maintenant, si vous avez le principal objet dag, vous pouvez l'utiliser pour obtenir une liste de ses tâches instances. À partir de cette liste de tâche cas, vous pouvez filtrer une tâche d'exécution en cours en utilisant
parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), on pourrait sans doute ajouter d'autres filtres ici.
- Avec cette instance de tâche, vous pouvez utiliser xcom pull pour récupérer la valeur que vous avez besoin en spécifiant le dag_id à celui de la première subdag:
dag_id='%s.%s' % (parent_dag_name, 'test1')
- Utilisation de la liste/valeur pour créer vos tâches de façon dynamique
Maintenant, j'ai testé dans mon local de la circulation de l'air installation et il fonctionne très bien. Je ne sais pas si la xcom tirer partie aura aucun problème si il n'y a plus d'une instance de la dag cours d'exécution dans le même temps, mais alors il faudrait probablement utiliser une clé unique ou quelque chose comme ça à identifier de manière unique le xcom valeur que vous souhaitez.
On pourrait sans doute d'optimiser le 3. étape pour être sûr à 100% pour obtenir une tâche spécifique de l'actuel de dag, mais pour mon utilisation elle effectue assez bien, je pense que l'on a seulement besoin d'un task_instance objet à utiliser xcom_pull.
Aussi, j'ai nettoyer le xcoms pour la première subdag avant chaque exécution, juste pour m'assurer de ne pas accidentellement toute valeur incorrecte.
Je suis assez mal à l'expliquer, alors j'espère que le code suivant va rendre les choses claires:
test1.py
from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
log = logging.getLogger(__name__)
def test1(parent_dag_name, start_date, schedule_interval):
dag = DAG(
'%s.test1' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date,
)
def return_list():
return ['test1', 'test2']
list_extract_folder = PythonOperator(
task_id='list',
dag=dag,
python_callable=return_list
)
clean_xcoms = PostgresOperator(
task_id='clean_xcoms',
postgres_conn_id='airflow_db',
sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
dag=dag)
clean_xcoms >> list_extract_folder
return dag
test2.py
from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator
log = logging.getLogger(__name__)
def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
dag = DAG(
'%s.test2' % parent_dag_name,
schedule_interval=schedule_interval,
start_date=start_date
)
if len(parent_dag.get_active_runs()) > 0:
test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
dag_id='%s.%s' % (parent_dag_name, 'test1'),
task_ids='list')
if test_list:
for i in test_list:
test = DummyOperator(
task_id=i,
dag=dag
)
return dag
et le flux de travail principal:
test.py
from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2
DAG_NAME = 'test-dag'
dag = DAG(DAG_NAME,
description='Test workflow',
catchup=False,
schedule_interval='0 0 * * *',
start_date=datetime(2018, 8, 24))
test1 = SubDagOperator(
subdag=test1(DAG_NAME,
dag.start_date,
dag.schedule_interval),
task_id='test1',
dag=dag
)
test2 = SubDagOperator(
subdag=test2(DAG_NAME,
dag.start_date,
dag.schedule_interval,
parent_dag=dag),
task_id='test2',
dag=dag
)
test1 >> test2