79 votes

Comment empêcher le flux d'air de remblayer les pistes de dag ?

Supposons que vous ayez un DAG de flux d'air qui n'a pas de sens pour le remblayage, ce qui signifie qu'après l'avoir exécuté une fois, l'exécuter plusieurs fois rapidement serait complètement inutile.

Par exemple, si vous chargez dans votre base de données des données provenant d'une source qui n'est mise à jour que toutes les heures, le backfilling, qui se produit en succession rapide, reviendrait à importer les mêmes données encore et encore.

C'est particulièrement ennuyeux lorsque vous instanciez une nouvelle tâche horaire, et qu'elle exécute N le nombre de fois par heure qu'il a manqué, en faisant du travail redondant, avant qu'il ne commence à fonctionner à l'intervalle que vous avez spécifié.

La seule solution à laquelle je peux penser est quelque chose qu'ils ont spécifiquement déconseillé dans FAQ de la docs

Il est déconseillé d'utiliser des valeurs dynamiques comme date de début, en particulier dans les cas suivants datetime.now() car cela peut être assez déroutant.

Existe-t-il un moyen de désactiver le backfilling pour un DAG, ou dois-je faire ce qui précède ?

82voto

sage88 Points 865

Passez à la version 1.8 d'airflow et utilisez catchup_by_default=False dans le airflow.cfg ou appliquez catchup=False à chacun de vos dags.

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

0 votes

Merci. C'est bien mieux que le LatestOnlyOperator.

8 votes

J'ai défini catchup_by_default=False, mais Airflow continue à remplir les tâches. Avez-vous une idée de la raison ? J'utilise la version 1.8

0 votes

@OllieGlass Êtes-vous sûr de l'avoir appliqué au bon conteneur, je ne sais pas exactement quelle est votre configuration, mais cela a certainement de l'importance. Vous pouvez également essayer de l'appliquer à des DAG spécifiques si vous n'êtes pas sûr.

23voto

Ben Tallman Points 149

En définissant catchup=False dans votre déclaration dag, vous obtiendrez exactement cette fonctionnalité.

Je n'ai pas la "réputation" de commenter, mais je voulais dire que catchup=False a été conçu (par moi) dans ce but précis. De plus, je peux vérifier que dans la version 1.10.1, elle fonctionne lorsqu'elle est définie explicitement dans l'instanciation. Cependant, je ne vois pas comment cela fonctionne lorsqu'il est placé dans les args par défaut. J'ai été éloigné d'Airflow pendant 18 mois cependant, donc il faudra un peu de temps avant que je puisse regarder pourquoi les args par défaut ne fonctionnent pas pour le rattrapage.

dag = DAG('example_dag',
        max_active_runs=3,
        catchup=False,
        schedule_interval=timedelta(minutes=5),
        default_args=default_args)

16voto

Ziggy Eunicien Points 1155

Il semble que ce soit un problème de flux d'air non résolu. Je sais que j'aimerais vraiment avoir exactement la même fonctionnalité. Voici ce que j'ai fait jusqu'à présent ; cela peut être utile à d'autres.

Certaines fonctionnalités de l'interface utilisateur (au moins dans la version 1.7.1.3) peuvent aider à résoudre ce problème. Si vous allez dans l'arborescence et cliquez sur une tâche spécifique (cases carrées), un bouton de dialogue s'affiche avec un bouton "marquer le succès". En cliquant sur "passé", puis sur "marquer le succès", toutes les instances de cette tâche dans le DAG seront étiquetées comme réussies et ne seront pas exécutées. Le DAG de niveau supérieur (cercles en haut) peut également être étiqueté comme réussi de manière similaire, mais il ne semble pas y avoir de moyen d'étiqueter plusieurs instances de DAG.

Je ne me suis pas encore suffisamment penché sur la question, mais il est possible d'utiliser la sous-commande 'trigger_dag' pour marquer les états des DAGs. voir ici : https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

Une fonctionnalité CLI pour marquer les DAGs est en cours de réalisation : http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%3CJIRA.12973462.1464369259000.37918.1465189859133@Atlassian.JIRA%3E https://github.com/apache/incubator-airflow/pull/1590

MISE À JOUR (9/28/2016) : Un nouvel opérateur 'LatestOnlyOperator' a été ajouté (. https://github.com/apache/incubator-airflow/pull/1752 ) qui n'exécutera que la dernière version des tâches en aval. Cela semble très utile et j'espère que cela fera bientôt partie des versions.

MISE À JOUR 2 : À partir de la version 1.8 de l'airflow, les LatestOnlyOperator a été publié.

0 votes

La mise à jour semble vraiment prometteuse ! Merci d'avoir répondu à la question.

2 votes

Notez que LatestOnlyOperator place les tâches en aval dans un état "ignoré". D'après la documentation, les états ignorés se propagent de telle sorte que toutes les tâches directement en amont sont également ignorées. Cela rend l'approche inadaptée lorsque vous souhaitez que les tâches en amont s'exécutent avec succès avec des données périmées. Dans ce cas, la meilleure solution est d'ajouter un opérateur précoce dans votre code qui s'échappe vers le succès si la tâche est exécutée particulièrement tard.

1 votes

La commande de remblayage pour le cli semble être maintenant disponible et est probablement la meilleure façon de le faire pour le moment. airflow.incubator.apache.org/cli.html airflow backfill -h [hostname here] -m=True -s [startdate] -e $(date + "%Y-%m-%dT:%H:%M:%S")

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X