73 votes

Blocage de la file d'attente et consommateur multithread, comment savoir quand arrêter

J'ai un seul thread producteur, ce qui crée une certaine tâche des objets qui sont ensuite ajoutés à un ArrayBlockingQueue (qui est de taille fixe).

Je commence aussi un multi-thread consommateur. C'est construire en tant que fixe le pool de thread (Executors.newFixedThreadPool(threadCount);). J'ai ensuite soumettre certains ConsumerWorker les intances de ce pool de threads, chaque ConsumerWorker avoir une refference de l'mentionnés ci-dessus ArrayBlockingQueue instance.

Chacune de ces Travailleurs vont faire un take() sur la file d'attente et de traiter la tâche.

Ma question est, quelle est la meilleure façon d'avoir un Travailleur de savoir quand il n'y aura pas plus de travail à faire. En d'autres termes, comment puis-je dire aux Travailleurs que le producteur a terminé d'ajouter à la file d'attente, et à partir de ce point, chaque travailleur doit s'arrêter quand il voit que la File d'attente est vide.

Ce que j'ai maintenant est une installation où mon Producteur est initialisé avec une fonction de callback qui est déclenché quand il termine son travail (d'ajouter des trucs à la file d'attente). Je garde aussi une liste de tous les ConsumerWorkers j'ai créé et soumis à la ThreadPool. Lorsque le Producteur de Rappel me dit que le producteur est fait, je peux le dire à chacun des travailleurs. À ce stade, ils devraient tout simplement garder le contrôle si la file d'attente n'est pas vide, et quand il est vide, ils doivent s'arrêter, me permettant ainsi de gracieusement l'arrêt de la ExecutorService pool de threads. C'est quelque chose comme ceci

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

Le problème ici c'est que j'ai une condition évidente et où, parfois, le producteur de la finition, de signal, et la ConsumerWorkers va s'arrêter AVANT de consommer tout dans la file d'attente.

Ma question est quel est le meilleur moyen pour synchroniser ce qui fonctionne bien? Devrais-je synchroniser l'ensemble de la partie où il vérifie si le producteur est en cours d'exécution de plus, si la file d'attente est vide en plus de prendre quelque chose à partir de la file d'attente dans un bloc (sur la file de objet)? Dois-je viens de synchroniser la mise à jour de l' isRunning booléen sur la ConsumerWorker exemple? Toute autre suggestion?

MISE À JOUR, VOICI LE TRAVAIL DE MISE EN ŒUVRE QUE J'AI FINI À L'AIDE DE:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Cela a été inspirée fortement par JohnVint la réponse ci-dessous, avec seulement quelques modifications mineures.

=== Mise à jour en raison de @vendhan commentaire.

Je vous remercie pour votre obeservation. Vous avez raison, le premier extrait de code dans cette question (entre autres) de l'endroit où l' while(isRunning || !inputQueue.isEmpty()) n'a pas vraiment de sens.

Dans ma réelle mise en œuvre définitive de cela, je fais quelque chose qui est plus proche de votre suggestion de remplacement de "||" (ou) avec "&&" (et), dans le sens que chaque travailleur (de consommation) maintenant, vérifie seulement si l'élément qu'il est obtenu à partir de la liste est un poison, et si oui, s'arrête (donc, en théorie, nous pouvons dire que le travailleur doit être en cours d'exécution ET la file d'attente ne doit pas être vide).

88voto

John Vint Points 19804

Vous devriez continuer à take() de la file d'attente. Vous pouvez utiliser une pilule empoisonnée pour dire au travailleur d'arrêter. Par exemple:

 private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
 

14voto

NPE Points 169956

J'envoyais aux travailleurs un paquet de travail spécial pour signaler qu'ils devaient fermer leurs portes:

 public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 

}

Pour arrêter les travailleurs, ajoutez simplement ConsumerWorker.DONE à la file d'attente.

1voto

Bhaskar Points 3314

Dans votre bloc de code où vous essayez de récupérer un élément de la file d'attente, utilisez poll(time,unit) au lieu de take() .

 try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 
 

En spécifiant les valeurs appropriées de timeout, vous vous assurez que les threads ne continueront pas à se bloquer en cas de séquence malencontreuse de

  1. isRunning est vrai
  2. La file d'attente devient vide, les threads entrent donc en attente bloquée (si vous utilisez take()
  3. isRunning est défini sur false

0voto

Kaelin Colclasure Points 2746

Vous pouvez utiliser un certain nombre de stratégies, mais la plus simple consiste à créer une sous-classe de tâches signalant la fin du travail. Le producteur n'envoie pas ce signal directement. Au lieu de cela, il met en file d'attente une instance de cette sous-classe de tâche. Lorsqu'un de vos clients termine cette tâche et l'exécute, le signal est envoyé.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X