Il s'agit d'un bogue dans le JDK 9 - de numéro 8193856 :
takeWhile
suppose à tort qu'une opération en amont supporte et honore l'annulation, ce qui n'est malheureusement pas le cas de flatMap
.
Explication
Si le flux est ordonné, takeWhile
devrait montrer le comportement attendu. Ce n'est pas tout à fait le cas dans votre code car vous utilisez forEach
qui renonce à l'ordre. Si vous y tenez, ce qui est le cas dans cet exemple, vous devriez utiliser forEachOrdered
à la place. C'est drôle : ça ne change rien.
Alors peut-être que le flux n'est pas ordonné en premier lieu ? (Dans ce cas le comportement est correct .) Si vous créez une variable temporaire pour le flux créé à partir de strArray
et vérifier s'il est ordonné en exécutant l'expression ((StatefulOp) stream).isOrdered();
au point d'arrêt, vous constaterez qu'il est effectivement ordonné :
String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
Stream<String> stream = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
// breakpoint here
System.out.println(stream);
Cela signifie qu'il s'agit très probablement d'une erreur de mise en œuvre.
Into The Code
Comme d'autres l'ont soupçonné, je pense maintenant aussi que cette pourrait être connecté à flatMap
être impatient. Plus précisément, les deux problèmes pourraient avoir la même cause racine.
En cherchant la source de WhileOps
nous pouvons voir ces méthodes :
@Override
public void accept(T t) {
if (take = predicate.test(t)) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
Ce code est utilisé par takeWhile
pour vérifier un élément de flux donné t
si le predicate
est remplie :
- Si c'est le cas, il transmet l'élément à la fonction
downstream
l'opération, dans ce cas System.out::println
.
- Si ce n'est pas le cas, il fixe
take
à false, de sorte que lorsqu'on lui demande la prochaine fois si le pipeline doit être annulé (c'est-à-dire s'il est terminé), il renvoie true
.
Cela couvre les takeWhile
opération. L'autre chose que vous devez savoir est que forEachOrdered
conduit à l'opération terminale exécutant la méthode ReferencePipeline::forEachWithCancel
:
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
boolean cancelled;
do { } while (
!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
return cancelled;
}
Tout ce que ça fait, c'est :
- vérifier si le pipeline a été annulé
- si non, avancer l'évier d'un élément
- arrêter si c'était le dernier élément
Ça a l'air prometteur, non ?
Sans flatMap
Dans le "bon cas" (sans flatMap
(votre deuxième exemple) forEachWithCancel
opère directement sur le WhileOp
comme sink
et vous pouvez voir comment ça se passe :
-
ReferencePipeline::forEachWithCancel
fait sa boucle :
-
WhileOps::accept
est donné à chaque élément du flux
-
WhileOps::cancellationRequested
est interrogé après chaque élément
- à un moment donné
"Sample4"
échoue le prédicat et le flux est annulé.
Yay !
Avec flatMap
Dans le "mauvais cas" (avec flatMap
(votre premier exemple), forEachWithCancel
fonctionne sur le flatMap
qui appelle simplement forEachRemaining
sur le ArraySpliterator
para {"Sample3", "Sample4", "Sample5"}
qui fait cela :
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
Ignorer tout cela hi
y fence
qui n'est utilisé que si le traitement du tableau est divisé pour un flux parallèle, il s'agit d'une simple for
qui transmet chaque élément à la boucle takeWhile
fonctionnement, mais ne vérifie jamais si elle est annulée . Il va donc parcourir avec empressement tous les éléments de ce "sous-flux" avant de s'arrêter, et probablement même à travers le reste de la rivière .