J'utilise celery pour mettre à jour les flux RSS de mon site d'agrégation de nouvelles. J'utilise une @task pour chaque flux, et les choses semblent bien fonctionner.
Il y a cependant un détail que je ne suis pas sûr de bien gérer : tous les flux sont mis à jour une fois par minute avec une tâche @periodic_task, mais que se passe-t-il si un flux est toujours en cours de mise à jour depuis la dernière tâche périodique lorsqu'une nouvelle tâche est lancée ? (par exemple si le flux est vraiment lent, ou hors ligne et que la tâche est maintenue dans une boucle de réessai).
Actuellement, je stocke les résultats des tâches et je vérifie leur état comme ceci :
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed
_results = {}
@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
for feed in Feed.objects.all():
if feed.pk in _results:
if not _results[feed.pk].ready():
# The task is not finished yet
continue
_results[feed.pk] = update_feed.delay(feed)
@task()
def update_feed(feed):
try:
feed.fetch_articles()
except socket.error, exc:
update_feed.retry(args=[feed], exc=exc)
Peut-être existe-t-il un moyen plus sophistiqué/robuste d'obtenir le même résultat en utilisant un mécanisme de celery qui m'aurait échappé ?