51 votes

Comment fonctionne RxJS MergeMap?

Je ne comprends pas le but de l' mergeMap à tous. J'ai entendu deux "explications:

  1. "C'est comme SelectAll" dans LINQ - nan.
  2. "Eh bien, c'est une combinaison de RxJS merge et map" - nan (ou je ne peux pas reproduire ce).

Considérons le code suivant:

    var obs1 = new Rx.Observable.interval(1000);
    var obs2 = new Rx.Observable.interval(1000);

    //Just a merge and a map, works fine
    obs1.merge(obs2).map(x=> x+'a').subscribe(
      next => console.log(next)
    )

    //Who know what - seems to do the same thing as a plain map on 1 observable
    obs1.mergeMap(val => Rx.Observable.of(val + `B`))
        .subscribe(
          next => console.log(next)
        )

JS Bin

La dernière pièce étiquetés "Qui sait quoi" n'est rien de plus qu'une carte en obs1 - ce qui est le point?

Qu'est - mergeMap faire réellement? Qu'est ce qu'un exemple d'un permis de cas d'utilisation? (De préférence avec un peu de code)

Les Articles qui ne m'a pas aidé du tout (mergeMap code ci-dessus est extraite de l'un de ces éléments): 1, 2

119voto

artur grzesiak Points 4056

tl;dr; mergeMap est beaucoup plus puissant que map. La compréhension mergeMap est la condition nécessaire pour accéder à la pleine puissance de Rx.


les similitudes

  • les deux mergeMap et map agit sur un seul flux (contre zip, combineLatest)

  • les deux mergeMap et map peut transformer les éléments d'un flux (contre filter, delay)

les différences

carte

  • impossible de modifier la taille de la source de diffusion (hypothèse: map ne throw); pour chaque élément de la source exactement un mapped élément est émise; map ne peut pas ignorer les éléments (comme par exemple filter);

  • en cas de défaut planificateur de la transformation se produit de manière synchrone; être 100% clair: les cours d'eau de source peut livrer ses éléments de façon asynchrone, mais chaque élément suivant immédiatement mapped et ré-émise plus loin; map ne peut pas déplacer des éléments dans le temps comme par exemple delay

  • pas de restrictions sur les valeurs de retour

  • id: x => x

mergeMap

  • pouvez changer la taille de la source du ruisseau; pour chaque élément, il peut y avoir nombre arbitraire (0, 1 ou plusieurs) de nouveaux éléments créés/émis

  • il offre un contrôle de l'asynchronicité - à la fois lorsque de nouveaux éléments sont créés/émis et le nombre d'éléments à partir de la source de flux doivent être traités simultanément; par exemple, supposons source de flux émis 10 éléments, mais maxConcurrency a la valeur 2, puis les deux premiers éléments seront traitées immédiatement et le reste 8 tampon; une fois l'un des traités completed l'élément suivant à partir de la source de flux seront traitées et ainsi de suite - c'est un peu délicat, mais jetez un oeil à l'exemple ci-dessous

  • tous les autres opérateurs peuvent être mises en œuvre avec juste mergeMap et Observable constructeur

  • peut être utilisé pour les opérations asynchrones

  • les valeurs de retour doit être Observables de type (ou Rx a pour savoir comment créer des observables pour en sortir, par exemple, promesse, array)

  • id: x => Rx.Observable.of(x)

tableau analogie

let array = [1,2,3]
fn             map                    mergeMap
x => x*x       [1,4,9]                error /*expects array as return value*/
x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]

L'analogie ne montre pas une image complète et il correspond à l' .mergeMap avec maxConcurrency valeur 1. Dans un tel cas, les éléments seront classés comme ci-dessus, mais dans le cas général, il ne devrait pas être le cas. La seule garantie que nous avons, c'est que l'émission de nouveaux éléments seront de l'ordre par leur position dans le flux sous-jacent. Par exemple: [3,1,2,4,9,1] et [2,3,1,1,9,4] sont valides, mais [1,1,4,2,3,9] n'est pas (depuis 4 a été émise après le 2 dans le flux sous-jacent).

Quelques exemples d'utilisation de mergeMap:

// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
  return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}

Rx.Observable.range(1, 3)
  .mapWithMergeMap(x => x * x)
  .subscribe(x => console.log('mapWithMergeMap', x))

// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
  return this.mergeMap(x =>
    filterFn(x) ?
    Rx.Observable.of(x) :
    Rx.Observable.empty()); // return no element
}

Rx.Observable.range(1, 3)
  .filterWithMergeMap(x => x === 3)
  .subscribe(x => console.log('filterWithMergeMap', x))

// implement .delay with .mergeMap 
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
  return this.mergeMap(x =>
    Rx.Observable.create(obs => {
      // setTimeout is naive - one should use scheduler instead
      const token = setTimeout(() => {
        obs.next(x);
        obs.complete();
      }, delayMs)
      return () => clearTimeout(token);
    }))
}

Rx.Observable.range(1, 3)
  .delayWithMergeMap(500)
  .take(2)
  .subscribe(x => console.log('delayWithMergeMap', x))

// recursive count
const count = (from, to, interval) => {
  if (from > to) return Rx.Observable.empty();
  return Rx.Observable.timer(interval)
    .mergeMap(() =>
      count(from + 1, to, interval)
      .startWith(from))
}

count(1, 3, 1000).subscribe(x => console.log('count', x))

// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
  Rx.Observable.if(
    () => from > to,
    Rx.Observable.empty(),
    Rx.Observable.timer(interval)
    .mergeMap(() => countMoreRxWay(from + 1, to, interval)
      .startWith(from)))

const maxConcurrencyExample = () =>
  Rx.Observable.range(1,7)
    .do(x => console.log('emitted', x))
    .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
    .do(x => console.log('processed', x))
    .subscribe()

setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

24voto

Mark Points 859

.mergeMap() vous permet d'aplatir un ordre supérieur Observable en un seul flux. Par exemple:

Rx.Observable.from([1,2,3,4])
  .map(i => getFreshApiData())
  .subscribe(val => console.log('regular map result: ' + val));

//vs

Rx.Observable.from([1,2,3,4])
  .mergeMap(i => getFreshApiData())
  .subscribe(val => console.log('mergeMap result: ' + val));

function getFreshApiData() {
  return Rx.Observable.of('retrieved new data')
    .delay(1000);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

Voir ma réponse à cette autre question pour un en-dept explication de l' .xxxMap() opérateurs: Rxjs - Comment puis-je extraire plusieurs valeurs dans un tableau et de les nourrir de retour à la observables flux synchrone

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X