Je veux créer un ThreadPoolExecutor
tel que quand il a atteint sa taille maximale et la file d'attente est pleine, l' sumbit()
méthode bloque lorsque vous essayez d'ajouter de nouvelles tâches. Ai-je besoin pour mettre en œuvre une coutume RejectedExecutionHandler
pour qui ou est-il un moyen de faire cela à l'aide de la bibliothèque standard de java?
Réponses
Trop de publicités?L'une des solutions possibles, je viens de trouver:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
Existe-il d'autres solutions? Je préfère quelque chose de basé sur RejectedExecutionHandler
, car il semble être un moyen standard pour gérer de telles situations.
Vous pouvez utiliser ThreadPoolExecutor et un blockingQueue:
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread, 0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
Vous devez utiliser l' CallerRunsPolicy
, qui exécute les rejetés de la tâche dans le thread appelant. De cette façon, il ne peut pas présenter toutes les nouvelles missions de l'exécuteur jusqu'à ce que la tâche est accomplie, à quel point il sera libre de pool de threads ou le processus est répété.
À partir de la documentation:
Rejeté tâches
De nouvelles tâches présentée dans la méthode execute(java.lang.Runnable) sera rejeté lorsque l'Exécuteur testamentaire a été arrêtez, et aussi lorsque l'Exécuteur testamentaire utilise finis limites pour deux maximum les threads et la capacité de file d'attente de travail, et est saturé. Dans les deux cas, l' appelle la méthode execute RejectedExecutionHandler.rejectedExecution(java.lang.Praticable, java.util.de façon concomitante.ThreadPoolExecutor) la méthode de son RejectedExecutionHandler. Quatre prédéfinis gestionnaire politiques sont condition:
- Dans le défaut ThreadPoolExecutor.AbortPolicy, l' gestionnaire jette un runtime RejectedExecutionException sur le rejet.
- Dans ThreadPoolExecutor.CallerRunsPolicy, le thread qui appelle exécuter lui-même exécute la tâche. Cela fournit un moyen simple mécanisme de contrôle de rétroaction qui ralentir le rythme que les nouvelles tâches sont soumis.
- Dans ThreadPoolExecutor.DiscardPolicy, un tâche qui ne peut être exécutée est tout simplement de chute.
- Dans ThreadPoolExecutor.DiscardOldestPolicy, si l'exécuteur testamentaire n'est pas arrêté, le tâche à la tête de la file d'attente de travail est a chuté, et l'exécution est retentée (qui peut échouer encore une fois, la cause de cette être répété.)
Aussi, assurez-vous d'utiliser un délimitée de la file d'attente, comme ArrayBlockingQueue, lors de l'appel de l' ThreadPoolExecutor
constructeur. Sinon, rien ne sera rejetée.
Edit: en réponse à votre commentaire, définissez la taille de la ArrayBlockingQueue être égale à la taille maximale du pool de threads et de l'utilisation de la AbortPolicy.
Edit 2: Ok, je vois ce que vous allez obtenir. Que dire de ce: remplacer l' beforeExecute()
méthode pour vérifier qu' getActiveCount()
ne pas dépasser getMaximumPoolSize()
, et si elle le fait, le sommeil et essayer de nouveau?
Découvrez , quatre solutions pour le faire: la Création d'un NotifyingBlockingThreadPoolExecutor
Hibernate a un BlockPolicy
qui est simple et peut faire ce que vous voulez:
Voir: Executors.java
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}