16 votes

tâche de céleri et personnalisation du décorateur

Je travaille sur un projet utilisant django et celery(django-celery). Notre équipe a décidé d'envelopper tout le code d'accès aux données au sein de (app-name)/manager.py (PAS d'enveloppement dans les Managers comme le django ), et laisser le code dans (nom de l'application)/task.py s'occuper uniquement de l'assemblage et de l'exécution des tâches avec celery (ainsi nous n'avons pas de dépendance ORM de django dans cette couche).

Dans mon manager.py j'ai quelque chose comme ça :

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

Dans mon task.py, j'aime les envelopper dans des tâches (puis peut-être utiliser ces tâches pour faire des tâches plus compliquées), donc j'écris ce décorateur :

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

puis (toujours dans task.py ):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

Tout fonctionne bien sans @task . Mais, après avoir appliqué cette @task décorateur (en haut comme la documentation sur le céleri l'indique), les choses commencent à s'écrouler. Apparemment, chaque fois que le mfunc_to_task.__call__ est appelé, le même task.get_tag est passée comme f . Donc je me suis retrouvé avec le même wrapper_f à chaque fois, et maintenant la seule chose que je peux faire est d'obtenir une seule étiquette.

Je suis nouveau chez les décorateurs. Quelqu'un peut-il m'aider à comprendre ce qui n'a pas fonctionné ici, ou m'indiquer d'autres moyens d'accomplir cette tâche ? Je déteste vraiment écrire le même code d'enveloppement des tâches pour chacune de mes fonctions d'accès aux données.

20voto

michel.iamit Points 1542

Vous ne savez pas vraiment pourquoi le passage d'arguments ne fonctionne pas ?

si vous utilisez cet exemple :

@task()
def add(x, y):
    return x + y

ajoutons un peu de journalisation à la tâche MyCoolTask :

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

et créer une classe étendue (étendant MyCoolTask, mais maintenant avec des arguments) :

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

et assurez-vous que vous passez les kwargs comme des données json :

{"x":8,"y":9}

J'obtiens le résultat :

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17

9voto

Mauro Rocco Points 1419

Au lieu d'utiliser le décorateur, pourquoi ne pas créer une classe de base qui étendrait celery.Task ?

De cette façon, toutes vos tâches peuvent étendre votre classe de tâche personnalisée, où vous pouvez mettre en œuvre votre comportement personnel en utilisant des méthodes __call__ y after_return . Vous pouvez également définir des méthodes et des objets communs pour toutes vos tâches.

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        pass

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X