55 votes

Tâche Celery qui exécute plus de tâches

J'utilise celerybeat pour lancer une tâche primaire qui déclenche un certain nombre de tâches secondaires. Les deux tâches sont déjà écrites.

Existe-t-il un moyen de le faire facilement ? Celery permet-il d'exécuter des tâches à partir d'autres tâches ?

Mon exemple :

@task
def compute(users=None):
    if users is None:
        users = User.objects.all()

    tasks = []
    for user in users:
        tasks.append(compute_for_user.subtask((user.id,)))

    job = TaskSet(tasks)
    job.apply_async() # raises a IOError: Socket closed

@task
def compute_for_user(user_id):
    #do some stuff

compute est appelé depuis celerybeat, mais provoque une IOError lorsqu'il essaie d'exécuter apply_async . Des idées ?

0 votes

0 votes

Peut-on donner un coup de pied à un ensemble de tâches à partir d'une tâche ?

7 votes

Les tâches et les ensembles de tâches peuvent être appliqués depuis l'intérieur d'une tâche, mais vous ne devez jamais attendre leurs résultats (cf. docs.celeryproject.org/en/latest/userguide/ )

36voto

Jeremy W. Sherman Points 22019

Pour répondre à vos premières questions : Depuis la version 2.0, Celery offre un moyen simple de démarrer des tâches à partir d'autres tâches. Ce que vous appelez des "tâches secondaires" sont ce qu'il appelle des "sous-tâches". Consultez la documentation de Ensembles de tâches, Sous-tâches et Callbacks que @Paperino a eu la gentillesse de mettre en lien.

Pour la version 3.0, Celery a changé d'utilisation. groupes pour ce type de comportement, et d'autres.

Votre code montre que vous êtes déjà familiarisé avec cette interface. Votre question semble être la suivante : "Pourquoi est-ce que je reçois un message 'Socket Closed' ? IOError lorsque j'essaie d'exécuter mon ensemble de sous-tâches ?" Je ne pense pas que quiconque puisse répondre à cette question, car vous n'avez pas fourni suffisamment d'informations sur votre programme. Votre extrait ne peut pas être exécuté tel quel, nous ne pouvons donc pas examiner le problème que vous rencontrez par nous-mêmes. Veuillez poster la trace de la pile fournie avec le IOError et avec un peu de chance, quelqu'un qui peut vous aider avec votre casseur se présentera.

11voto

Abhilash Joseph Points 362

Vous pouvez utiliser quelque chose comme ceci (support dans 3.0 )

g = group(compute_for_user.s(user.id) for user in users)
g.apply_async()

0 votes

Donc, dans votre implémentation, la méthode "compute(users=None) :" n'est pas du tout nécessaire, oui ?

1 votes

Ce n'est pas recommandé si vous voulez attendre que les sous-tâches se terminent et récupérer le résultat.

9voto

michel.iamit Points 1542

Et depuis la version 3.0, le terme 'TaskSet' n'est plus utilisé... Les Groupes, Chaînes et Accords en tant que type spécial de sous-tâche sont la nouvelle chose, cf. http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks

-1voto

Zheng Liu Points 217

Pour l'erreur IOError mentionnée, bien que les informations ici ne soient pas suffisantes pour dire ce qui l'a causée, je pense que vous avez essayé d'établir une connexion à l'intérieur de la fonction de tâche, donc chaque fois qu'une tâche est appelée, une nouvelle connexion est construite. Si la tâche doit être appelée des milliers de fois, il y aura des milliers de connexions. Cela va inonder le gestionnaire de socket du système et l'erreur IOError est sa plainte.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X