55 votes

Attendre jusqu'à ce que l'un quelconque de Future<T> soit terminé

J'ai quelques tâches asynchrones en cours d'exécution et j'ai besoin d'attendre qu'au moins l'une d'entre elles soit terminée (à l'avenir, probablement j'aurai besoin d'attendre que M sur N tâches soient terminées). Actuellement, elles sont présentées sous forme de Future, donc j'ai besoin de quelque chose comme

/**
 * Bloque le thread actuel jusqu'à ce qu'un des futurs spécifiés soit terminé et le renvoie.
 */
public static  Future waitForAny(Collection> futures) 
        throws AllFuturesFailedException

Est-ce qu'il y a quelque chose de similaire à cela? Ou quelque chose de similaire, pas nécessairement pour Future. Actuellement, je boucle à travers la collection de futurs, vérifie si l'un est terminé, puis attends pendant un certain temps et vérifie à nouveau. Cela ne semble pas être la meilleure solution, car si j'attends pendant une longue période, alors un retard non souhaité est ajouté, si j'attends pendant une courte période, cela peut affecter les performances.

Je pourrais essayer d'utiliser

new CountDownLatch(1)

et diminuer le décompte lorsque la tâche est terminée et faire

countdown.await()

, mais j'ai découvert qu'il est possible seulement si je contrôle la création du Future. C'est possible, mais cela nécessite une refonte du système, car actuellement la logique de création des tâches (envoi de Callable à ExecutorService) est séparée de la décision d'attendre pour quel Future. Je pourrais également remplacer

 RunnableFuture AbstractExecutorService.newTaskFor(Callable callable)

et créer une implémentation personnalisée de RunnableFuture avec la capacité de rattacher un auditeur pour être notifié lorsque la tâche est terminée, puis attacher un tel auditeur aux tâches nécessaires et utiliser CountDownLatch, mais cela signifie que je dois remplacer newTaskFor pour chaque ExecutorService que j'utilise - et potentiellement il y aura des implémentations qui n'étendent pas AbstractExecutorService. Je pourrais également essayer de décorer le ExecutorService donné à cette fin, mais alors je dois décorer toutes les méthodes produisant des Futures.

Toutes ces solutions peuvent fonctionner mais semblent très artificielles. On dirait que je rate quelque chose de simple, comme

WaitHandle.WaitAny(WaitHandle[] waitHandles)

en c#. Y a-t-il des solutions bien connues pour ce genre de problème?

MISE À JOUR:

À l'origine, je n'avais pas du tout accès à la création de Futures, donc il n'y avait pas de solution élégante. Après avoir redessiné le système, j'ai eu accès à la création de Futures et j'ai pu ajouter countDownLatch.countdown() au processus d'exécution, puis je peux countDownLatch.await() et tout fonctionne bien. Merci pour les autres réponses, je ne connaissais pas ExecutorCompletionService et cela peut effectivement être utile dans des tâches similaires, mais dans ce cas particulier, il ne pouvait pas être utilisé car certains Futures sont créés sans aucun exécuteur - la tâche réelle est envoyée à un autre serveur via le réseau, se termine à distance et la notification de terminaison est reçue.

4 votes

Beaucoup de personnes qui viennent à cette question préféreront probablement regarder en dessous de la réponse longuement acceptée aux réponses les mieux votées qui se réfèrent à ExecutorCompletionService et ExecutorService.invokeAny().

55voto

james Points 471

Simple, consultez ExecutorCompletionService.

6 votes

La documentation de cette classe, y compris un exemple de la façon d'annuler toutes les autres tâches après que la première est terminée (si c'est ce que vous voulez faire), peut être trouvée à java.sun.com/javase/6/docs/api/java/util/concurrent/…

4 votes

ExecutorCompletionService ne peut pas accepter de Futures, à ma connaissance cela ne répond pas à la question initiale.

0 votes

Curieusement, j'ai toujours été puni pour des réponses d'un seul mot dans le passé. Je suppose que c'est parce que je ne suis pas une personne décente.

9voto

ykaganovich Points 8497

4 votes

Presque. Cela prend une Collection plutôt qu'une Collection

7voto

Alex Miller Points 28225

Pourquoi ne pas simplement créer une file de résultats et attendre sur la file ? Ou plus simplement, utiliser un CompletionService car c'est ce que c'est : un ExecutorService + file de résultats.

6voto

Scott Stanchfield Points 15863

C'est en fait assez facile avec wait() et notifyAll().

Tout d'abord, définissez un objet de verrouillage. (Vous pouvez utiliser n'importe quelle classe pour cela, mais j'aime être explicite):

package com.javadude.sample;

public class Lock {}

Ensuite, définissez votre thread travailleur. Il doit notifier cet objet de verrouillage lorsqu'il a terminé son traitement. Notez que la notification doit se trouver dans un bloc synchronisé verrouillant sur l'objet de verrouillage.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // faire du vrai travail -- en utilisant un sleep ici pour simuler le travail
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " a fini... en train de notifier");
        // notifier ceux qui attendent, dans ce cas, le client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Enfin, vous pouvez écrire votre client:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // besoin de faire le start ici pour que nous prenions le verrou, juste
                        //   au cas où l'un des threads est rapide -- si nous avions fait le
                        //   starts en dehors du bloc synchronisé, un thread rapide pourrait
                        //   arriver à sa notification *avant* que le client l'attende
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notifié!");
            }
        }
        System.out.println("Tout le monde m'a notifié... J'ai fini");
    }
}

0 votes

Votre solution est ce à quoi je pensais. Elle semble propre et plus facile à comprendre. +1

4voto

jdmichal Points 6283

As far as I know, Java has no analogous structure to the WaitHandle.WaitAny method.

Il me semble que cela pourrait être réalisé à travers un décorateur "WaitableFuture" :

public WaitableFuture
    extends Future
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Cependant, ceci ne fonctionnerait que s'il peut être inséré avant le code d'exécution, sinon le code d'exécution n'aurait pas la nouvelle méthode doTask(). Mais je ne vois vraiment aucun moyen de le faire sans sondage si vous ne pouvez pas somehow obtenir le contrôle de l'objet Future avant l'exécution.

Ou si le futur s'exécute toujours dans son propre thread, et que vous pouvez somehow récupérer ce thread. Alors vous pourriez démarrer un nouveau thread pour rejoindre chaque autre thread, puis gérer le mécanisme d'attente après le retour du join... Cela serait vraiment laid et entraînerait beaucoup de surcharge cependant. Et si certains objets Future ne se terminent pas, vous pourriez avoir beaucoup de threads bloqués dépendant de threads morts. Si vous n'êtes pas prudent, cela pourrait provoquer des fuites de mémoire et des ressources système.

/**
 * Façon extrêmement laide de mettre en œuvre WaitHandle.WaitAny pour Thread.Join().
 */
public static joinAny(Collection threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X