115 votes

Comment faire pour que le ThreadPoolExecutor augmente le nombre de threads au maximum avant la mise en file d'attente ?

Cela fait un certain temps que je suis frustré par le comportement par défaut de l'application ThreadPoolExecutor qui soutient le ExecutorService Les pools de threads que nous sommes nombreux à utiliser. Pour citer les Javadocs :

Si le nombre de threads en cours d'exécution est supérieur à corePoolSize mais inférieur à maximumPoolSize, un nouveau thread sera créé. seulement si la file d'attente est pleine .

Ce que cela signifie, c'est que si vous définissez un pool de threads avec le code suivant, il va jamais commencer le 2ème fil parce que le LinkedBlockingQueue est sans limite.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Seulement si vous avez un borné et la file d'attente est complet Est-ce qu'il y a des fils de discussion au-dessus du numéro de noyau qui sont commencés. Je soupçonne qu'un grand nombre de programmeurs Java multithreads débutants ne sont pas conscients de ce comportement de la fonction ThreadPoolExecutor .

J'ai maintenant un cas d'utilisation spécifique où ce n'est pas optimal. Je cherche des moyens, sans écrire ma propre classe TPE, de contourner ce problème.

Mes exigences concernent un service Web qui fait des rappels à une tierce partie qui n'est peut-être pas fiable.

  • Je ne veux pas faire le rappel de manière synchrone avec la requête web, donc je veux utiliser un pool de threads.
  • J'en reçois généralement deux par minute, donc je ne veux pas avoir un newFixedThreadPool(...) avec un grand nombre de fils qui sont pour la plupart dormants.
  • De temps en temps, j'ai une explosion de ce trafic et je veux augmenter le nombre de fils à une valeur maximale (disons 50).
  • Je dois faire un meilleur essaie de faire tous les callbacks, donc je veux mettre en file d'attente tous ceux qui sont au-dessus de 50. Je ne veux pas surcharger le reste de mon serveur web en utilisant un serveur newCachedThreadPool() .

Comment puis-je contourner cette limitation dans ThreadPoolExecutor où la file d'attente doit être limitée et complète antes de d'autres fils seront lancés ? Comment faire pour qu'il y ait plus de fils de discussion ? antes de des tâches en file d'attente ?

Editar:

@Flavio soulève un bon point concernant l'utilisation de l'option ThreadPoolExecutor.allowCoreThreadTimeOut(true) pour que les threads principaux se terminent et sortent. J'ai envisagé cette solution, mais je voulais toujours la fonction core-threads. Je ne voulais pas que le nombre de threads dans le pool tombe en dessous de la taille du noyau si possible.

1 votes

Étant donné que votre exemple crée un maximum de 10 threads, y a-t-il une réelle économie à utiliser quelque chose qui croît/rétrécit par rapport à un pool de threads de taille fixe ?

0 votes

Bon point @bstempi. Le nombre était quelque peu arbitraire. Je l'ai augmenté dans la question à 50. Je ne sais pas exactement combien de threads simultanés je veux réellement faire fonctionner maintenant que j'ai cette solution.

1 votes

Oh bon sang ! 10 upvotes si je pouvais ici, exactement la même position dans laquelle je suis.

60voto

Gray Points 58585

Comment puis-je contourner cette limitation dans ThreadPoolExecutor où la file d'attente doit être délimitée et pleine avant que d'autres threads ne soient lancés.

Je crois que j'ai finalement trouvé une solution assez élégante (peut-être un peu bricolée) à cette limitation avec ThreadPoolExecutor . Il s'agit d'étendre LinkedBlockingQueue pour qu'il revienne false pour queue.offer(...) alors qu'il y a déjà des tâches en attente. Si les threads actuels ne parviennent pas à suivre les tâches mises en file d'attente, la TPE ajoute des threads supplémentaires. Si le pool a déjà atteint le nombre maximum de threads, alors la TPE va ajouter des threads supplémentaires. RejectedExecutionHandler sera appelé. C'est le gestionnaire qui fait ensuite le put(...) dans la file d'attente.

Il est certainement étrange d'écrire une file d'attente où offer(...) peut retourner false y put() ne bloque jamais, donc c'est la partie pirate. Mais cela fonctionne bien avec l'utilisation de la file d'attente par TPE donc je ne vois pas de problème à le faire.

Voici le code :

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Avec ce mécanisme, lorsque je soumets des tâches à la file d'attente, la ThreadPoolExecutor volonté :

  1. Mettez à l'échelle le nombre de threads jusqu'à la taille du noyau initialement (ici 1).
  2. Proposez-le à la file d'attente. Si la file d'attente est vide, il sera mis en attente pour être traité par les threads existants.
  3. Si la file d'attente a déjà 1 ou plusieurs éléments, la fonction offer(...) retournera faux.
  4. Si false est retourné, augmenter le nombre de threads dans le pool jusqu'à ce qu'ils atteignent le nombre maximum (ici 50).
  5. Si elle est au maximum, elle appelle la RejectedExecutionHandler
  6. El RejectedExecutionHandler puis place la tâche dans la file d'attente pour être traitée par le premier thread disponible dans l'ordre FIFO.

Bien que dans mon exemple de code ci-dessus, la file d'attente soit non bornée, vous pouvez également la définir comme une file d'attente bornée. Par exemple, si vous ajoutez une capacité de 1000 à la file d'attente LinkedBlockingQueue alors elle le fera :

  1. mettre à l'échelle les fils au maximum
  2. puis faire la queue jusqu'à ce qu'elle soit pleine avec 1000 tâches
  3. puis bloquer l'appelant jusqu'à ce qu'un espace se libère dans la file d'attente.

De plus, si vous aviez besoin d'utiliser offer(...) dans le RejectedExecutionHandler alors vous pourriez utiliser le offer(E, long, TimeUnit) à la place de la méthode Long.MAX_VALUE comme délai d'attente.

Attention :

Si vous vous attendez à ce que des tâches soient ajoutées à l'exécuteur après il a été arrêté, alors vous pouvez être plus intelligent en jetant RejectedExecutionException hors de notre coutume RejectedExecutionHandler lorsque le service d'exécution a été arrêté. Merci à @RaduToader de l'avoir signalé.

Editar:

Une autre modification de cette réponse pourrait être de demander à la TPE s'il y a des threads inactifs et de ne mettre l'élément en file d'attente que si c'est le cas. Il faudrait créer une vraie classe pour cela et ajouter ourQueue.setThreadPoolExecutor(tpe); sur celui-ci.

Alors votre offer(...) pourrait ressembler à quelque chose comme :

  1. Vérifiez si le tpe.getPoolSize() == tpe.getMaximumPoolSize() dans ce cas, il suffit d'appeler super.offer(...) .
  2. Sinon, si tpe.getPoolSize() > tpe.getActiveCount() puis appeler super.offer(...) puisqu'il semble y avoir des fils vides.
  3. Sinon, retournez false de bifurquer vers un autre fil.

Peut-être ça :

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Notez que les méthodes get sur TPE sont coûteuses puisqu'elles accèdent à volatile ou (dans le cas de getActiveCount() ) verrouillent le TPE et parcourent la liste des fils. De plus, il existe ici des conditions de course qui peuvent entraîner la mise en file d'attente d'une tâche de manière inappropriée ou la bifurcation d'un autre thread alors qu'il y avait un thread inactif.

1 votes

J'ai également rencontré le même problème, et j'ai fini par remplacer la méthode execute. Mais c'est vraiment une bonne solution :)

0 votes

Même si je n'aime pas l'idée de rompre le contrat de Queue pour y parvenir, vous n'êtes certainement pas seul dans votre idée : groovy-programming.com/post/26923146865

0 votes

Intéressant. POUR INFO. En fait, je viens d'améliorer ma méthode @bstempi en fonction des commentaires de Ralf.

36voto

Flavio Points 5369

Définissez la taille du noyau et la taille maximale à la même valeur, et permettez aux threads du noyau d'être retirés du pool avec allowCoreThreadTimeOut(true) .

0 votes

+1 Oui, j'ai pensé à ça mais je voulais quand même avoir la fonctionnalité core-threads. Je ne voulais pas que le pool de threads passe à 0 thread pendant les périodes de dormance. Je vais modifier ma question pour le signaler. Mais excellent point.

0 votes

Merci ! C'est juste la façon la plus simple de le faire.

35voto

J'ai déjà reçu deux autres réponses à cette question, mais je pense que celle-ci est la meilleure.

C'est basé sur la technique de la réponse actuellement acceptée à savoir :

  1. Remplacez les paramètres de la file d'attente offer() qui renvoie (parfois) un faux,
  2. ce qui provoque le ThreadPoolExecutor pour lancer un nouveau fil ou rejeter la tâche, et
  3. fixer le RejectedExecutionHandler à en fait mettre la tâche en file d'attente en cas de rejet.

Le problème est que lorsque offer() devrait retourner false. La réponse actuellement acceptée renvoie false lorsque la file d'attente contient quelques tâches, mais comme je l'ai souligné dans mon commentaire, cela provoque des effets indésirables. Alternativement, si vous retournez toujours false, vous continuerez à créer de nouveaux threads même si vous avez des threads en attente dans la file d'attente.

La solution consiste à utiliser Java 7 LinkedTransferQueue et ont offer() appelez tryTransfer() . S'il y a un thread consommateur en attente, la tâche sera simplement transmise à ce thread. Dans le cas contraire, offer() retournera false et le ThreadPoolExecutor créera un nouveau fil de discussion.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

0 votes

Je suis d'accord, cette solution me semble la plus propre. Le seul inconvénient de cette solution est que LinkedTransferQueueue n'est pas limité, ce qui ne permet pas d'obtenir une file d'attente de tâches à capacité limitée sans travail supplémentaire.

0 votes

Il y a un problème lorsque le pool atteint la taille maximale. Disons que le pool atteint sa taille maximale et que tous les threads sont en train d'exécuter une tâche. Lorsque runnable soumet cette offre, impl renvoie false et ThreadPoolExecutor tente d'ajouter un thread worker, mais le pool a déjà atteint sa taille maximale et runnable est simplement rejeté. Selon le rejectedExceHandler que vous avez écrit, il sera à nouveau proposé dans la file d'attente, ce qui aura pour conséquence que cette danse de singe se reproduira depuis le début.

1 votes

@Sudheera Je crois que vous vous trompez. queue.offer() car il appelle en fait LinkedTransferQueue.tryTransfer() retournera false et ne mettra pas la tâche en file d'attente. Cependant, la RejectedExecutionHandler appelle queue.put() qui n'échoue pas et met la tâche en file d'attente.

7voto

Note : Je préfère et recommande maintenant mon autre réponse .

Voici une version qui me semble beaucoup plus simple : Augmenter le corePoolSize (jusqu'à la limite du maximumPoolSize) chaque fois qu'une nouvelle tâche est exécutée, puis diminuer le corePoolSize (jusqu'à la limite de la "taille du core pool" spécifiée par l'utilisateur) chaque fois qu'une tâche est terminée.

En d'autres termes, gardez une trace du nombre de tâches en cours d'exécution ou en file d'attente, et assurez-vous que le corePoolSize est égal au nombre de tâches tant qu'il se situe entre la "taille du pool principal" spécifiée par l'utilisateur et le maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Telle qu'elle est écrite, la classe ne permet pas de modifier la taille du pool de base ou la taille maximale du pool spécifiées par l'utilisateur après la construction, et ne permet pas de manipuler la file d'attente directement ou par l'intermédiaire de la fonction remove() o purge() .

0 votes

Je l'aime bien, sauf pour le synchronized blocs. Pouvez-vous appeler la file d'attente pour obtenir le nombre de tâches. Ou peut-être utiliser un AtomicInteger ?

0 votes

Je voulais les éviter, mais le problème est le suivant. S'il y a un certain nombre de execute() dans des threads séparés, chacun va (1) déterminer combien de threads sont nécessaires, (2) setCorePoolSize à ce numéro, et (3) appeler super.execute() . Si les étapes (1) et (2) ne sont pas synchronisées, je ne sais pas comment empêcher un ordre malheureux où vous définissez la taille du pool de base à un nombre inférieur après un nombre supérieur. Avec un accès direct au champ de la superclasse, cela pourrait être fait en utilisant la fonction compare-and-set, mais je ne vois pas de moyen propre de le faire dans une sous-classe sans synchronisation.

0 votes

Je pense que les pénalités pour cette condition de course sont relativement faibles tant que la taskCount est valide (c'est-à-dire qu'un AtomicInteger ). Si deux threads recalculent la taille du pool immédiatement l'un après l'autre, ils devraient obtenir des valeurs correctes. Si le second rétrécit les threads de base, il doit avoir vu une baisse dans la file d'attente ou quelque chose comme ça.

6voto

Ralf H Points 853

Nous avons une sous-classe de ThreadPoolExecutor qui prend un creationThreshold et remplace execute .

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

peut-être que ça aide aussi, mais le tien a l'air plus artistique bien sûr

0 votes

Intéressant. Merci pour cette information. En fait, je ne savais pas que la taille du noyau était mutable.

0 votes

Maintenant que j'y pense un peu plus, cette solution est meilleure que la mienne en ce qui concerne la vérification de la taille de la file d'attente. J'ai modifié ma réponse pour que l'élément offer(...) ne renvoie que des false sous condition. Nous vous remercions.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X