J'essaie d'exécuter l'Apache Airflow PythonVirtualenvOperator dans l'un de mes DAGs mais Airflow génère l'erreur suivante :
[2020-12-14 20:06:32,291] {python_operator.py:316} INFO - Executing cmd
['virtualenv', '/tmp/venvwtqb3rki', '--python=python3.8']
[2020-12-14 20:06:32,301] {taskinstance.py:1150} ERROR - [Errno 2] No such file or directory: 'virtualenv'
Traceback (most recent call last):
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 292, in execute_callable
self._execute_in_subprocess(self._generate_virtualenv_cmd(tmp_dir))
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 317, in _execute_in_subprocess
output = subprocess.check_output(cmd,
File "/usr/lib/python3.8/subprocess.py", line 411, in check_output
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
File "/usr/lib/python3.8/subprocess.py", line 489, in run
with Popen(*popenargs, **kwargs) as process:
File "/usr/lib/python3.8/subprocess.py", line 854, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/usr/lib/python3.8/subprocess.py", line 1702, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
J'ai Airflow et tous mes DAGs fonctionnant en tant qu'utilisateur Airflow. J'ai pensé que peut-être Airflow ne pouvait pas trouver la commande virutalenv dans son chemin pendant la configuration/exécution de la tâche.
Voici le code que j'ai mis en place actuellement pour le tester.
import logging
import datetime
from airflow import DAG
import airflow
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks import aws_hook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
import time
default_args = {
'owner':'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'retries': 0
}
dag = DAG (
dag_id = 'list_reqestor',
default_args = default_args,
catchup=False,
schedule_interval = None
)
def setup_driver(ti):
from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver
"""
Sets up Apache libcloud AWS ec2 node driver.
Args:
region: AWS region to perform credential check.
"""
try:
shopper_logger.info("Setting up node deployment driver.")
region = Variable.get('REGION')
cls = get_driver(Provider.EC2)
a_hook = aws_hook.AwsHook()
driver = cls(creds,region=region)
ti.xcom_push(XCOM_REQUESTOR_LIB_CLOUD_DRIVER, driver)
time.sleep(30)
setup_driver_task = PythonVirtualenvOperator(
task_id='setup_driver_task',
python_callable=setup_driver,
retries=0,
requirements=['apache-libcloud'],
python_version="3.8",
system_site_packages=False,
provide_context=True,
xcom_push=True,
dag=dag
)
setup_driver
Je ne suis pas sûr de ce qui me manque.