tl;dr; mergeMap
est beaucoup plus puissant que map
. La compréhension mergeMap
est la condition nécessaire pour accéder à la pleine puissance de Rx.
les similitudes
les deux mergeMap
et map
agit sur un seul flux (contre zip
, combineLatest
)
les deux mergeMap
et map
peut transformer les éléments d'un flux (contre filter
, delay
)
les différences
carte
impossible de modifier la taille de la source de diffusion (hypothèse: map
ne throw
); pour chaque élément de la source exactement un mapped
élément est émise; map
ne peut pas ignorer les éléments (comme par exemple filter
);
en cas de défaut planificateur de la transformation se produit de manière synchrone; être 100% clair: les cours d'eau de source peut livrer ses éléments de façon asynchrone, mais chaque élément suivant immédiatement mapped
et ré-émise plus loin; map
ne peut pas déplacer des éléments dans le temps comme par exemple delay
pas de restrictions sur les valeurs de retour
id
: x => x
mergeMap
pouvez changer la taille de la source du ruisseau; pour chaque élément, il peut y avoir nombre arbitraire (0, 1 ou plusieurs) de nouveaux éléments créés/émis
il offre un contrôle de l'asynchronicité - à la fois lorsque de nouveaux éléments sont créés/émis et le nombre d'éléments à partir de la source de flux doivent être traités simultanément; par exemple, supposons source de flux émis 10 éléments, mais maxConcurrency
a la valeur 2, puis les deux premiers éléments seront traitées immédiatement et le reste 8 tampon; une fois l'un des traités complete
d l'élément suivant à partir de la source de flux seront traitées et ainsi de suite - c'est un peu délicat, mais jetez un oeil à l'exemple ci-dessous
tous les autres opérateurs peuvent être mises en œuvre avec juste mergeMap
et Observable
constructeur
peut être utilisé pour les opérations asynchrones
les valeurs de retour doit être Observables de type (ou Rx a pour savoir comment créer des observables pour en sortir, par exemple, promesse, array)
id
: x => Rx.Observable.of(x)
tableau analogie
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
L'analogie ne montre pas une image complète et il correspond à l' .mergeMap
avec maxConcurrency
valeur 1. Dans un tel cas, les éléments seront classés comme ci-dessus, mais dans le cas général, il ne devrait pas être le cas. La seule garantie que nous avons, c'est que l'émission de nouveaux éléments seront de l'ordre par leur position dans le flux sous-jacent. Par exemple: [3,1,2,4,9,1]
et [2,3,1,1,9,4]
sont valides, mais [1,1,4,2,3,9]
n'est pas (depuis 4
a été émise après le 2
dans le flux sous-jacent).
Quelques exemples d'utilisation de mergeMap
:
// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>