
Airflow PythonVirtualenvOperator: No such file or directory: 'virtualenv'

I'm trying to run the Apache Airflow PythonVirtualenvOperator in one of my DAGs, but Airflow raises the following error:

[2020-12-14 20:06:32,291] {python_operator.py:316} INFO - Executing cmd
['virtualenv', '/tmp/venvwtqb3rki', '--python=python3.8']
[2020-12-14 20:06:32,301] {taskinstance.py:1150} ERROR - [Errno 2] No such file or directory: 'virtualenv'
Traceback (most recent call last):
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 292, in execute_callable
    self._execute_in_subprocess(self._generate_virtualenv_cmd(tmp_dir))
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 317, in _execute_in_subprocess
    output = subprocess.check_output(cmd,
  File "/usr/lib/python3.8/subprocess.py", line 411, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/lib/python3.8/subprocess.py", line 489, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/lib/python3.8/subprocess.py", line 854, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/usr/lib/python3.8/subprocess.py", line 1702, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
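The same Errno 2 can be reproduced outside Airflow by running the exact command from the log above through `subprocess` — a minimal sketch (the helper name is mine, not Airflow's):

```python
import subprocess

def run_cmd(cmd):
    """Run a command and report whether its executable could be found."""
    try:
        subprocess.check_output(cmd)
        return "ok"
    except FileNotFoundError:
        # Same Errno 2 as in the traceback above: executable not on PATH
        return "not found"
    except subprocess.CalledProcessError:
        # Executable exists but the command itself failed
        return "failed"

print(run_cmd(["virtualenv", "/tmp/venv_demo", "--python=python3.8"]))
```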

I have Airflow and all of my DAGs running as the airflow user. I thought that perhaps Airflow couldn't find the virtualenv command on its PATH during task setup/execution.
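To check that hypothesis, a quick sketch using `shutil.which`, run as the same user that runs the Airflow worker (the helper name is mine):

```python
import shutil

def has_executable(name):
    """Return True if `name` resolves to an executable on this user's PATH."""
    return shutil.which(name) is not None

# Run this as the airflow user; False means the operator cannot find it either
print(has_executable("virtualenv"))
```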

Here is the code I currently have in place to test this.

import logging
import datetime
from airflow import DAG
import airflow
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks import aws_hook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
import time

default_args = {
    'owner':'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 0
}

dag = DAG (
    dag_id = 'list_reqestor',
    default_args = default_args,
    catchup=False,
    schedule_interval = None

)

def setup_driver(ti):
    """
    Sets up the Apache libcloud AWS EC2 node driver.

    Args:
        region: AWS region to perform credential check.
    """
    # Imports live inside the callable so they resolve in the virtualenv
    from libcloud.compute.types import Provider
    from libcloud.compute.providers import get_driver

    logger = logging.getLogger(__name__)
    try:
        logger.info("Setting up node deployment driver.")
        region = Variable.get('REGION')
        cls = get_driver(Provider.EC2)
        a_hook = aws_hook.AwsHook()
        creds = a_hook.get_credentials()
        driver = cls(creds.access_key, creds.secret_key, region=region)
        ti.xcom_push(XCOM_REQUESTOR_LIB_CLOUD_DRIVER, driver)
        time.sleep(30)
    except Exception:
        logger.exception("Failed to set up the libcloud driver.")
        raise

setup_driver_task = PythonVirtualenvOperator(
    task_id='setup_driver_task',
    python_callable=setup_driver,
    retries=0,
    requirements=['apache-libcloud'],
    python_version="3.8",
    system_site_packages=False,
    provide_context=True,
    xcom_push=True,
    dag=dag
)

setup_driver_task

I'm not sure what I'm missing.
