56 votes

RxJS combineLatest sans attendre l'émission des observables de la source ?

J'ai deux observables sources à partir desquels je dois calculer certaines données dès qu'un observable source est émis. J'essaie d'utiliser la fonction combineAll() mais il n'émet une valeur que lorsque chacune des observables de la source émet pour la première fois.

Existe-t-il un opérateur similaire à combineAll() qui émet dès que l'une des observables de la source émet pour la première fois ? Si ce n'est pas le cas, quelle est la façon la plus claire de le faire ?

Ce que j'ai essayé :

const source1$ = service.getSomeData();
const source2$ = service.getOtherData();

combineLatest(
  source1$,
  source2$
).pipe(
  map([source1Data, source2Data] => {
    // this code only gets executed when both observables emits for the first time
    return source1Data + source2Data;
  })
)

75voto

Jeffrey Westerkamp Points 2734

Si je comprends bien, vous voulez un modèle comme le schéma suivant :

stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------

result$  => ------ 1 ------ 12 ------ 42 --------------

Si une valeur est disponible, émettez-la. Si les deux sont disponibles, émettez la combinaison des deux, une simple somme dans ce cas (12 + 30 = 42) ;

Tout d'abord, les flux d'entrée, j'en ai fait des sujets pour les besoins de cet exemple, afin que nous puissions y introduire des données manuellement :

const stream1$ = new Subject();
const stream2$ = new Subject();

Ensuite, nous allons combiner les entrées, d'abord canalisées par l'opérateur startWith. Cela permet de s'assurer que combineLatest produit une observable qui émet immédiatement - [null, null] pour être précis.

const combined$ = combineLatest(
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
);

Vous disposez maintenant d'un observable qui émet toujours des tableaux de longueur 2, contenant n'importe quelle combinaison de vos données (des chiffres dans cet exemple) et de null, comme dans le schéma suivant :

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------

combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------

Enfin, vous pouvez inspecter et map cette sortie au format souhaité : la somme de 2 nombres si les deux sont disponibles, ou la première valeur disponible :

const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

Résultat :

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------

Un problème subsiste : la première valeur émise par combined$ es [null, null] causant processedCombinations$ pour émettre null initialement. Une façon de résoudre ce problème est de chaîner un autre tuyau en utilisant la fonction skipWhile sur processedCombinations$ :

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));

Résultat :

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

Une autre méthode, meilleure à mon avis, consiste à filtrer le fichier combined$ flux avant processedCombinations$ (en fait, maintenant final$ ) est créé à partir de celui-ci :

const combinedFiltered$ = combined$.pipe(
    filter(([first, second])=> first !== null || second !== null),
);

const final$ = combinedFiltered$.pipe(
    map(([data1, data2]) => {
        if (data1 === null) return data2;
        if (data2 === null) return data1;

        return data1 + data2;
    }),
);

Un diagramme correspondant montre bien comment les valeurs non pertinentes sont éliminées le plus tôt possible dans la hiérarchie des flux :

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

Les diagrammes ci-dessus peuvent être produits avec ce code :

final$.subscribe(console.log);

stream1$.next(1);
// logs: 1

stream1$.next(12);
// logs: 12

stream2$.next(30);
// logs: 42

Importations utilisées :

import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';

47voto

Martin Points 1093

Une façon de procéder est de préfixer toutes les sources avec startWith :

combineLatest([
  source1$.pipe(startWith(?)),
  source2$.pipe(startWith(?)),
])

qui émet dès que l'une des observables de la source émet pour la première fois ?

Cela ressemble à ce que vous recherchez race(source1$, source2$) Méthode de création d'observable ou peut-être simplement merge(source1$, source2$).pipe(take(1)) . Mais cela dépend vraiment de ce que vous voulez faire.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X