68 votes

Fils producteurs/consommateurs utilisant une file d'attente

J'aimerais créer une sorte de Producer/Consumer application d'enfilage. Mais je ne suis pas sûr de la meilleure façon d'implémenter une file d'attente entre les deux.

J'ai donc eu deux idées (qui peuvent toutes deux être totalement fausses). J'aimerais savoir laquelle serait la meilleure et si les deux sont nulles, quelle serait la meilleure façon d'implémenter la file d'attente. C'est principalement mon implémentation de la file d'attente dans ces exemples qui me préoccupe. J'étend une classe de file d'attente qui est une classe interne et qui est thread safe. Vous trouverez ci-dessous deux exemples comportant chacun 4 classes.

Classe principale-

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Classe de consommateurs-

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Classe de producteur-

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Classe de file d'attente-

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

OU

Classe principale-

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Classe de consommateurs-

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Classe de producteur-

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Classe de file d'attente-

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

Et allez-y !

1 votes

Producers enqueue et Consumer dequeue, en fait. Pas l'inverse

4 votes

Oh et ne démarrez pas de threads à partir d'un constructeur ! Ce thread pourrait observer l'objet dans un état incohérent. Reportez-vous à "Java Concurrency in Practice" pour plus de détails

0 votes

Merci Zwei, l'histoire de l'enqueue est due à mon manque de concentration. Pour ce qui est du démarrage du Thread à partir du constructeur, dois-je plutôt exécuter une méthode d'intiliation et le démarrer à cet endroit, ou dois-je le démarrer à partir de la classe de la méthode principale ?

82voto

cletus Points 276888

Java 5+ possède tous les outils nécessaires pour ce genre de choses. Vous le voudrez :

  1. Rassemblez tous vos producteurs en un seul ExecutorService ;
  2. Mettez tous vos consommateurs dans un autre ExecutorService ;
  3. Si nécessaire, communiquez entre les deux en utilisant un BlockingQueue .

Je dis "si nécessaire" pour (3) car d'après mon expérience, c'est une étape inutile. Tout ce que vous faites est de soumettre de nouvelles tâches au service d'exécution du consommateur. Donc :

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

Ainsi, le producers soumettre directement à consumers .

4 votes

Cletus a raison de demander plus d'informations pour aider à clarifier "par où commencer". java.sun.com/docs/books/tutorial/essential/concurrency

0 votes

"Ainsi, les producteurs se soumettent directement aux consommateurs" - Est-il sûr d'appeler consumers.submit(...) en parallèle ou dois-je synchroniser autour de cela ?

0 votes

Si vous partagez une BlockingQueue, pouvez-vous utiliser un seul exécuteur pour les producteurs et les consommateurs ?

20voto

Enno Shioji Points 12298

OK, comme d'autres l'ont fait remarquer, la meilleure chose à faire est d'utiliser java.util.concurrent paquet. Je recommande vivement "Java Concurrency in Practice". C'est un excellent livre qui couvre presque tout ce que vous devez savoir.

En ce qui concerne votre implémentation particulière, comme je l'ai noté dans les commentaires, ne démarrez pas les Threads à partir des Constructors -- cela peut être dangereux.

En laissant cela de côté, la deuxième mise en œuvre semble meilleure. Vous ne voulez pas mettre les files d'attente dans des champs statiques. Vous perdez probablement de la flexibilité pour rien.

Si vous souhaitez réaliser votre propre implémentation (à des fins d'apprentissage, je suppose ?), fournissez un fichier start() au moins. Vous devez construire l'objet (vous pouvez instancier l'objet Thread ), et ensuite appeler start() pour commencer le fil.

Editar: ExecutorService ont leur propre file d'attente, ce qui peut prêter à confusion Voici quelque chose pour vous aider à démarrer.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

Plus d'EDIT : Pour le producteur, au lieu de while(true) vous pouvez faire quelque chose comme :

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

De cette façon, vous pouvez arrêter l'exécuteur en appelant .shutdownNow() . Si vous utilisez while(true) il ne s'arrêtera pas.

Notez également que le Producer est toujours vulnérable à RuntimeExceptions (c'est-à-dire un RuntimeException arrêtera le traitement)

0 votes

Je dois donc ajouter une méthode start() au Consumer et au Producer ? Êtes-vous en train de dire que je devrais plutôt mettre quelque chose comme ceci dans ma méthode principale ? consumer = new Consumer() ; consumer.start(readQ) ; ou ceci ? consumer = new Comsumer(readQ) ; consumer.start() ;

1 votes

Vous devriez normalement faire new Comsumer(readQ) ; consumer.start() ;. Dans votre cas, il est conseillé de déclarer la queue private final, et si vous faites cela, vous devez définir la queue dans le constructeur. S'il s'agit d'un code de production, je vous conseille fortement de suivre la réponse de cletus. Si vous avez absolument besoin d'utiliser votre file d'attente interne, alors vous devriez utiliser ExecutorService executor = Executors.newSingleThreadExecutor() au lieu d'un thread brut. Cela vous permettra, entre autres, d'éviter que des RuntimeException n'arrêtent votre système.

0 votes

Merci, c'est très utile. J'ai opté pour la BlockingQueue comme l'a suggéré Cletus plutôt que pour la file d'attente interne. J'essaie encore de me faire une idée de la classe ExecutorService, mais quand j'y arriverai, je l'utiliserai certainement. Merci pour votre aide.

15voto

Ravindra babu Points 5571

J'ai étendu la réponse proposée par Cletus à un exemple de code fonctionnel.

  1. Un ExecutorService (pes) accepte Producer tâches.
  2. Un ExecutorService (ces) accepte Consumer tâches.
  3. Les deux sites Producer y Consumer actions BlockingQueue .
  4. Multiple Producer Les tâches génèrent différents chiffres.
  5. L'un des Consumer Les tâches peuvent consommer le numéro généré par Producer

Code :

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

sortie :

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

Remarque. Si vous n'avez pas besoin de plusieurs producteurs et consommateurs, gardez un seul producteur et consommateur. J'ai ajouté plusieurs Producers et Consumers pour montrer les capacités de BlockingQueue parmi plusieurs Producers et Consumers.

7voto

flybywire Points 36050

Vous réinventez la roue.

Si vous avez besoin de persistance et d'autres fonctionnalités d'entreprise, utilisez JMS (Je suggère ActiveMq ).

Si vous avez besoin de files d'attente rapides en mémoire, utilisez l'une des implémentations de l'algorithme java File d'attente .

Si vous devez prendre en charge java 1.4 ou une version antérieure, utilisez l'excellent outil de Doug Lea intitulé concurrentes paquet.

7 votes

On peut toujours vous demander d'implémenter Producer Consumer lors d'un entretien d'embauche :)

0 votes

Je trouve les utilitaires de java.util.concurrent utiles, mais j'ai du mal à les qualifier d'"excellents" alors qu'ils m'obligent encore à passer deux paramètres juste pour spécifier un délai. Cela aurait-il tué Doug de créer une classe appelée Duration ?

2voto

roottraveller Points 4532

Il s'agit d'un code très simple.

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue not empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X