J'ai un seul thread producteur, ce qui crée une certaine tâche des objets qui sont ensuite ajoutés à un ArrayBlockingQueue
(qui est de taille fixe).
Je commence aussi un multi-thread consommateur. C'est construire en tant que fixe le pool de thread (Executors.newFixedThreadPool(threadCount);
). J'ai ensuite soumettre certains ConsumerWorker les intances de ce pool de threads, chaque ConsumerWorker avoir une refference de l'mentionnés ci-dessus ArrayBlockingQueue instance.
Chacune de ces Travailleurs vont faire un take()
sur la file d'attente et de traiter la tâche.
Ma question est, quelle est la meilleure façon d'avoir un Travailleur de savoir quand il n'y aura pas plus de travail à faire. En d'autres termes, comment puis-je dire aux Travailleurs que le producteur a terminé d'ajouter à la file d'attente, et à partir de ce point, chaque travailleur doit s'arrêter quand il voit que la File d'attente est vide.
Ce que j'ai maintenant est une installation où mon Producteur est initialisé avec une fonction de callback qui est déclenché quand il termine son travail (d'ajouter des trucs à la file d'attente). Je garde aussi une liste de tous les ConsumerWorkers j'ai créé et soumis à la ThreadPool. Lorsque le Producteur de Rappel me dit que le producteur est fait, je peux le dire à chacun des travailleurs. À ce stade, ils devraient tout simplement garder le contrôle si la file d'attente n'est pas vide, et quand il est vide, ils doivent s'arrêter, me permettant ainsi de gracieusement l'arrêt de la ExecutorService pool de threads. C'est quelque chose comme ceci
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
Le problème ici c'est que j'ai une condition évidente et où, parfois, le producteur de la finition, de signal, et la ConsumerWorkers va s'arrêter AVANT de consommer tout dans la file d'attente.
Ma question est quel est le meilleur moyen pour synchroniser ce qui fonctionne bien? Devrais-je synchroniser l'ensemble de la partie où il vérifie si le producteur est en cours d'exécution de plus, si la file d'attente est vide en plus de prendre quelque chose à partir de la file d'attente dans un bloc (sur la file de objet)? Dois-je viens de synchroniser la mise à jour de l' isRunning
booléen sur la ConsumerWorker exemple? Toute autre suggestion?
MISE À JOUR, VOICI LE TRAVAIL DE MISE EN ŒUVRE QUE J'AI FINI À L'AIDE DE:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Cela a été inspirée fortement par JohnVint la réponse ci-dessous, avec seulement quelques modifications mineures.
=== Mise à jour en raison de @vendhan commentaire.
Je vous remercie pour votre obeservation. Vous avez raison, le premier extrait de code dans cette question (entre autres) de l'endroit où l' while(isRunning || !inputQueue.isEmpty())
n'a pas vraiment de sens.
Dans ma réelle mise en œuvre définitive de cela, je fais quelque chose qui est plus proche de votre suggestion de remplacement de "||" (ou) avec "&&" (et), dans le sens que chaque travailleur (de consommation) maintenant, vérifie seulement si l'élément qu'il est obtenu à partir de la liste est un poison, et si oui, s'arrête (donc, en théorie, nous pouvons dire que le travailleur doit être en cours d'exécution ET la file d'attente ne doit pas être vide).