84 votes

execution_date dans airflow: besoin d'accéder en tant que variable

Je suis vraiment un débutant dans ce forum. Mais j'ai été jouer avec le flux d'air, pendant un certain temps, pour notre société. Désolé si cette question a l'air vraiment stupide.

Je suis en train d'écrire un pipeline à l'aide tas de BashOperators. En gros, pour chaque Tâche, je veux simplement appeler une api REST à l'aide de "curl"

C'est ce que mon pipeline ressemble(version très simplifiée):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

Si vous remarquez que je suis en train de faire current_datetime= datetime_obj.now(tz=tz.tzlocal()) Au lieu de ce que je veux, ici, 'execution_date'

Comment puis-je utiliser 'execution_date' directement et l'assigner à une variable dans mon fichier python?

Je dois avoir cette question plus générale de l'accès args. Toute aide sera vraiment appréciée.

Merci

58voto

Erik Schuchmann Points 211

L' BashOperators' bash_command argument est un modèle. Vous pouvez accéder execution_date dans le modèle en tant que datetime objet à l'aide de l' execution_date variable. Dans le modèle, vous pouvez utiliser n'importe quel jinja2 méthodes pour les manipuler.

À l'aide de votre BashOperator bash_command chaîne de:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

Si vous voulez juste l'équivalent de chaîne de la date d'exécution, ds sera de retour un horodatage (AAAA-MM-JJ), ds_nodash retourne même sans les tirets (AAAAMMJJ), etc. Plus sur macros est disponible dans l' Api Docs.


Votre opérateur final ressemblerait à:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)

49voto

Ziggy Eunicien Points 1155

Le PythonOperator constructeur prend un " provide_context paramètre (voir https://pythonhosted.org/airflow/code.html). Si c'est Vrai, alors il passe par un certain nombre de paramètres dans le python_callable via kwargs. kwargs['execution_date'] est ce que vous voulez, je crois.

Quelque chose comme ceci:

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

Je ne suis pas sûr de savoir comment faire avec la BashOperator, mais vous pouvez commencer par cette question: https://github.com/airbnb/airflow/issues/775

32voto

Babcool Points 31

Je pense que vous ne pouvez pas affecter des variables avec les valeurs de la circulation de l'air contexte, en dehors d'une instance de tâche, ils sont uniquement disponibles au moment de l'exécution. Fondamentalement, il y a 2 différentes étapes lorsqu'un groupe est chargé et exécuté dans la circulation de l'air :

  • D'abord votre fichier dag est interprétée et analysée. Il a de travailler et de les compiler et les définitions de tâches doit être correct (pas d'erreur de syntaxe ou autre). Au cours de cette étape, si vous faites des appels de fonction à remplir certaines valeurs, ces fonctions ne seront pas en mesure d'accéder à la circulation de l'air le contexte (la date d'exécution pour l'exemple, encore plus si vous faites des le remblayage).

  • La deuxième étape est l'exécution de la dag. C'est seulement au cours de cette deuxième étape que les variables fournies par le flux d'air (execution_date, ds, etc...) sont disponibles en tant qu'elles sont liées à une exécution de la dag.

Donc vous ne pouvez pas initialiser les variables globales à l'aide de la circulation de l'Air contexte, toutefois, la circulation de l'Air vous donne de multiples mécanismes pour obtenir le même effet :

  1. À l'aide de jinja le modèle de votre commande (il peut être dans une chaîne de caractères dans le code ou dans un fichier, les deux seront traitées). Vous disposez de la liste des modèles disponibles ici : https://airflow.apache.org/code.html#default-variables. Notez que certaines fonctions sont également disponibles, en particulier pour le calcul des jours de delta et le formatage de la date.

  2. À l'aide d'un PythonOperator dans lequel vous passez le contexte (l' provide_context argument). Cela vous permettra d'accéder à un même modèle avec la syntaxe kwargs['<variable_name']. Si vous en avez besoin, vous pouvez renvoyer une valeur à partir d'un PythonOperator, celle-ci sera stockée dans un XCOM variable, vous pouvez utiliser plus tard dans n'importe quel modèle. L'accès à XCOM variables d'utiliser cette syntaxe : https://airflow.apache.org/concepts.html#xcoms

  3. Si vous écrivez votre propre opérateur, vous pouvez accéder à débit d'air variable avec le dict context.

23voto

l0n3r4ng3r Points 344
 def execute(self, context):
    execution_date = context.get("execution_date")
 

Cela devrait être à l'intérieur de la méthode execute () de l'opérateur

0voto

田咖啡 Points 89

La date d'exécution, (datetime.datetime)

  {{ execution_date }}
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X