99 votes

Comment interrompre une BlockingQueue qui bloque sur take ()?

J'ai une classe qui prend des objets d'un BlockingQueue et les traite en appelant take() dans une boucle continue. À un moment donné, je sais qu'aucun autre objet ne sera ajouté à la file d'attente. Comment puis-je interrompre la méthode take() pour qu'elle cesse de bloquer?

Voici la classe qui traite les objets:

 public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
 

Et voici la méthode qui utilise cette classe pour traiter des objets:

 public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
 

86voto

Chris Thornhill Points 2538

Si l'interruption du fil n'est pas une option, une autre consiste à placer un objet "marqueur" ou "commande" sur la file d'attente qui serait reconnu en tant que tel par MyObjHandler et sortirait de la boucle.

16voto

erickson Points 127945
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

Cependant, si vous faites cela, le thread peut être interrompu s'il reste des éléments dans la file d'attente, en attente de traitement. Vous pouvez envisager d'utiliser l' poll au lieu de take, ce qui permettra le thread de traitement de délai d'attente et de s'arrêter lorsqu'il a attendu pendant un certain temps sans nouvel apport.

15voto

dbw Points 3808

Très en retard, mais Espérons que cela aide les autres aussi, comme j'ai affronté le problème similaire et utilisé l' poll approche proposée par erickson ci-dessus avec quelques modifications mineures,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(Finished && queue.isEmpty())
                    return;
                if(obj!= null)
                {
                    // process obj here
                    // ...
                }
            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

Ceci a résolu les problèmes, les

  1. Signalé BlockingQueue alors qu'il sait qu'il n'a pas à attendre plus pour les éléments
  2. N'a pas interrompu entre les deux, de sorte que les blocs de traitement ne prend fin que lorsque tous les éléments dans la file d'attente sont traitées et il n'y a pas d'éléments restant à ajouter

4voto

stepancheg Points 2229

Interrompre le fil:

 thread.interrupt()
 

1voto

tomasb Points 508

Ou n'interrompez pas, c'est méchant.

     public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
 
  1. lorsque le producteur place un objet dans la file d'attente, appelle queue.notify() , s'il se termine, appelle queue.done()
  2. boucle while (! queue.isDone () ||! queue.isEmpty ())
  3. test take () valeur de retour pour null

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X