13 votes

Chaîne de tâches Celery et accès aux **kwargs

Je me trouve dans une situation similaire à celle décrite ici sauf qu'au lieu d'enchaîner des tâches avec plusieurs arguments, je veux enchaîner des tâches qui renvoient un dictionnaire avec plusieurs entrées.

C'est - très vaguement et abstraitement --- ce que j'essaie de faire :

tasks.py

@task()
def task1(item1=None, item2=None):
  item3 = #do some stuff with item1 and item2 to yield item3
  return_object = dict(item1=item1, item2=item2, item3=item3)
  return return_object

def task2(item1=None, item2=None, item3=None):
  item4 = #do something with item1, item2, item3 to yield item4
  return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
  return return_object

En travaillant avec ipython, je suis capable d'appeler la tâche 1 individuellement et de manière asynchrone, sans aucun problème.

Je peux AUSSI appeler la tâche 2 individuellement avec le résultat retourné par la tâche 1 comme argument double :

>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS

Cependant, ce que je veux obtenir en fin de compte, c'est le même résultat final que ci-dessus, mais via une chaîne, et ici, je n'arrive pas à comprendre comment faire en sorte que la tâche 2 ne soit pas instanciée avec (positional) arguments retourné par task1, mais avec task1.result comme **kwargs :

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async()  #THIS DOESN'T WORK!

Je pense que je peux revenir en arrière et réécrire mes tâches pour qu'elles renvoient des arguments positionnels au lieu d'un dictionnaire, et cela pourrait clarifier les choses, mais il me semble qu'il devrait y avoir un moyen d'accéder à l'objet de retour de la tâche 1 dans la tâche 2 avec la fonctionnalité équivalente de la **double étoile. Je soupçonne également que j'ai raté quelque chose d'assez évident à propos de l'implémentation des sous-tâches de Celery ou des *args par rapport aux **kwargs.

J'espère que cela a du sens. Et merci d'avance pour tout conseil.

10voto

Balthazar Rouberol Points 1259

Voici ma façon d'aborder le problème, en utilisant une classe de tâches abstraites :

from __future__ import absolute_import
from celery import Task
from myapp.tasks.celery import app   

class ChainedTask(Task):
    abstract = True    

    def __call__(self, *args, **kwargs):
        if len(args) == 1 and isinstance(args[0], dict):
            kwargs.update(args[0])
            args = ()
        return super(ChainedTask, self).__call__(*args, **kwargs)

@app.task(base=ChainedTask)
def task1(x, y):
    return {'x': x * 2, 'y': y * 2, 'z': x * y}    

@app.task(base=ChainedTask)
def task2(x, y, z):
    return {'x': x * 3, 'y': y * 3, 'z': z * 2}

Vous pouvez maintenant définir et exécuter votre chaîne en tant que telle :

from celery import chain

pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()

2voto

asksol Points 9574

chain et les autres primitives de canevas font partie de la famille des utilitaires fonctionnels comme map y reduce .

Par exemple, où map(target, items) appels target(item) pour chaque élément de la liste, Python dispose d'une version rarement utilisée de map appelée itertools.starmap , qui appelle à la place target(*item) .

Alors que nous pourrions ajouter starchain et même kwstarchain à la boîte à outils, ces seraient très spécialisés et ne seraient probablement pas utilisés aussi souvent.

Il est intéressant de noter que Python a rendu ces expressions inutiles avec les expressions de liste et de générateur, de sorte que map est remplacé par [target(item) for item in item] et starmap avec [target(*item) for item in item] .

Ainsi, au lieu de mettre en œuvre plusieurs alternatives pour chaque primitive, je pense que nous devrions se concentrer sur la recherche d'une manière plus flexible de supporter cela, par exemple en utilisant des expressions génératrices alimentées par celery (si possible, et si ce n'est pas le cas, quelque chose d'aussi puissant)

1voto

Paul McMillan Points 11723

Comme cela n'est pas intégré dans celery, j'ai écrit moi-même une fonction décoratrice pour faire quelque chose de similaire.

# Use this wrapper with functions in chains that return a tuple. The
# next function in the chain will get called with that the contents of
# tuple as (first) positional args, rather than just as just the first
# arg. Note that both the sending and receiving function must have
# this wrapper, which goes between the @task decorator and the
# function definition. This wrapper should not otherwise interfere
# when these conditions are not met.

class UnwrapMe(object):
    def __init__(self, contents):
        self.contents = contents

    def __call__(self):
        return self.contents

def wrap_for_chain(f):
    """ Too much deep magic. """
    @functools.wraps(f)
    def _wrapper(*args, **kwargs):
        if type(args[0]) == UnwrapMe:
            args = list(args[0]()) + list(args[1:])
        result = f(*args, **kwargs)

        if type(result) == tuple and current_task.request.callbacks:
            return UnwrapMe(result)
        else:
            return result
    return _wrapper

Le mien se déballe comme le starchain mais vous pouvez facilement le modifier pour déballer les kwargs à la place.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X