468 votes

Personnalisée pool de threads en Java 8 courant parallèle

Est-il possible de préciser la personnalisation d'un pool de threads de Java 8 courant parallèle? Je ne peux pas le trouver n'importe où.

Imaginez que j'ai un serveur d'application, et je voudrais utiliser les flux parallèles. Mais la demande est grande et multi-thread, donc, je tiens à compartimenter. Je ne veux pas d'un lent en cours d'exécution de la tâche dans un module de la applicationblock des tâches à partir d'un autre module.

Si je ne peux pas utiliser les différents pools de threads pour les différents modules, cela signifie que je ne peux pas l'utiliser en toute sécurité des flux parallèles dans la plupart des situations du monde réel.

Essayez l'exemple suivant. Il y a quelques temps CPU des tâches exécutées dans des threads séparés. Les tâches de l'effet de levier flux parallèles. La première tâche est rompu, de sorte que chaque étape prend 1 seconde (simulé par un fil de sommeil). Le problème est que les autres threads coincé et d'attendre le cassé de la tâche à terminer. C'est l'exemple artificiel, mais imaginez un servlet application et quelqu'un de la soumission d'une longue tâche en cours d'exécution à la fourche de rejoindre la piscine.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

493voto

Lukas Points 510

En fait il y a un truc comment exécuter une opération parallèle dans un fork-join de la piscine. Si vous lancez une tâche dans un fork-join de la piscine, il y reste et ne pas utiliser de la commune.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

L'astuce est basée sur ForkJoinTask.fourche qui précise: "Organise de manière asynchrone exécuter cette tâche dans la piscine de la tâche en cours est en cours d'exécution, le cas échéant, ou à l'aide de la ForkJoinPool.commonPool() si pas inForkJoinPool()"

241voto

assylias Points 102015

Le flux parallèles utiliser la valeur par défaut ForkJoinPool qui par défaut a autant de threads que vous avez processeurs, tel que renvoyé par Runtime.getRuntime().availableProcessors():

Pour les applications qui nécessitent de se séparer ou personnalisé piscines, un ForkJoinPool peut être construit avec une cible donnée parallélisme niveau; par défaut, égal au nombre de processeurs disponibles.

Cependant, vous pouvez modifier la taille de la piscine en utilisant les propriétés du système.

Exemple: mon ordinateur a 8 processeurs. Si j'exécute le programme suivant:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

La sortie est:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Donc vous pouvez voir que le courant parallèle des processus 8 éléments à la fois, c'est à dire qu'il utilise 8 threads. Cependant, si je décommentez la ligne de commentaire, la sortie est:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Cette fois, le courant parallèle a utilisé 20 threads et tous les 20 éléments dans le flux de données ont été traitées comcurrently.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X