94 votes

Java: ExecutorService qui bloque la soumission après une certaine taille de file d'attente

Je suis en train de coder une solution dans laquelle un seul thread produit I/O-intensif tâches qui peuvent être exécutées en parallèle. Chaque tâche considérable de données en mémoire. Donc, je veux être en mesure de limiter le nombre de tâches en attente à un moment.

Si je créer ThreadPoolExecutor comme ceci:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Puis l' executor.submit(callable) jette RejectedExecutionException lorsque la file d'attente se remplit et tous les fils sont déjà occupés.

Que puis-je faire pour l' executor.submit(callable) bloc lorsque la file d'attente est pleine et tous les threads sont occupés?

EDIT: J'ai essayé ceci:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Et il réalise un peu l'effet que je veux atteindre, mais dans une façon inélégante.

67voto

jtahlborn Points 32515

J'ai fait la même chose. L'astuce consiste à créer une BlockingQueue où la méthode offer () est vraiment un put (). (vous pouvez utiliser ce que vous voulez de base dans BlockingQueue).

 public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}
 

15voto

cvacca Points 151

Voici comment j'ai résolu ceci de mon côté:

(Remarque: cette solution bloque le thread qui soumet le Callable, empêchant ainsi la levée de RejectedExecutionException.)

 public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
 

5voto

beginner_ Points 1241

Je sais que c'est une vieille question, mais a eu un problème similaire, avec la création de nouvelles tâches a été très rapide et si il y avait trop d'un OutOfMemoryError se produire parce que les tâches n'ont pas été complété assez rapidement.

Dans mon cas Callables sont soumis et j'ai besoin de la suite donc j'ai besoin de stocker toutes les Futures retournée par executor.submit(). Ma solution a été de mettre l' Futures en BlockingQueue avec une taille maximale. Une fois que la file d'attente est pleine, plus les tâches sont générées jusqu'à ce que certains sont terminés (éléments supprimés de la file d'attente). En pseudo-code:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(compoundFuture);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();

    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future compoundFuture = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}

3voto

Petro Semeniuk Points 2674

J'ai eu le même problème et j'ai mis en œuvre que par l'utilisation d' beforeExecute/afterExecute crochets ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

Ce devrait être assez bon pour vous. Btw, l'origine de la mise en œuvre était la tâche de la taille car une tâche pourrait être plus de 100 fois supérieur à un autre et la soumission de deux immenses tâches était en train de tuer la boîte, mais l'exécution d'un grand et beaucoup de petites allait bien. Si votre I/O des tâches à forte intensité sont à peu près de la même taille, vous pouvez utiliser cette classe, sinon faites le moi savoir et je vais poster la taille en fonction de la mise en œuvre.

P. S. Vous pouvez vérifier les ThreadPoolExecutor javadoc. C'est vraiment très gentil guide de l'utilisateur de Doug Lea sur la façon dont il pourrait être facilement personnalisé.

1voto

Gareth Davis Points 16190

Je pense que c'est aussi simple que d'utiliser ArrayBlockingQueue au lieu de aa LinkedBlockingQueue .

Ignore moi ... c'est totalement faux. ThreadPoolExecutor appels Queue#offer non put qui auraient l'effet souhaité.

Vous pouvez étendre ThreadPoolExecutor et fournir une implémentation de execute(Runnable) qui appelle put à la place de offer .

Cela ne semble pas être une réponse tout à fait satisfaisante, j'en ai bien peur.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X