Je vais développer la réponse de @StephenFuhry au sujet de l'erreur "not-found", car une manière plus ou moins agnostique de récupérer la longueur de la file d'attente est bénéfique même si Celery suggère de s'adresser directement aux courtiers . Dans Celery 4 (avec Redis broker) cette erreur ressemble à :
ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'
Observations :
-
ChannelError
est un kombu
exception (en fait, c'est amqp
et kombu
le "réexporter").
-
Sur le courtier Redis, Celery/Kombu représentent les files d'attente comme des listes Redis.
-
Les clés de type collection Redis sont retiré dès que la collection devient vide
-
Si nous regardons ce que queue_declare
fait, il a ces lignes :
if passive and not self._has_queue(queue, **kwargs):
raise ChannelError(...)
-
Le transport virtuel Redis de Kombu _has_queue
es ce :
def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())
La conclusion est que sur un courtier Redis ChannelError
soulevées à partir de queue_declare
est correct (pour une file existante bien sûr), et signifie simplement que la file est vide.
Voici un exemple de la façon d'afficher la longueur de toutes les files d'attente actives de Celery (normalement, elle devrait être de 0, à moins que votre travailleur ne puisse pas faire face aux tâches).
from kombu.exceptions import ChannelError
def get_queue_length(name):
with celery_app.connection_or_acquire() as conn:
try:
ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
except ChannelError:
return 0
else:
return ok_nt.message_count
for queue_info in celery_app.control.inspect().active_queues().values():
print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))