62 votes

Exécuter des tâches "uniques" avec celery

J'utilise celery pour mettre à jour les flux RSS de mon site d'agrégation de nouvelles. J'utilise une @task pour chaque flux, et les choses semblent bien fonctionner.

Il y a cependant un détail que je ne suis pas sûr de bien gérer : tous les flux sont mis à jour une fois par minute avec une tâche @periodic_task, mais que se passe-t-il si un flux est toujours en cours de mise à jour depuis la dernière tâche périodique lorsqu'une nouvelle tâche est lancée ? (par exemple si le flux est vraiment lent, ou hors ligne et que la tâche est maintenue dans une boucle de réessai).

Actuellement, je stocke les résultats des tâches et je vérifie leur état comme ceci :

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed

_results = {}

@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)

@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error, exc:
        update_feed.retry(args=[feed], exc=exc)

Peut-être existe-t-il un moyen plus sophistiqué/robuste d'obtenir le même résultat en utilisant un mécanisme de celery qui m'aurait échappé ?

47voto

SteveJ Points 253

D'après la réponse de MattH, vous pourriez utiliser un décorateur comme celui-ci :

def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

ensuite, utilisez-le comme ça...

@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60*10)
def fetch_articles()
    yada yada...

1 votes

Juste ce dont j'avais besoin ! Merci !

10 votes

Merci ; cela a fonctionné pour moi ! Notez cependant que cela ne fonctionne pas avec les CACHES par défaut de django car la valeur par défaut est le cache en mémoire locale, ce qui signifie que chaque processus a son propre cache, donc chaque celery worker (processus) aura son propre cache.....

35voto

MattH Points 15352

Extrait de la documentation officielle : S'assurer qu'une tâche n'est exécutée qu'une fois à la fois. .

1 votes

Je ne vois rien de supérieur dans cette approche, c'est beaucoup plus complexe mais fait fondamentalement la même chose (et utiliser le cache de django pour stocker les verrous semble maladroit).

5 votes

Oh, j'ai oublié un détail important, il rend le processus de verrouillage et le thread sûr.

0 votes

Savez-vous si cela est toujours valable lorsqu'on écrit dans une variable globale ? stackoverflow.com/questions/7719203/

19voto

vdboor Points 6259

Utilisation de https://pypi.python.org/pypi/celery_once semble très bien faire le travail, y compris signaler les erreurs et tester l'unicité de certains paramètres.

Tu peux faire des choses comme :

from celery_once import QueueOnce
from myapp.celery import app
from time import sleep

@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
    sleep(30)
    return "Done!"

qui nécessite simplement les paramètres suivants dans votre projet :

ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale

10voto

keithl8041 Points 518

Si vous recherchez un exemple qui n'utilise pas Django, alors essayez cet exemple (attention : utilise Redis à la place, que j'utilisais déjà).

Le code du décorateur est le suivant (crédit complet à l'auteur de l'article, allez le lire)

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec

1 votes

Est-il possible de faire cela dans rabbitMQ ?

6voto

Ander Points 1469

Je me demandais pourquoi personne n'avait mentionné l'utilisation celery.app.control.inspect().active() pour obtenir la liste des tâches en cours d'exécution. N'est-ce pas en temps réel ? Parce que sinon, ce serait très facile à mettre en œuvre, par exemple :

def unique_task(callback,  *decorator_args, **decorator_kwargs):
    """
    Decorator to ensure only one instance of the task is running at once.
    """
    @wraps(callback)
    def _wrapper(celery_task, *args, **kwargs):
        active_queues = task.app.control.inspect().active()
        if active_queues:
            for queue in active_queues:
                for running_task in active_queues[queue]:
                    # Discard the currently running task from the list.
                    if task.name == running_task['name'] and task.request.id != running_task['id']:
                        return f'Task "{callback.__name__}()" cancelled! already running...'

        return callback(celery_task, *args, **kwargs)

    return _wrapper

Et ensuite, il suffit d'appliquer le décorateur aux tâches correspondantes :

@celery.task(bind=True)
@unique_task
def my_task(self):
    # task executed once at a time.
    pass

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X