45 votes

configuration de s3 pour les journaux dans airflow

J'utilise docker-compose pour mettre en place un cluster airflow évolutif. J'ai basé mon approche sur le Dockerfile suivant https://hub.docker.com/r/puckel/docker-airflow/

Mon problème est de faire en sorte que les journaux soient configurés pour écrire/lire à partir de s3. Lorsqu'une dag est terminée, je reçois une erreur comme celle-ci

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00

J'ai créé une nouvelle section dans le airflow.cfg comme ceci

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

Et puis j'ai spécifié le chemin s3 dans la section des journaux distants dans airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

Ai-je correctement configuré ce système et y a-t-il un bug ? Y a-t-il une recette du succès qui m'échappe ?

-- Mise à jour

J'ai essayé d'exporter aux formats URI et JSON, mais aucun ne semble fonctionner. J'ai ensuite exporté le aws_access_key_id et le aws_secret_access_key, puis airflow a commencé à le capter. Maintenant, je reçois cette erreur dans les journaux du travailleur

6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- Mise à jour

J'ai également trouvé ce lien https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

Je me suis ensuite connecté à l'une de mes machines de travail (séparée du serveur web et du planificateur) et j'ai exécuté ce bout de code en python.

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

Je reçois cette erreur.

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden

J'ai essayé d'exporter plusieurs types différents de AIRFLOW_CONN_ envs comme expliqué ici dans la section sur les connexions https://airflow.incubator.apache.org/concepts.html et par d'autres réponses à cette question.

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

J'ai également exporté AWS_ACCESS_KEY_ID et AWS_SECRET_ACCESS_KEY sans succès.

Ces informations d'identification sont stockées dans une base de données. Une fois que je les ai ajoutées dans l'interface utilisateur, elles devraient être récupérées par les travailleurs, mais ceux-ci ne sont pas en mesure d'écrire/de lire les journaux pour une raison quelconque.

0 votes

A ce stade, je suis prêt à toutes les stratégies pour que l'exploitation forestière fonctionne. Je ne peux pas les avoir localement, sur s3, ou en utilisant rfs

0 votes

Le dossier 'logs' existe-t-il dans le chemin ? Au moins les journaux locaux devraient fonctionner sans problème si le dossier existe. S'ils ne fonctionnent pas, même localement, la seule autre raison à laquelle je pense est que les permissions sur le dossier airflow sont incorrectes.

0 votes

On a peut-être quelque chose ici. github.com/puckel/docker-airflow/pull/100

40voto

Arne Huang Points 436

UPDATE Airflow 1.10 rend la journalisation possible beaucoup plus facile.

Pour la journalisation s3, configurez le crochet de connexion comme suit la réponse ci-dessus

et ensuite ajoutez simplement ce qui suit à airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False

Pour l'enregistrement du GCS,

  1. Installez d'abord le paquet gcp_api, comme ceci : pip install apache-airflow [gcp_api].

  2. Configurez le crochet de connexion comme suit la réponse ci-dessus

  3. Ajoutez ce qui suit à airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3. Users must supply a remote
    # location URL (starting with either 's3://...') and an Airflow connection
    # id that provides access to the storage location.
    remote_logging = True
    remote_base_log_folder = gs://my-bucket/path/to/logs
    remote_log_conn_id = MyGCSConn

REMARQUE : A partir de la version 1.9 d'Airflow, l'enregistrement à distance a été supprimé. modifié de manière significative . Si vous utilisez la version 1.9, lisez la suite.

Référence aquí

Instructions complètes :

  1. Créez un répertoire pour stocker les configurations et placez-le de sorte qu'il puisse être trouvé dans PYTHONPATH. Un exemple est $AIRFLOW_HOME/config

  2. Créer des fichiers vides appelés $AIRFLOW_HOME/config/log_config.py et $AIRFLOW_HOME/config/__init__.py

  3. Copiez le contenu de airflow/config_templates/airflow_local_settings.py dans le fichier log_config.py qui vient d'être créé à l'étape précédente.

  4. Personnalisez les parties suivantes du modèle :

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
  5. Assurez-vous qu'un crochet de connexion s3 a été défini dans Airflow, conformément à la procédure suivante la réponse ci-dessus . Le hook doit avoir un accès en lecture et en écriture au s3 bucket défini ci-dessus dans S3_LOG_FOLDER.

  6. Mettre à jour $AIRFLOW_HOME/airflow.cfg pour contenir :

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
  7. Redémarrez le serveur web Airflow et le programmateur, et déclenchez (ou attendez) l'exécution d'une nouvelle tâche.

  8. Vérifiez que les journaux s'affichent pour les tâches nouvellement exécutées dans le seau que vous avez défini.

  9. Vérifiez que la visionneuse de stockage s3 fonctionne dans l'interface utilisateur. Affichez une tâche récemment exécutée, et vérifiez que vous voyez quelque chose comme :

    *** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py

1 votes

Cela ne devrait-il pas être $AIRFLOW_HOME/config/__init__.py. ?

0 votes

Whoops, le formatage markdown m'a eu. Je l'ai édité, merci !

1 votes

Il y a une autre coquille s3TaskHandler devrait être S3TaskHandler

31voto

Him Points 916

Vous devez configurer la connexion S3 via l'interface Airflow. Pour cela, vous devez vous rendre dans l'onglet Admin -> Connexions sur airflow UI et créez une nouvelle ligne pour votre connexion S3.

Un exemple de configuration serait :

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

3 votes

Ok, je vais essayer ça aussi. Le problème est que nous avons tout dockerisé et qu'à chaque fois que nous mettons à jour l'image, nous avons une étape manuelle.

2 votes

Vous pouvez exporter une variable d'environnement par le biais de dockerfile, qui est récupéré par airflow comme paramètre de connexion. stackoverflow.com/a/44708691/2879084

1 votes

Ces paramètres fonctionnent-ils à partir de l'interface utilisateur ? Si oui, je peux ajouter plus de détails sur la configuration automatique.

13voto

Niels Joaquin Points 885

(Mis à jour à partir de Airflow 1.10.2)

Voici une solution si vous n'utilisez pas l'interface d'administration.

Mon Airflow ne fonctionne pas sur un serveur persistant (il est lancé chaque jour dans un conteneur Docker sur Heroku). dans ma configuration minimale, je ne touche jamais à l'interface d'administration ou au fichier cfg. Au lieu de cela, je dois définir des variables d'environnement spécifiques à Airflow dans un script bash, qui remplace le fichier .cfg.

apache-airflow [s3]

Tout d'abord, vous avez besoin de la s3 installé pour écrire vos journaux Airflow sur S3. ( boto3 fonctionne bien pour les travaux Python au sein de vos DAGs, mais l'option S3Hook dépend du sous-paquetage s3).

Une dernière remarque : l'installation de conda ne s'occupe pas encore de cela donc je dois faire pip install apache-airflow[s3] .

Variables d'environnement

Dans un script bash, j'ai configuré les éléments suivants core variables. En partant de ces instructions mais en utilisant la convention d'appellation AIRFLOW__{SECTION}__{KEY} pour les variables d'environnement, je le fais :

export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

ID de la connexion S3

El s3_uri ci-dessus est un identifiant de connexion que j'ai inventé. Dans Airflow, il correspond à une autre variable d'environnement, AIRFLOW_CONN_S3_URI . La valeur de cela est votre chemin S3, qui doit être sous forme d'URI. C'est

s3://access_key:secret_key@bucket/key

Conservez-la de la même manière que vous gérez les autres variables d'environnement sensibles.

Avec cette configuration, Airflow sera capable d'écrire vos logs sur S3. Ils suivront le chemin de s3://bucket/key/dag/task_id/timestamp/1.log .


Annexe sur la mise à niveau d'Airflow 1.8 vers Airflow 1.10

J'ai récemment mis à jour mon pipeline de production d'Airflow 1.8 à 1.9, puis 1.10. La bonne nouvelle, c'est que les changements sont assez minimes ; le reste du travail consistait simplement à trouver des nuances dans l'installation des paquets (sans rapport avec la question initiale sur les journaux S3).

(1) Tout d'abord, j'ai dû passer à Python 3.6 avec Airflow 1.9.

(2) Le nom du paquet est passé de airflow a apache-airflow avec la version 1.9. Vous pouvez également rencontrer ce dans votre pip install .

(3) Le paquet psutil doit être dans une gamme de version spécifique pour Airflow. Vous pouvez rencontrer ce problème lorsque vous faites pip install apache-airflow .

(4) Les en-têtes python3-dev sont nécessaires avec Airflow 1.9+.

(5) Voici les modifications de fond : export AIRFLOW__CORE__REMOTE_LOGGING=True est maintenant nécessaire. Et

(6) Les journaux ont un chemin légèrement différent dans S3, que j'ai mis à jour dans la réponse : s3://bucket/key/dag/task_id/timestamp/1.log .

Mais c'est tout ! Les journaux ne fonctionnaient pas dans la version 1.9, je vous recommande donc de passer directement à la version 1.10, maintenant qu'elle est disponible.

0 votes

@Alex Consultez ma réponse entièrement mise à jour pour Airflow 1.10 !

0 votes

J'essaie de le faire sous la version 1.10.3 - et lorsque j'essaie d'ajouter le compte/secret à l'interface de l'utilisateur, je ne peux pas le faire. S3://account:secret_key_redacted@bucket/key uri, l'analyseur échoue - apparemment en essayant d'analyser mon secret en un port : ValueError: invalid literal for int() with base 10: 'secret_key_redacted' Mon secret contient un '/' - et c'est là que l'analyseur semble s'arrêter...

0 votes

Ok- c'est un problème connu avec AWS -- mais mon conteneur fonctionne à la manière d'un Fargate ECS J'ai un autre code boto3 en cours d'exécution qui est capable de lire/écrire sur S3 sans passer de clés - puisque c'est un rôle IAM qui donne l'accès. L'utilisation d'une clé/secret comme celle-ci est en fait un anti-modèle lors de l'exécution INSIDE AWS (EC2/ECS/etc). Alors, comment résoudre ce cas ? Est-ce que je dois réimplémenter le mécanisme d'authentification de S3Hook pour essayer d'obtenir une session et un client sans authentification ?

2voto

polomarcus Points 11

Pour compléter la réponse d'Arne avec les récentes mises à jour d'Airflow, vous n'avez pas besoin de paramétrer task_log_reader à une autre valeur que celle par défaut : task

Comme si vous suiviez le modèle de journalisation par défaut airflow/config_templates/airflow_local_settings.py vous pouvez voir depuis cet engagement (notez que le nom du gestionnaire a été changé en 's3': {'task'... au lieu de s3.task ) qui est la valeur sur le dossier distant( REMOTE_BASE_LOG_FOLDER ) remplacera le gestionnaire par le bon :

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

Plus de détails sur la façon de se connecter à/de lire depuis S3 : https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

1voto

diogoa Points 11

Juste une remarque à l'intention de tous ceux qui suivent les instructions très utiles de l'ouvrage intitulé la réponse ci-dessus : Si vous tombez sur ce problème : "ModuleNotFoundError : No module named 'airflow.utils.log.logging_mixin.RedirectStdHandler'" comme indiqué ici (ce qui arrive quand on utilise airflow 1.9), la solution est simple - utilisez plutôt ce modèle de base : https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (et suivez toutes les autres instructions dans la réponse ci-dessus )

Le modèle actuel incubateur-airflow/airflow/config_templates/airflow_local_settings.py présent dans la branche master contient une référence à la classe "airflow.utils.log.s3_task_handler.S3TaskHandler", qui n'est pas présente dans le paquet python apache-airflow==1.9.0. J'espère que cela vous aidera !

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X