Comme d'autres l'ont souligné: Il a quelques mises en garde. Tout d'abord, les flux ne sont pas censés être utilisés pour quelque chose comme ça.
Sur un plan plus technique, on pourrait argumenter:
- Un flux peut être infinie
- Même si vous connaissez le nombre d'éléments: Ce nombre peut être faussée par des opérations comme l'
filter
ou flatMap
- Pour un courant parallèle, le suivi de la progression va imposer un point de synchronisation
- Si il y a un terminal de fonctionnement qui est cher (comme l'agrégation dans votre cas), puis les progrès signalés pourrait même pas sensée refléter le temps de calcul
Toutefois, en gardant cela à l'esprit, une approche qui pourrait être raisonnable pour votre cas d'application est ceci:
Vous pouvez créer un Function<T,T>
qui est transmis à un map
du flux. (Au moins, je préférerais que sur l'aide d' peek
sur le flux, comme l'a suggéré dans une autre réponse). Cette fonction peut suivre l'état d'avancement, à l'aide d'un AtomicLong
pour le comptage des éléments. Afin de maintenir séparés les choses, cette progression pourrait alors être simplement transféré à un Consumer<Long>
,, qui prendra soin de la présentation
La "présentation" se réfère ici à l'impression de ce progrès de la console, normalisé ou en pourcentage, en référence à une taille qui pourrait être connu partout où le consommateur est créé. Mais le consommateur peut alors aussi prendre soin de l'impression que, par exemple, tous les 10 élément, ou seulement de les imprimer un message si au moins 5 secondes se sont écoulées depuis la précédente.
import java.util.Iterator;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamProgress
{
public static void main(String[] args)
{
int size = 250;
Stream<Integer> stream = readData(size);
LongConsumer progressConsumer = progress ->
{
// "Filter" the output here: Report only every 10th element
if (progress % 10 == 0)
{
double relative = (double) progress / (size - 1);
double percent = relative * 100;
System.out.printf(Locale.ENGLISH,
"Progress %8d, relative %2.5f, percent %3.2f\n",
progress, relative, percent);
}
};
Integer result = stream
.map(element -> process(element))
.map(progressMapper(progressConsumer))
.reduce(0, (a, b) -> a + b);
System.out.println("result " + result);
}
private static <T> Function<T, T> progressMapper(
LongConsumer progressConsumer)
{
AtomicLong counter = new AtomicLong(0);
return t ->
{
long n = counter.getAndIncrement();
progressConsumer.accept(n);
return t;
};
}
private static Integer process(Integer element)
{
return element * 2;
}
private static Stream<Integer> readData(int size)
{
Iterator<Integer> iterator = new Iterator<Integer>()
{
int n = 0;
@Override
public Integer next()
{
try
{
Thread.sleep(10);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return n++;
}
@Override
public boolean hasNext()
{
return n < size;
}
};
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
iterator, Spliterator.ORDERED), false);
}
}