75 votes

takeWhile() fonctionne différemment avec flatmap

Je crée des snippets avec takeWhile pour explorer ses possibilités. Lorsqu'il est utilisé en conjonction avec flatMap, le comportement n'est pas conforme à l'attente. Veuillez trouver l'extrait de code ci-dessous.

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Sortie réelle :

Sample1
Sample2
Sample3
Sample5

Sortie attendue :

Sample1
Sample2
Sample3

La raison de cette attente est que takeWhile doit être exécuté jusqu'à ce que la condition interne devienne vraie. J'ai également ajouté des instructions d'impression dans Flatmap pour le débogage. Les flux sont renvoyés deux fois seulement, ce qui est conforme à l'attente.

Cependant, cela fonctionne très bien sans flatmap dans la chaîne.

String[] strArraySingle = {"Sample3", "Sample4", "Sample5"};
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

Sortie réelle :

Sample3

Ici, la sortie réelle correspond à la sortie attendue.

Clause de non-responsabilité : Ces extraits ne sont que des exemples de code et ne servent pas à des cas d'utilisation valables.

Mise à jour : Bug JDK-8193856 : la correction sera disponible dans le cadre du JDK 10. La modification consistera à corriger whileOps Sink::accept

@Override 
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

Mise en œuvre modifiée :

@Override
public void accept(T t) {
    if (take && (take = predicate.test(t))) {
        downstream.accept(t);
    }
}

54voto

Nicolai Points 17516

Il s'agit d'un bogue dans le JDK 9 - de numéro 8193856 :

takeWhile suppose à tort qu'une opération en amont supporte et honore l'annulation, ce qui n'est malheureusement pas le cas de flatMap .

Explication

Si le flux est ordonné, takeWhile devrait montrer le comportement attendu. Ce n'est pas tout à fait le cas dans votre code car vous utilisez forEach qui renonce à l'ordre. Si vous y tenez, ce qui est le cas dans cet exemple, vous devriez utiliser forEachOrdered à la place. C'est drôle : ça ne change rien.

Alors peut-être que le flux n'est pas ordonné en premier lieu ? (Dans ce cas le comportement est correct .) Si vous créez une variable temporaire pour le flux créé à partir de strArray et vérifier s'il est ordonné en exécutant l'expression ((StatefulOp) stream).isOrdered(); au point d'arrêt, vous constaterez qu'il est effectivement ordonné :

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

Cela signifie qu'il s'agit très probablement d'une erreur de mise en œuvre.

Into The Code

Comme d'autres l'ont soupçonné, je pense maintenant aussi que cette pourrait être connecté à flatMap être impatient. Plus précisément, les deux problèmes pourraient avoir la même cause racine.

En cherchant la source de WhileOps nous pouvons voir ces méthodes :

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

Ce code est utilisé par takeWhile pour vérifier un élément de flux donné t si le predicate est remplie :

  • Si c'est le cas, il transmet l'élément à la fonction downstream l'opération, dans ce cas System.out::println .
  • Si ce n'est pas le cas, il fixe take à false, de sorte que lorsqu'on lui demande la prochaine fois si le pipeline doit être annulé (c'est-à-dire s'il est terminé), il renvoie true .

Cela couvre les takeWhile opération. L'autre chose que vous devez savoir est que forEachOrdered conduit à l'opération terminale exécutant la méthode ReferencePipeline::forEachWithCancel :

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

Tout ce que ça fait, c'est :

  1. vérifier si le pipeline a été annulé
  2. si non, avancer l'évier d'un élément
  3. arrêter si c'était le dernier élément

Ça a l'air prometteur, non ?

Sans flatMap

Dans le "bon cas" (sans flatMap (votre deuxième exemple) forEachWithCancel opère directement sur le WhileOp comme sink et vous pouvez voir comment ça se passe :

  • ReferencePipeline::forEachWithCancel fait sa boucle :
    • WhileOps::accept est donné à chaque élément du flux
    • WhileOps::cancellationRequested est interrogé après chaque élément
  • à un moment donné "Sample4" échoue le prédicat et le flux est annulé.

Yay !

Avec flatMap

Dans le "mauvais cas" (avec flatMap (votre premier exemple), forEachWithCancel fonctionne sur le flatMap qui appelle simplement forEachRemaining sur le ArraySpliterator para {"Sample3", "Sample4", "Sample5"} qui fait cela :

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

Ignorer tout cela hi y fence qui n'est utilisé que si le traitement du tableau est divisé pour un flux parallèle, il s'agit d'une simple for qui transmet chaque élément à la boucle takeWhile fonctionnement, mais ne vérifie jamais si elle est annulée . Il va donc parcourir avec empressement tous les éléments de ce "sous-flux" avant de s'arrêter, et probablement même à travers le reste de la rivière .

17 votes

@Eugene : bien, je parie que c'est connecté à celui-ci . Il se trouve que cela fonctionnait pour les opérations de court-circuitage terminales, parce qu'elles ignorent les éléments en excès, mais maintenant nous avons des opérations de court-circuitage intermédiaires Donc c'est en fait une bonne nouvelle, car cela implique qu'il y a maintenant un peu plus de pression pour corriger ce bug (les performances médiocres ou la rupture lorsque les sous-flux sont infinis ne suffisaient apparemment pas)

10 votes

Il n'itère pas sur l'ensemble du flux. Si le dernier élément d'un sous-flux correspond au prédicat, le support d'annulation du flux externe fonctionnera, par exemple en utilisant String[][] strArray = { {"Sample1", "Sample2"}, {"Sample3", "Sample4"}, {"Sample5", "Sample6"}, }; comme entrée et cela semble fonctionner. Si seulement un élément intermédiaire correspond, flatMap L'ignorance de la part de l'utilisateur à l'égard de l'annulation entraîne l'écrasement de l'indicateur lors de l'évaluation de l'élément suivant.

0 votes

@Holger Je voulais dire que "substream" seulement (ce qui n'était pas clair dans ma formulation) et je n'ai même pas pensé à suivre "substream". J'ai modifié la formulation et établi un lien vers votre commentaire pour plus de clarté.

20voto

Eugene Points 6271

Ce site es un bogue, quelle que soit la façon dont je le regarde - et merci à Holger pour ses commentaires. Je ne voulais pas mettre cette réponse ici (sérieusement !), mais aucune des réponses n'indique clairement qu'il s'agit d'un bogue.

Les gens disent que cela a à voir avec l'ordre/le désordre, et ce n'est pas vrai car cela rapportera true 3 fois :

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));

C'est aussi très intéressant que si vous le changez en :

String[][] strArray = { 
         { "Sample1", "Sample2" }, 
         { "Sample3", "Sample5", "Sample4" }, // Sample4 is the last one here
         { "Sample7", "Sample8" } 
};

puis Sample7 y Sample8 ne feront pas partie de la sortie, sinon ils le feront. Il semble que flatmap ignore un drapeau d'annulation qui serait introduit par dropWhile .

11voto

Michael Points 20266

Si vous regardez la documentation pour takeWhile :

si ce flux est ordonné, [renvoie] un flux composé des éléments suivants préfixe le plus long des éléments pris dans ce flux qui correspondent à l'ordre donné. prédicat donné.

si ce flux n'est pas ordonné, [retourne] un flux constitué d'un sous-ensemble d'éléments pris dans ce flux qui correspondent au prédicat donné.

Votre flux est ordonné par coïncidence, mais takeWhile ne sait pas qu'il est. En tant que tel, il renvoie la 2e condition - le sous-ensemble. Votre takeWhile agit juste comme un filter .

Si vous ajoutez un appel à sorted avant takeWhile vous obtiendrez le résultat escompté :

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));

17 votes

Pourquoi n'est-il pas ordonné, ou pourquoi ne sait-il pas qu'il l'est ? La "concaténation" de flux ordonnés devrait être ordonnée, n'est-ce pas ?

0 votes

@JBNizet takeWhile utilise un drapeau qui est une propriété du flux pour déterminer si celui-ci est ordonné ou non. Le fait que le flux soit en fait ordonné alors qu'il dit ne pas l'être est vraisemblablement un détail d'implémentation auquel il ne faut pas se fier.

9 votes

@JBNizet mais alors si vous prenez chaque étape séparée Stream<String[]> s1 = Arrays.stream(strArray); System.out.println(s1.spliterator().hasCharacteristics(Split‌​erator.ORDERED)) et ainsi de suite pour chaque étape - ils produiront tous une ORDERED stream, cela ressemble à un bug qui n'est pas encore signalé

9voto

nullpointer Points 1135

La raison en est que flatMap l'opération étant également une opérations intermédiaires avec lequel (l'un des) opération intermédiaire de court-circuitage avec état takeWhile est utilisé.

Le comportement de flatMap comme l'a souligné Holger dans cette réponse est certainement une référence à ne pas manquer pour comprendre le résultat inattendu de telles opérations de court-circuitage.

Le résultat escompté peut être obtenu en séparant ces deux opérations intermédiaires en introduisant une opération terminale pour utiliser de manière déterministe un flux ordonné et en les exécutant pour un échantillon comme :

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);

De plus, il semble qu'il y ait un lien entre Bogue#JDK-8075939 pour retracer ce comportement déjà enregistré.

Modifier : Ce point peut être suivi à l'adresse suivante JDK-8193856 accepté comme un bogue.

8 votes

Je ne comprends pas votre explication. Pour moi, ce comportement ressemble à un bug. Et l'alternative que vous proposez nécessite deux pipelines Stream, ce qui pourrait être moins souhaitable.

2 votes

@Eran En effet le comportement semble être un bug. L'alternative proposée est simplement d'introduire une opération terminale pour compléter(exhaust) flatMap et ensuite traiter le flux pour exécuter takeWhile .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X