22 votes

Django Celery : compter les tâches

J'utilise actuellement django avec celery et tout fonctionne bien.

Cependant, je veux pouvoir donner aux utilisateurs la possibilité d'annuler une tâche si le serveur est surchargé en vérifiant combien de tâches sont actuellement programmées.

Comment puis-je y parvenir ?

J'utilise redis comme courtier.

Je viens de trouver ceci : Récupérer la liste des tâches dans une file d'attente en Celery

C'est en quelque sorte lié à mon problème mais je n'ai pas besoin de lister les tâches, juste de les compter :)

31voto

Stephen J. Fuhry Points 3549

Voici comment vous pouvez obtenir le nombre de messages dans une file d'attente en utilisant celery qui est agnostique vis-à-vis des courtiers.

En utilisant connection_or_acquire Vous pouvez minimiser le nombre de connexions ouvertes à votre broker en utilisant le pooling de connexion interne de celery.

celery = Celery(app)

with celery.connection_or_acquire() as conn:
    conn.default_channel.queue_declare(
        queue='my-queue', passive=True).message_count

Vous pouvez également étendre Celery pour fournir cette fonctionnalité :

from celery import Celery as _Celery

class Celery(_Celery)

    def get_message_count(self, queue):
        '''
        Raises: amqp.exceptions.NotFound: if queue does not exist
        '''
        with self.connection_or_acquire() as conn:
            return conn.default_channel.queue_declare(
                queue=queue, passive=True).message_count

celery = Celery(app)
num_messages = celery.get_message_count('my-queue')

25voto

Nino Walker Points 2272

Si votre courtier est configuré comme redis://localhost:6379/1 et vos tâches sont soumises au général celery vous pouvez alors obtenir la longueur par les moyens suivants :

import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)

Ou, à partir d'un shell script (bon pour les moniteurs et autres) :

$ redis-cli -n 1 -h localhost -p 6379 llen celery

6voto

grucin Points 136

Si vous avez déjà configuré redis dans votre application, vous pouvez essayer ceci :

from celery import Celery

QUEUE_NAME = 'celery'

celery = Celery(app)
client = celery.connection().channel().client

length = client.llen(QUEUE_NAME)

4voto

Shemhamforasch Points 153

Obtenir une instance du client redis utilisé par Celery, puis vérifier la longueur de la file d'attente. N'oubliez pas de libérer la connexion à chaque fois que vous l'utilisez (utilisez la fonction .acquire ) :

# Get a configured instance of celery:
from project.celery import app as celery_app

def get_celery_queue_len(queue_name):
    with celery_app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)

Obtenez toujours une connexion à partir du pool, ne la créez pas manuellement. Sinon, votre serveur redis sera à court d'emplacements de connexion et cela tuera vos autres clients.

1voto

saaj Points 412

Je vais développer la réponse de @StephenFuhry au sujet de l'erreur "not-found", car une manière plus ou moins agnostique de récupérer la longueur de la file d'attente est bénéfique même si Celery suggère de s'adresser directement aux courtiers . Dans Celery 4 (avec Redis broker) cette erreur ressemble à :

ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'

Observations :

  1. ChannelError est un kombu exception (en fait, c'est amqp et kombu le "réexporter").

  2. Sur le courtier Redis, Celery/Kombu représentent les files d'attente comme des listes Redis.

  3. Les clés de type collection Redis sont retiré dès que la collection devient vide

  4. Si nous regardons ce que queue_declare fait, il a ces lignes :

    if passive and not self._has_queue(queue, **kwargs):
        raise ChannelError(...)
  5. Le transport virtuel Redis de Kombu _has_queue es ce :

    def _has_queue(self, queue, **kwargs):
        with self.conn_or_acquire() as client:
            with client.pipeline() as pipe:
                for pri in self.priority_steps:
                    pipe = pipe.exists(self._q_for_pri(queue, pri))
                return any(pipe.execute())

La conclusion est que sur un courtier Redis ChannelError soulevées à partir de queue_declare est correct (pour une file existante bien sûr), et signifie simplement que la file est vide.

Voici un exemple de la façon d'afficher la longueur de toutes les files d'attente actives de Celery (normalement, elle devrait être de 0, à moins que votre travailleur ne puisse pas faire face aux tâches).

from kombu.exceptions import ChannelError

def get_queue_length(name):
    with celery_app.connection_or_acquire() as conn: 
        try:
            ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
        except ChannelError:
            return 0
        else:
            return ok_nt.message_count

for queue_info in celery_app.control.inspect().active_queues().values():
    print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X