57 votes

Concat VS Opérateur de fusion

J'ai vérifié la documentation de RXJava et j'ai remarqué que les opérateurs concat et merge semblent faire la même chose. J'ai écrit quelques tests pour en être sûr.

@Test
public void testContact() {

    Observable.concat(Observable.just("Bonjour"),
                      Observable.just("réactif"),
                      Observable.just("monde"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Bonjour"),
                      Observable.just("réactif"),
                      Observable.just("monde"))
            .subscribe(System.out::println);
}

La documentation dit

L'opérateur Merge est également similaire. Il combine les émissions de deux ou plusieurs Observables, mais peut les entrelacer, tandis que Concat n'entrelace jamais les émissions de plusieurs Observables.

Mais je ne comprends toujours pas entièrement, en exécutant ce test des milliers de fois, le résultat de merge est toujours le même. Comme l'ordre n'est pas garanti, je m'attendais parfois à voir "réactif" "monde" "bonjour", par exemple.

Le code est disponible ici https://github.com/politrons/reactive

149voto

Artur Biesiadowski Points 2534

C'est comme décrit dans la documentation que vous avez citée - merge peut entrelacer les sorties, tandis que concat attendra d'abord que les flux précédents se terminent avant de traiter les flux ultérieurs. Dans votre cas, avec des flux statiques à élément unique, cela ne fait aucune réelle différence (mais en théorie, merge pourrait afficher des mots dans un ordre aléatoire et rester tout de même conforme à la spécification). Si vous voulez voir la différence, essayez ce qui suit (vous devrez ajouter quelques délais par la suite pour éviter une sortie prématurée)

    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

versus

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat ne commencera jamais à afficher B, car le flux A ne se termine jamais.

s/flux/observable/g ;)

La documentation fournit de jolis graphiques pour montrer la différence. Vous devez vous rappeler que merge ne garantit pas l'intercalation des éléments un par un, c'est juste l'un des exemples possibles.

Concat

Opérateur Concat Merge

Opérateur Merge

37 votes

Une petite note est que si les sources sont synchrones alors merge = concat

0 votes

Bonne remarque!, cela explique pourquoi mon exemple va toujours dans le même ordre. C'était la partie qui me troublait. Avec l'exemple de Dave, puisque c'est interval(async) c'est reproductible

1 votes

@ArturBiesiadowski Vous acceptez d'ajouter zip dans votre réponse aussi?

8voto

Amit Shekhar Points 2184

Concat

Concat émet les émissions de deux observables ou plus sans les entrelacer. Il maintiendra l'ordre des observables tout en émettant les éléments. Cela signifie qu'il émettra tous les éléments du premier observable, puis il émettra tous les éléments du deuxième observable, et ainsi de suite.

Opérateur Concat

Comprenons cela clairement par un exemple.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable observableFirst = Observable.fromArray(listFirst);
final Observable observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Comme nous utilisons l'opérateur Concat, il maintiendra l'ordre et émettra les valeurs comme A1, A2, A3, A4, B1, B2, B3.

Merge

Merge combine plusieurs observables en un seul en fusionnant leurs émissions. Il ne maintiendra pas l'ordre tout en émettant les éléments.

Opérateur Merge

Comprenons cela clairement par un exemple.

final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable observableFirst = Observable.fromArray(listFirst);
final Observable observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Comme nous utilisons l'opérateur Merge, il ne maintiendra pas l'ordre et peut émettre les valeurs dans n'importe quel ordre tel que A1, B1, A2, A3, B2, B3, A4 ou A1, A2, B1, B2, A3, A4, B3 ou autre chose.

Voilà comment nous devrions utiliser les opérateurs Concat et Merge en RxJava en fonction de notre cas d'utilisation.

2voto

Thracian Points 1602

Si les deux sources sont synchrones comme l'a indiqué OP, elles retournent exactement dans le même ordre avec merge et concat.

Mais lorsque les sources ne sont pas synchrones, par exemple deux sources telles qu'une base de données et un socket qui envoient des données avec des délais aléatoires ou indéterminés, le résultat ou l'ordre des données est différent pour merge et concat.

fun  Observable.getAsyncItems(): Observable {

    return concatMap {

        val delay = Random().nextInt(10)

        Observable.just(it)
                .delay(delay.toLong(), TimeUnit.MILLISECONDS)
    }
}

Cette fonction d'extension ci-dessus ajoute un délai à chaque émission de manière aléatoire, mais elles sont toujours émises dans le même ordre. Si les données sont A, B, C, D et E, alors la sortie est toujours A, B, C, D et E.

Comparons maintenant le comportement de merge et de concat lorsque les sources sont asynchrones.

val source1 =
        Observable.just("A", "B", "C", "D", "E").getAsyncItems()
val source2 =
        Observable.just(1, 2, 3, 4, 5).getAsyncItems()

Observable.merge(source1, source2).subscribe {print("$it-")}

Affiche quelque chose comme ceci A-B-1-C-2-D-3-4-E-5- ou A-1-2-3-B-C-D-4-E-5- cela dépend du délai aléatoire.

Observable.concat(source1, source2).subscribe {print("$it-")}

Affiche A-B-C-D-E-1-2-3-4-5- peu importe combien de fois il s'exécute.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X