86 votes

RXJS Attend que tous les éléments observables d'un tableau soient terminés (ou erronés)

Je suis poussant observables dans un tableau comme tel...

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

J'ai envie d'une Observable qui émet lorsque toutes les tâches de$ avoir terminé. Gardez à l'esprit, dans la pratique, les tâches de$ n'a pas connu un certain nombre de phénomènes Observables.

J'ai essayé d' Observable.zip(tasks$).subscribe() mais cela semble échouer dans le cas où il est à seulement 1 tâche, et me mène à croire que ZIP nécessite un nombre pair d'éléments, afin de travailler à la manière dont je m'attends.

J'ai essayé d' Observable.concat(tasks$).subscribe() , mais le résultat de la méthode concat opérateur semble juste être un tableau des observables... par exemple fondamentalement le même que l'entrée. Vous ne pouvez même pas appeler vous abonner sur elle.

En C#, cela s'apparenterait à du Task.WhenAll(). Dans l'ES6 promesse qu'elle serait semblable à Promise.all().

Je suis venu à travers un certain nombre de questions, mais ils semblent tous pour faire face à l'attente sur un numéro de flux (par exemple, la cartographie ensemble).

128voto

cartant Points 35253

Si vous voulez composer un observable qui émet une fois que tous les observables sources sont terminés, vous pouvez utiliser forkJoin :

 import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
 

22voto

João Ghignatti Points 344

Vous pouvez faire usage de zip.

Combine plusieurs Observables pour créer une Observable dont les valeurs sont calculées à partir des valeurs, dans l'ordre, de chacun de ses intrants Observables.

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];

const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
);

Les choses à considérer:

N'a pas besoin d'être un même nombre de phénomènes observables. zip visuellement

enter image description here Comme l'a dit ici,

Le zip opérateur de s'abonner à tous les intérieurs observables, en attente pour chacun d'émettre une valeur. Une fois que cela se produit, toutes les valeurs correspondant à l'indice sera émis. Cela continuera jusqu'à ce qu'au moins l'une intérieure observables complète.

Lorsque l'un des observables renvoie une erreur (ou même les deux), l'abonnement est fermé (onComplete sur complet est appelé), et avec un onError méthode, vous obtenez seulement la première erreur.
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
  error => console.log(error), // will return the error message of the first observable that throws error and then finish it
  () => console.log ('completed after first error or if first observable finishes)
);

0voto

Pour moi, cet échantillon était la meilleure solution.

 const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));
 

Donc .. seulement lorsque second (exemple) émet - vous verrez la dernière valeur émise de premier (source).

Dans ma tâche, j'attends la validation du formulaire et tout autre événement DOM.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X