32 votes

Afficher la progression du traitement des flux Java 8

J'ai un Stream le traitement des quelques des millions d'éléments. La Carte-Réduire l'algorithme derrière elle prend quelques millisecondes, donc l'achèvement de la tâche dure environ vingt minutes.

Stream<MyData> myStream = readData();
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> System.out.println("Hi, I processed another item"))
    .reduce(MyStat::aggregate);

Je voudrais un moyen d'afficher des progrès d'ensemble, au lieu d'imprimer une ligne par élément (qui se traduit par des milliers de lignes par seconde, prend du temps et ne pas fournir toutes les informations utiles concernant les progrès d'ensemble). Je voudrais afficher quelque chose de similaire à:

 5% (08s)
10% (14s)
15% (20s)
...

Quelle serait la meilleure (et/ou plus simple) pour faire cela?

19voto

Yassin Hajaj Points 4659

Tout d'abord, les flux ne sont pas destinés à réaliser ce type de tâches (contrairement à une structure de données classique). Si vous savez déjà combien d'éléments votre flux traitera, vous pouvez opter pour l'option suivante, qui, je le répète, n'est pas l'objectif des flux.

 Stream<MyData> myStream = readData();
final AtomicInteger loader = new AtomicInteger();
int fivePercent = elementsCount / 20;
MyResult result = myStream
    .map(row -> process(row))
    .peek(stat -> {
        if (loader.incrementAndGet() % fivePercent == 0) {
            System.out.println(loader.get() + " elements on " + elementsCount + " treated");
            System.out.println((5*(loader.get() / fivePercent)) + "%");
        }
    })
    .reduce(MyStat::aggregate);
 

9voto

Marco13 Points 14743

Comme d'autres l'ont souligné: Il a quelques mises en garde. Tout d'abord, les flux ne sont pas censés être utilisés pour quelque chose comme ça.

Sur un plan plus technique, on pourrait argumenter:

  • Un flux peut être infinie
  • Même si vous connaissez le nombre d'éléments: Ce nombre peut être faussée par des opérations comme l' filter ou flatMap
  • Pour un courant parallèle, le suivi de la progression va imposer un point de synchronisation
  • Si il y a un terminal de fonctionnement qui est cher (comme l'agrégation dans votre cas), puis les progrès signalés pourrait même pas sensée refléter le temps de calcul

Toutefois, en gardant cela à l'esprit, une approche qui pourrait être raisonnable pour votre cas d'application est ceci:

Vous pouvez créer un Function<T,T> qui est transmis à un map du flux. (Au moins, je préférerais que sur l'aide d' peek sur le flux, comme l'a suggéré dans une autre réponse). Cette fonction peut suivre l'état d'avancement, à l'aide d'un AtomicLong pour le comptage des éléments. Afin de maintenir séparés les choses, cette progression pourrait alors être simplement transféré à un Consumer<Long>,, qui prendra soin de la présentation

La "présentation" se réfère ici à l'impression de ce progrès de la console, normalisé ou en pourcentage, en référence à une taille qui pourrait être connu partout où le consommateur est créé. Mais le consommateur peut alors aussi prendre soin de l'impression que, par exemple, tous les 10 élément, ou seulement de les imprimer un message si au moins 5 secondes se sont écoulées depuis la précédente.

import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamProgress
{
    public static void main(String[] args)
    {
        int size = 250;
        Stream<Integer> stream = readData(size);

        LongConsumer progressConsumer = progress -> 
        {
            // "Filter" the output here: Report only every 10th element
            if (progress % 10 == 0)
            {
                double relative = (double) progress / (size - 1);
                double percent = relative * 100;
                System.out.printf(Locale.ENGLISH,
                    "Progress %8d, relative %2.5f, percent %3.2f\n",
                    progress, relative, percent);
            }
        };

        Integer result = stream
            .map(element -> process(element))
            .map(progressMapper(progressConsumer))
            .reduce(0, (a, b) -> a + b);

        System.out.println("result " + result);
    }

    private static <T> Function<T, T> progressMapper(
        LongConsumer progressConsumer)
    {
        AtomicLong counter = new AtomicLong(0);
        return t -> 
        {
            long n = counter.getAndIncrement();
            progressConsumer.accept(n);
            return t;
        };

    }

    private static Integer process(Integer element)
    {
        return element * 2;
    }

    private static Stream<Integer> readData(int size)
    {
        Iterator<Integer> iterator = new Iterator<Integer>()
        {
            int n = 0;
            @Override
            public Integer next()
            {
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                return n++;
            }

            @Override
            public boolean hasNext()
            {
                return n < size;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED), false);
    }
}

1voto

NiVeR Points 6876

La possibilité de faire cela dépend fortement du type d' source vous avez dans l' stream. Si vous avez une collection et vous voulez appliquer des opérations sur elle, vous pouvez le faire parce que vous savez quelle est la taille de la collection et vous pouvez tenir un compte de la transformation des éléments. Mais il y a une mise en garde aussi dans ce cas. Si vous allez faire des calculs parallèles dans le cours d'eau, cela devient plus difficile.

Dans le cas où vous sont données en continu à partir de l'extérieur de l'application, il est très difficile que vous pouvez modéliser les progrès que vous ne savez pas quand le courant sera fin.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X