Comment puis-je contourner cette limitation dans ThreadPoolExecutor
où la file d'attente doit être délimitée et pleine avant que d'autres threads ne soient lancés.
Je crois que j'ai finalement trouvé une solution assez élégante (peut-être un peu bricolée) à cette limitation avec ThreadPoolExecutor
. Il s'agit d'étendre LinkedBlockingQueue
pour qu'il revienne false
pour queue.offer(...)
alors qu'il y a déjà des tâches en attente. Si les threads actuels ne parviennent pas à suivre les tâches mises en file d'attente, la TPE ajoute des threads supplémentaires. Si le pool a déjà atteint le nombre maximum de threads, alors la TPE va ajouter des threads supplémentaires. RejectedExecutionHandler
sera appelé. C'est le gestionnaire qui fait ensuite le put(...)
dans la file d'attente.
Il est certainement étrange d'écrire une file d'attente où offer(...)
peut retourner false
y put()
ne bloque jamais, donc c'est la partie pirate. Mais cela fonctionne bien avec l'utilisation de la file d'attente par TPE donc je ne vois pas de problème à le faire.
Voici le code :
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
Avec ce mécanisme, lorsque je soumets des tâches à la file d'attente, la ThreadPoolExecutor
volonté :
- Mettez à l'échelle le nombre de threads jusqu'à la taille du noyau initialement (ici 1).
- Proposez-le à la file d'attente. Si la file d'attente est vide, il sera mis en attente pour être traité par les threads existants.
- Si la file d'attente a déjà 1 ou plusieurs éléments, la fonction
offer(...)
retournera faux.
- Si false est retourné, augmenter le nombre de threads dans le pool jusqu'à ce qu'ils atteignent le nombre maximum (ici 50).
- Si elle est au maximum, elle appelle la
RejectedExecutionHandler
- El
RejectedExecutionHandler
puis place la tâche dans la file d'attente pour être traitée par le premier thread disponible dans l'ordre FIFO.
Bien que dans mon exemple de code ci-dessus, la file d'attente soit non bornée, vous pouvez également la définir comme une file d'attente bornée. Par exemple, si vous ajoutez une capacité de 1000 à la file d'attente LinkedBlockingQueue
alors elle le fera :
- mettre à l'échelle les fils au maximum
- puis faire la queue jusqu'à ce qu'elle soit pleine avec 1000 tâches
- puis bloquer l'appelant jusqu'à ce qu'un espace se libère dans la file d'attente.
De plus, si vous aviez besoin d'utiliser offer(...)
dans le RejectedExecutionHandler
alors vous pourriez utiliser le offer(E, long, TimeUnit)
à la place de la méthode Long.MAX_VALUE
comme délai d'attente.
Attention :
Si vous vous attendez à ce que des tâches soient ajoutées à l'exécuteur après il a été arrêté, alors vous pouvez être plus intelligent en jetant RejectedExecutionException
hors de notre coutume RejectedExecutionHandler
lorsque le service d'exécution a été arrêté. Merci à @RaduToader de l'avoir signalé.
Editar:
Une autre modification de cette réponse pourrait être de demander à la TPE s'il y a des threads inactifs et de ne mettre l'élément en file d'attente que si c'est le cas. Il faudrait créer une vraie classe pour cela et ajouter ourQueue.setThreadPoolExecutor(tpe);
sur celui-ci.
Alors votre offer(...)
pourrait ressembler à quelque chose comme :
- Vérifiez si le
tpe.getPoolSize() == tpe.getMaximumPoolSize()
dans ce cas, il suffit d'appeler super.offer(...)
.
- Sinon, si
tpe.getPoolSize() > tpe.getActiveCount()
puis appeler super.offer(...)
puisqu'il semble y avoir des fils vides.
- Sinon, retournez
false
de bifurquer vers un autre fil.
Peut-être ça :
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
Notez que les méthodes get sur TPE sont coûteuses puisqu'elles accèdent à volatile
ou (dans le cas de getActiveCount()
) verrouillent le TPE et parcourent la liste des fils. De plus, il existe ici des conditions de course qui peuvent entraîner la mise en file d'attente d'une tâche de manière inappropriée ou la bifurcation d'un autre thread alors qu'il y avait un thread inactif.
1 votes
Étant donné que votre exemple crée un maximum de 10 threads, y a-t-il une réelle économie à utiliser quelque chose qui croît/rétrécit par rapport à un pool de threads de taille fixe ?
0 votes
Bon point @bstempi. Le nombre était quelque peu arbitraire. Je l'ai augmenté dans la question à 50. Je ne sais pas exactement combien de threads simultanés je veux réellement faire fonctionner maintenant que j'ai cette solution.
1 votes
Oh bon sang ! 10 upvotes si je pouvais ici, exactement la même position dans laquelle je suis.