96 votes

Airflow: comment supprimer un DAG?

J'ai démarré le serveur Web Airflow et planifié des dags. Je peux voir les dags sur l'interface graphique Web.

Comment puis-je supprimer un groupe de disponibilité de base de données particulier d'être exécuté et affiché dans l'interface graphique Web? Existe-t-il une commande Airflow CLI pour le faire?

J'ai regardé autour de moi mais je n'ai pas trouvé de réponse à un moyen simple de supprimer un DAG une fois qu'il a été chargé et planifié.

78voto

tedmiston Points 465

Edit 8/27/18 - débit d'Air 1.10 est maintenant disponible sur PyPI!

https://pypi.org/project/apache-airflow/1.10.0/


Comment supprimer un DAG complètement

Nous avons maintenant dans la circulation de l'Air ≥ 1.10!

Le PR #2199 (Jira: la circulation de l'AIR-1002) ajout de DAG retrait de la circulation de l'Air est maintenant intégrée qui permet la suppression d'un DAG, les entrées de toutes les tables liées.

Le noyau delete_dag(...) du code est désormais de la partie expérimentale de l'API, et il y a entrypoints disponible via la CLI et également via l'API REST.

CLI:

airflow delete_dag my_dag_id

API REST (exécutant le serveur web en local):

curl -X "DELETE" http://127.0.0.1:8080/api/experimental/dags/my_dag_id

Avertissement concernant l'API REST: s'Assurer que l'entrée d'Air de cluster utilise l'authentification de la production.

Installation / mise à niveau de la circulation de l'Air 1.10 (en cours)

Pour la mise à niveau, exécutez:

export SLUGIFY_USES_TEXT_UNIDECODE=yes

ou:

export AIRFLOW_GPL_UNIDECODE=yes

Alors:

pip install -U apache-airflow

N'oubliez pas de vérifier la mise à JOUR.rapport de la première pour plus de détails!

21voto

Jesus Carpintero Points 171

Ceci est mon code adapté utilisant PostgresHook avec le connection_id par défaut.

 import sys
from airflow.hooks.postgres_hook import PostgresHook

dag_input = sys.argv[1]
hook=PostgresHook( postgres_conn_id= "airflow_db")

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    sql="delete from {} where dag_id='{}'".format(t, dag_input)
    hook.run(sql, True)
 

14voto

Ruslan Points 196

Pas sûr pourquoi Apache Airflow n'a pas un moyen évident et facile de supprimer un DAG

Classé sur https://issues.apache.org/jira/browse/AIRFLOW-1002

11voto

Oleg Yamin Points 516

Je viens d'écrire un script qui supprime tout ce qui a trait à un particulier dag, mais ce n'est que pour MySQL. Vous pouvez écrire un autre connecteur de la méthode si vous utilisez PostgreSQL. À l'origine, les commandes posté par Lance sur la https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 Je viens de le mettre dans le script. Espérons que cette aide. Format: python script.py dag_id

import sys
import MySQLdb

dag_input = sys.argv[1]

query = {'delete from xcom where dag_id = "' + dag_input + '"',
        'delete from task_instance where dag_id = "' + dag_input + '"',
        'delete from sla_miss where dag_id = "' + dag_input + '"',
        'delete from log where dag_id = "' + dag_input + '"',
        'delete from job where dag_id = "' + dag_input + '"',
        'delete from dag_run where dag_id = "' + dag_input + '"',
        'delete from dag where dag_id = "' + dag_input + '"' }

def connect(query):
        db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
        cur = db.cursor()
        cur.execute(query)
        db.commit()
        db.close()
        return

for value in query:
        print value
        connect(value)

5voto

jeff Points 51

J'ai écrit un script qui supprime toutes les métadonnées liées à un dag spécifique pour la base de données SQLite par défaut. Ceci est basé sur la réponse de Jésus ci-dessus mais adapté de Postgres à SQLite. Les utilisateurs doivent définir ../airflow.db sur l'emplacement où script.py est stocké par rapport au fichier airflow.db par défaut (généralement ~/airflow ). Pour exécuter, utilisez python script.py dag_id .

 import sqlite3
import sys

conn = sqlite3.connect('../airflow.db')
c = conn.cursor()

dag_input = sys.argv[1]

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
    query = "delete from {} where dag_id='{}'".format(t, dag_input)
    c.execute(query)

conn.commit()
conn.close()
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X