J'utilise celery et django-celery. J'ai défini une tâche périodique que j'aimerais tester. Est-il possible d'exécuter manuellement la tâche périodique à partir du shell afin de visualiser la sortie de la console ?
Réponses
Trop de publicités?Avez-vous essayé d'exécuter la tâche à partir du shell Django ? Vous pouvez utiliser la commande .apply
d'une tâche pour s'assurer qu'elle est exécutée rapidement et localement.
En supposant que la tâche s'appelle my_task
dans l'application Django myapp
dans un tasks
sous-module :
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
L'instance de résultat a la même API que l'habituelle AsyncResult
sauf que le résultat est toujours évalué avec empressement et localement et que l'option .apply()
bloquera jusqu'à ce que la tâche soit exécutée jusqu'à son terme.
Si vous voulez simplement déclencher une tâche lorsque la condition n'est pas satisfaite, par exemple, l'heure périodique n'est pas atteinte. Vous pouvez le faire en deux étapes.
1.Obtenez votre identifiant de tâche.
Vous pouvez le faire en tapant.
celery inspect registered
Vous verrez quelque chose comme app.tasks.update_something
. Si rien, c'est probablement que celery
n'a pas été lancé. Exécute-le.
2. exécuter la tâche avec celery call
celery call app.tasks.update_something
Pour plus de détails, il suffit de taper
celery --help
celery inspect --help
celery call --help
Je pense que vous devrez ouvrir deux shells : l'un pour exécuter des tâches à partir du shell Python/Django, et l'autre pour exécuter les tâches suivantes celery worker
( python manage.py celery worker
). Et comme l'a dit la réponse précédente, vous pouvez exécuter des tâches à l'aide de apply()
o apply_async()
J'ai modifié la réponse pour que vous n'utilisiez pas une commande obsolète.