513 votes

Comment obtenir la valeur de retour d'un thread en python ?

La fonction foo ci-dessous retourne une chaîne de caractères 'foo' . Comment puis-je obtenir la valeur 'foo' qui est renvoyé par la cible du fil ?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

La "seule façon évidente de le faire", présentée ci-dessus, ne fonctionne pas : thread.join() a retourné None .

361voto

kindall Points 60645

Une façon que j'ai vue est de passer un objet mutable, tel qu'une liste ou un dictionnaire, au constructeur du thread, avec un index ou un autre identifiant de quelque sorte. Le thread peut alors stocker ses résultats dans son emplacement dédié dans cet objet. Par exemple :

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Si vous voulez vraiment join() pour retourner la valeur de retour de la fonction appelée, vous pouvez le faire avec une balise Thread comme suit :

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

Cela devient un peu délicat à cause d'une certaine manipulation des noms, et il accède à des structures de données "privées" qui sont spécifiques à Thread mise en œuvre... mais ça marche.

Pour python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return

0 votes

Merci, je peux voir que ce serait bien comme solution de contournement, mais cela change la définition de la fonction de sorte qu'elle ne fonctionne pas vraiment. return Je voulais savoir dans mon cas original, où fait ce "foo" va réellement... ?

0 votes

@wim : Les valeurs de retour vont quelque part seulement si vous les mettez quelque part. Malheureusement, dans le cas de Thread tout ce qui se passe à l'intérieur de la classe -- le défaut run() ne conserve pas la valeur de retour, vous la perdez donc. Vous pourriez écrire votre propre Thread pour gérer cela, cependant. J'ai essayé de le faire dans mon message.

53 votes

Cool, merci pour l'exemple ! je me demande pourquoi Thread n'a pas été implémenté avec la gestion d'une valeur de retour en premier lieu, cela semble être une chose assez évidente à supporter.

315voto

Jake Biesinger Points 717

Pour info, le multiprocessing dispose d'une interface agréable à cet effet en utilisant le module Pool classe. Et si vous voulez vous en tenir aux threads plutôt qu'aux processus, vous pouvez simplement utiliser la classe multiprocessing.pool.ThreadPool en remplacement d'un cours libre.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

0 votes

C'est la meilleure réponse car elle n'est pas intrusive.

0 votes

Il est à noter que dans python 2.7, vous ne pouvez pas passer d'objets dans le tuple args. (Je viens de vérifier, je ne peux pas non plus le faire en 3.2).

1 votes

@CornSmith Je ne suis pas sûr de ce que vous voulez dire... Cela fonctionne bien pour moi avec python 2.7.5... N'oubliez pas d'inclure les arguments dans un tuple. (object1,) : def foo2(testinstance): print 'hello {0}'.format(testinstance.bar) class testme(object): def __init__(self, bar): self.bar = bar async_result = pool.apply_async(foo2, (testme('I am bar'),))

102voto

bj0 Points 825

La réponse de Jake est bonne, mais si vous ne voulez pas utiliser un pool de threads (vous ne savez pas de combien de threads vous aurez besoin, mais vous les créez au fur et à mesure des besoins), un bon moyen de transmettre des informations entre les threads est l'outil intégré Queue.Queue car elle offre la sécurité des fils.

J'ai créé le décorateur suivant pour qu'il agisse de manière similaire au threadpool :

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Alors vous l'utilisez comme :

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

La fonction décorée crée un nouveau thread à chaque fois qu'elle est appelée et renvoie un objet Thread qui contient la file d'attente qui recevra le résultat.

UPDATE

Cela fait un certain temps que j'ai posté cette réponse, mais elle est toujours consultée. J'ai donc pensé la mettre à jour pour refléter la façon dont je procède dans les nouvelles versions de Python :

Python 3.2 ajouté dans le concurrent.futures qui fournit une interface de haut niveau pour les tâches parallèles. Il fournit ThreadPoolExecutor et ProcessPoolExecutor Vous pouvez donc utiliser un pool de threads ou de processus avec la même interface utilisateur.

L'un des avantages de cette api est que la soumission d'une tâche à un groupe de travail de l'UE peut être effectuée par le biais de l'api. Executor renvoie un Future qui sera complété par la valeur de retour du callable que vous avez soumis.

Cela permet d'attacher un queue inutile, ce qui simplifie considérablement le décorateur :

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

Cela utilisera une valeur par défaut module exécuteur du pool de threads s'il n'y en a pas.

L'utilisation est très similaire à celle d'avant :

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Si vous utilisez Python 3.4+, l'une des fonctionnalités très intéressantes de cette méthode (et des objets Future en général) est que le futur retourné peut être enveloppé pour le transformer en un objet de type asyncio.Future avec asyncio.wrap_future . Cela lui permet de fonctionner facilement avec les coroutines :

result = await asyncio.wrap_future(long_task(10))

Si vous n'avez pas besoin d'accéder au sous-jacent concurrent.Future vous pouvez inclure le wrap dans le décorateur :

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Ensuite, chaque fois que vous avez besoin d'écarter du fil de la boucle d'événement un code à forte intensité de calcul ou bloquant, vous pouvez le placer dans une fonction décorée :

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

0 votes

Je n'arrive pas à faire fonctionner ce système ; j'obtiens une erreur disant AttributeError: 'module' object has no attribute 'Lock' cela semble émaner de la ligne y = long_task(10) ... pensées ?

1 votes

Le code n'utilise pas explicitement Lock, le problème pourrait donc se situer ailleurs dans votre code. Vous pouvez poster une nouvelle question SO à ce sujet.

0 votes

Pourquoi result_queue est-il un attribut d'instance ? Serait-il préférable que ce soit un attribut de classe afin que les utilisateurs n'aient pas à savoir qu'il faut appeler result_queue lorsqu'on utilise @threaded qui n'est pas explicite et ambigu ?

29voto

user2426679 Points 137

J'ai volé la réponse de kindall et l'ai nettoyée un peu.

La partie la plus importante est d'ajouter *args et **kwargs à join() afin de gérer le délai d'attente.

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

RÉPONSE ACTUALISÉE CI-DESSOUS

C'est ma réponse la plus populaire, j'ai donc décidé de la mettre à jour avec un code qui fonctionnera à la fois sur py2 et py3.

De plus, je vois de nombreuses réponses à cette question qui montrent un manque de compréhension concernant Thread.join(). Certaines ne gèrent pas du tout la fonction timeout arg. Mais il y a également un cas de figure dont vous devez être conscient concernant les cas où vous avez (1) une fonction cible qui peut renvoyer None et (2) vous passez également l'épreuve timeout arg à join(). Veuillez consulter le "TEST 4" pour comprendre ce cas particulier.

Classe ThreadWithReturn qui fonctionne avec py2 et py3 :

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

_thread_target_key, _thread_args_key, _thread_kwargs_key = (
    ('_target', '_args', '_kwargs')
    if sys.version_info >= (3, 0) else
    ('_Thread__target', '_Thread__args', '_Thread__kwargs')
)

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Quelques exemples de tests sont présentés ci-dessous :

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Pouvez-vous identifier le cas de figure que nous pourrions rencontrer avec TEST 4 ?

Le problème est que nous nous attendons à ce que giveMe() renvoie None (voir TEST 2), mais nous nous attendons également à ce que join() renvoie None s'il s'arrête.

returned is None signifie soit :

(1) c'est ce que giveMe() a retourné, ou

(2) join() a expiré

Cet exemple est trivial puisque nous savons que giveMe() retournera toujours None. Mais dans les cas réels (où la cible peut légitimement retourner None ou autre chose), nous voudrions vérifier explicitement ce qui s'est passé.

Voici comment aborder ce cas de figure :

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()

0 votes

Connaissez-vous l'équivalent de _Thread_target pour Python3 ? Cet attribut n'existe pas dans Python3.

0 votes

J'ai regardé dans le fichier threading.py, il s'avère que c'est _target (d'autres attributs portent un nom similaire).

0 votes

Vous pouvez éviter d'accéder aux variables privées de la classe thread, si vous enregistrez le fichier target , args et kwargs arguments pour init comme variables membres dans votre classe.

8voto

Peter Lonjers Points 51

Ma solution au problème est d'envelopper la fonction et le fil dans une classe. Il n'est pas nécessaire d'utiliser les pools, les files d'attente ou le passage de variables de type C. Il est également non bloquant. C'est également non bloquant. Vous vérifiez l'état à la place. Voir l'exemple d'utilisation à la fin du code.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()

0 votes

Comment gérer une exception ? disons que la fonction add reçoit un int et un str. est-ce que tous les threads échouent ou seulement un ?

0 votes

+1 pour penser comme moi. Sérieusement - c'est le moindre effort. Et si vous codez en Python - votre travail devrait automatiquement être fait dans une classe, donc c'est légitimement la façon la plus sensée d'aborder ce problème.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X