Si je comprends bien, vous voulez un modèle comme le schéma suivant :
stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------
result$ => ------ 1 ------ 12 ------ 42 --------------
Si une valeur est disponible, émettez-la. Si les deux sont disponibles, émettez la combinaison des deux, une simple somme dans ce cas (12 + 30 = 42) ;
Tout d'abord, les flux d'entrée, j'en ai fait des sujets pour les besoins de cet exemple, afin que nous puissions y introduire des données manuellement :
const stream1$ = new Subject();
const stream2$ = new Subject();
Ensuite, nous allons combiner les entrées, d'abord canalisées par l'opérateur startWith. Cela permet de s'assurer que combineLatest produit une observable qui émet immédiatement - [null, null]
pour être précis.
const combined$ = combineLatest(
stream1$.pipe(startWith(null)),
stream2$.pipe(startWith(null)),
);
Vous disposez maintenant d'un observable qui émet toujours des tableaux de longueur 2, contenant n'importe quelle combinaison de vos données (des chiffres dans cet exemple) et de null, comme dans le schéma suivant :
stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------
combined$ [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
Enfin, vous pouvez inspecter et map
cette sortie au format souhaité : la somme de 2 nombres si les deux sont disponibles, ou la première valeur disponible :
const processedCombinations$ = combined$.pipe(
map(([data1, data2]) => {
if (data1 === null) return data2;
if (data2 === null) return data1;
return data1 + data2;
}),
);
Résultat :
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
Un problème subsiste : la première valeur émise par combined$
es [null, null]
causant processedCombinations$
pour émettre null
initialement. Une façon de résoudre ce problème est de chaîner un autre tuyau en utilisant la fonction skipWhile
sur processedCombinations$
:
const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));
Résultat :
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
Une autre méthode, meilleure à mon avis, consiste à filtrer le fichier combined$
flux avant processedCombinations$
(en fait, maintenant final$
) est créé à partir de celui-ci :
const combinedFiltered$ = combined$.pipe(
filter(([first, second])=> first !== null || second !== null),
);
const final$ = combinedFiltered$.pipe(
map(([data1, data2]) => {
if (data1 === null) return data2;
if (data2 === null) return data1;
return data1 + data2;
}),
);
Un diagramme correspondant montre bien comment les valeurs non pertinentes sont éliminées le plus tôt possible dans la hiérarchie des flux :
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$ => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
Les diagrammes ci-dessus peuvent être produits avec ce code :
final$.subscribe(console.log);
stream1$.next(1);
// logs: 1
stream1$.next(12);
// logs: 12
stream2$.next(30);
// logs: 42
Importations utilisées :
import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';