34 votes

Java ExecutorService: waitTermination de toutes les tâches créées récursivement

J'utilise un ExecutorService pour exécuter une tâche. Cette tâche de manière récursive peut créer d'autres tâches qui sont soumis à la même ExecutorService et ceux de l'enfant tâches peuvent aussi le faire.

J'ai maintenant le problème que je veux attendre jusqu'à ce que toutes les tâches sont effectuées (qui est, toutes les tâches sont terminées et ils n'ont pas à soumettre de nouvelles) avant de continuer.

Je ne peux pas appeler ExecutorService.shutdown() dans le thread principal, car cela empêche de nouvelles tâches d'être accepté par l' ExecutorService.

Et en Appelant ExecutorService.awaitTermination() semble ne rien faire si shutdown n'a pas été appelée.

Donc je suis un peu coincé ici. Il ne peut pas être si difficile que ça pour l' ExecutorService de voir que tous les travailleurs sont en veille, peut-il? La seule solution peu élégante que je pouvais venir est d'utiliser directement un ThreadPoolExecutor de requête et de ses getPoolSize() à chaque fois dans un tout. Il n'y a vraiment pas de meilleure façon de faire cela?

19voto

John Vint Points 19804

C'est vraiment un candidat idéal pour un Phaser. Java 7 est de sortir avec cette nouvelle classe. Sa souplesse CountdonwLatch/CyclicBarrier. Vous pouvez obtenir une version stable de la JSR 166 Intérêt du Site.

La façon dont il est plus souple CountdownLatch/CyclicBarrier est parce qu'il est capable non seulement de soutenir un nombre inconnu de parties (threads), mais également réutilisables (c'est là que la partie de la phase de vient)

Pour chaque tâche que vous soumettez vous inscrire, lorsque la tâche est terminée, vous arrivez. Cela peut être fait de manière récursive.

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

Edit: Merci @depthofreality pour souligner la condition de la course dans mon exemple précédent. Je suis à jour de sorte que l'exécution du thread n'attend que l'avance de la phase en cours, car il bloque pour la fonction récursive pour terminer.

Le numéro de phase ne sera pas du voyage jusqu'à ce que le nombre d' arrives == registers. Car, avant chaque appel récursif invoque register d'une phase incrément de se produire lorsque toutes les invocations sont complètes.

17voto

axtavt Points 126632

Si le nombre de tâches dans l'arborescence de récursive des tâches est d'abord inconnu, peut-être la façon la plus simple serait de mettre en place votre propre primitives de synchronisation, une sorte de "inverse sémaphore", et de les partager parmi vos tâches. Avant de soumettre chaque tâche que vous incrémenter une valeur, lorsque la tâche est terminée, il décrémente la valeur, et vous attendez jusqu'à ce que la valeur est de 0.

La mise en œuvre comme primitive explicitement appelé à partir de tâches de dissocier cette logique à partir du pool de threads mise en œuvre et permet de présenter plusieurs des arbres indépendants de récursive des tâches dans la même piscine.

Quelque chose comme ceci:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

Notez que taskCompleted() devrait être appelée à l'intérieur d'un finally bloc, pour le rendre immunitaire à d'éventuelles exceptions.

Notez aussi que l' beforeSubmit() doit être appelé par le soumissionnaire filet avant que la tâche est soumis, et non par la tâche elle-même, pour éviter d'éventuels "faux achèvement" quand le vieux les tâches sont terminées et les nouveaux n'ont pas encore commencé.

EDIT: problème Important avec l'utilisation de modèle fixe.

7voto

Christoph Points 341

Wow, vous êtes rapide:)

Merci pour toutes les suggestions. Les contrats à terme ne sont pas facilement intégrer avec mon modèle car je ne sais pas combien de runnables sont planifiées à l'avance. Donc, si je garde une tâche parente vivante juste à attendre qu'il est récursif enfant tâches pour terminer, j'ai beaucoup de déchets autour de la pose.

J'ai résolu mon problème en utilisant la AtomicInteger suggestion. Essentiellement, je sous-classé ThreadPoolExecutor et incrémenter le compteur sur les appels à execute() et de décrémentation sur les appels à afterExecute(). Lorsque le compteur obtient 0 I appel de l'arrêt(). Cela semble fonctionner pour mes problèmes, ne sais pas si c'est généralement une bonne façon de le faire. En particulier, je suppose que vous n'utilisez execute() pour ajouter Runnables.

Comme un côté d'un nœud: j'ai d'abord essayé de vérifier dans afterExecute() le nombre de Runnables dans la file d'attente et le nombre de travailleurs actifs et à l'arrêt lorsque ceux-ci sont 0; mais cela ne fonctionne pas car pas tous les Runnables a montré dans la file d'attente et le getActiveCount() n'ai pas fait ce que j'attendais non plus.

De toute façon, voici ma solution: (si quelqu'un trouve de sérieux problèmes avec cela, s'il vous plaît laissez-moi savoir:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}

3voto

Peter Lawrey Points 229686

Vous pouvez créer votre propre pool de threads qui étend ThreadPoolExecutor. Vous voulez savoir quand une tâche a été soumise et quand elle se termine.

 public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}
 

0voto

Suraj Chandran Points 12859

Utilisation CountDownLatch. Passer le CountDownLatch objet de chacune de vos tâches et le code de vos tâches quelque chose comme ci-dessous.

public void doTask() {
    // do your task
    latch.countDown(); 
}

Alors que le thread qui a besoin d'attendre que doit exécuter le code suivant:

public void doWait() {
    latch.await();
}

Mais bien sûr, cela suppose que vous le savez déjà, le nombre d'enfants des tâches de sorte que vous pouvez initialiser le loquet de comptage.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X