105 votes

Comment vérifier l'état d'une tâche dans Celery ?

Comment vérifier si une tâche est en cours d'exécution dans celery (plus précisément, j'utilise celery-django) ?

J'ai lu la documentation, et j'ai cherché sur Google, mais je ne vois pas d'appel comme celui-ci :

my_example_task.state() == RUNNING

Mon cas d'utilisation est que j'ai un service externe (java) pour le transcodage. Lorsque j'envoie un document à transcoder, je veux vérifier si la tâche qui exécute ce service est en cours d'exécution, et si non, la (re)lancer.

J'utilise les versions stables actuelles - 2.4, je crois.

107voto

Gregor Points 2305

Retourne le task_id (qui est donné par .delay()) et interroge l'instance celery sur l'état de la tâche :

x = method.delay(1,2)
print x.task_id

Lors de la demande, obtenez un nouveau AsyncResult en utilisant ce task_id :

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

11 votes

Merci, mais que faire si je n'ai pas accès à x ?

4 votes

Où est-ce que vous mettez en file d'attente vos travaux dans celery ? Là, vous devez retourner le task_id pour suivre le travail dans le futur.

0 votes

Contrairement à celle de @Marcin, cette réponse n'utilise pas la méthode statique Task.AsyncResult() comme fabrique de l'AsyncResult, ce qui permet de réutiliser utilement la configuration du backend, sinon une erreur est levée lors de la tentative d'obtention du résultat.

92voto

Louis Points 13534

Créer un AsyncResult de l'identifiant de la tâche est de la manière recommandée dans le FAQ pour obtenir le statut de la tâche lorsque la seule chose que vous avez est l'id de la tâche.

Cependant, à partir de Celery 3.x, il y a des mises en garde importantes qui risquent de faire mal si l'on n'y prête pas attention. Cela dépend vraiment du scénario d'utilisation spécifique.

Par défaut, Celery n'enregistre pas d'état "en cours d'exécution".

Pour que Celery enregistre qu'une tâche est en cours d'exécution, vous devez configurer task_track_started a True . Voici une tâche simple qui permet de tester cela :

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Quand task_track_started es False qui est la valeur par défaut, le spectacle d'état est PENDING même si la tâche a commencé. Si vous définissez task_track_started a True alors l'état sera STARTED .

L'État PENDING signifie "Je ne sais pas".

Un site AsyncResult avec l'État PENDING ne signifie rien de plus que le fait que Celery ne connaît pas l'état de la tâche. Cela peut être dû à un certain nombre de raisons.

D'abord, AsyncResult peut être construit avec des identifiants de tâches invalides. De telles "tâches" seront considérées comme en attente par Celery :

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, donc personne ne va nourrir évidemment ids invalides à AsyncResult . D'accord, mais il a aussi pour effet que AsyncResult considérera également une tâche qui a été exécutée avec succès mais que Celery a oubliée comme étant PENDING . Encore une fois, dans certains cas d'utilisation cela peut être un problème. Une partie du problème dépend de la façon dont Celery est configuré pour conserver les résultats des tâches, car cela dépend de la disponibilité des "pierres tombales" dans le backend des résultats. ("Tombstones" est le terme utilisé dans la documentation de Celery pour désigner les morceaux de données qui enregistrent la façon dont la tâche s'est terminée). Utilisation de AsyncResult ne fonctionnera pas du tout si task_ignore_result es True . Un problème plus contrariant est que Celery expire les pierres tombales par défaut. Le site result_expires est réglé par défaut sur 24 heures. Ainsi, si vous lancez une tâche, et enregistrez l'id dans le stockage à long terme, et plus 24 heures plus tard, vous créez un AsyncResult avec elle, le statut sera PENDING .

Toutes les "vraies tâches" commencent dans le PENDING l'état. Donc, obtenir PENDING sur une tâche pourrait signifier que la tâche a été demandée mais n'a jamais progressé plus loin (pour une raison quelconque). Ou encore que la tâche a été exécutée mais que Celery a oublié son état.

Ouch ! AsyncResult ne fonctionnera pas pour moi. Que puis-je faire d'autre ?

Je préfère garder la trace de objectifs que de garder la trace de la les tâches elles-mêmes . Je conserve certaines informations sur les tâches, mais elles sont vraiment secondaires par rapport au suivi des objectifs. Les objectifs sont stockés dans un espace indépendant de Celery. Lorsqu'une requête doit effectuer un calcul qui dépend de l'atteinte d'un objectif, Celery vérifie si l'objectif a déjà été atteint. Si oui, il utilise l'objectif mis en cache, sinon il lance la tâche qui aura un effet sur l'objectif et envoie au client qui a fait la requête HTTP une réponse indiquant qu'il doit attendre un résultat.


Les noms de variables et les hyperliens ci-dessus sont pour Celery 4.x. En 3.x, les variables et les hyperliens correspondants sont les suivants : CELERY_TRACK_STARTED , CELERY_IGNORE_RESULT , CELERY_TASK_RESULT_EXPIRES .

1 votes

Donc, si je veux vérifier le résultat plus tard (peut-être même au sein d'un autre processus), il vaut mieux que j'utilise ma propre implémentation ? Stocker manuellement le résultat dans la base de données ?

0 votes

Oui, je séparerais le suivi des "objectifs" de celui des "tâches". J'ai écrit "effectuer un calcul qui dépend d'un objectif". En général, le "but" est également un calcul. Par exemple, si je veux montrer l'article X à un utilisateur, je dois le convertir de XML en HTML, mais avant cela, je dois avoir résolu toutes les références bibliographiques. (Je vérifie si l'objectif "article X avec toutes les références bibliographiques résolues" existe et je l'utilise plutôt que d'essayer de vérifier l'état d'une tâche Celery qui aurait calculé l'objectif que je souhaite.

0 votes

Et l'information "article X avec toutes les références bibliographiques résolues" est stockée dans un cache mémoire et stockée dans une base de données eXist-db.

66voto

Marcin Points 25366

Chaque Task a un .request qui le contient AsyncRequest objet. Ainsi, la ligne suivante donne l'état d'une Task task :

task.AsyncResult(task.request.id).state

2 votes

Existe-t-il un moyen de stocker le pourcentage d'avancement d'une tâche ?

5 votes

Lorsque je fais cela, j'obtiens un AsyncResult PENDING permanent, même si j'attends suffisamment longtemps pour que la tâche se termine. Existe-t-il un moyen de faire en sorte que les changements d'état soient visibles ? Je pense que mon backend est configuré, et j'ai essayé de définir CELERY_TRACK_STARTED=True, sans succès.

1 votes

@dstromberg Malheureusement, cela fait 4 ans que ce problème ne s'est pas posé pour moi, donc je ne peux pas vous aider. Vous avez certainement besoin de configurer celery pour suivre le statut.

18voto

msangel Points 1716

Vous pouvez également créer des états personnalisés et mettre à jour leur valeur lors de l'exécution de la tâche. Cet exemple est tiré de la documentation :

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

17voto

Cesar Rios Points 161

Vieille question mais j'ai récemment rencontré ce problème.

Si vous essayez d'obtenir l'identifiant de la tâche, vous pouvez le faire comme suit :

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Maintenant, vous savez exactement ce qu'est le task_id et vous pouvez l'utiliser pour obtenir le AsyncResult :

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4

6 votes

Il n'est absolument pas nécessaire de créer votre propre identifiant de tâche et de le transmettre à apply_async . L'objet renvoyé par apply_async est un AsyncResult qui contient l'identifiant de la tâche générée par Celery.

1 votes

Corrigez-moi si je me trompe, mais n'est-il pas parfois utile de générer un UUID basé sur certaines entrées, de sorte que tous les appels obtenant les mêmes entrées obtiennent le même UUID ? En d'autres termes, il est peut-être parfois utile de spécifier votre task_id.

1 votes

@dstromberg La question posée par l'OP est "comment vérifier le statut d'une tâche" et la réponse ici dit "Si vous essayez d'obtenir l'identifiant de la tâche...". Ni la vérification de l'état de la tâche, ni l'obtention de l'identifiant de la tâche. task_id exigent que vous générer une tâche à accomplir. Dans votre commentaire, vous avez imaginé une raison qui va comme suit au-delà des frontières "Comment vérifier le statut d'une tâche ?" et "Si vous essayez d'obtenir l'identifiant de la tâche...` Très bien si vous avez ce besoin, mais ce n'est pas le cas ici. (En outre, l'utilisation de uuid() pour générer un identifiant de tâche ne fait absolument rien au-delà de ce que Celery fait par défaut).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X