3 votes

Création d'un observable paresseux, mis en cache, qui n'exécute la source qu'une seule fois.

J'essaie d'utiliser un observable rxjs pour déléguer, mais partager, un travail coûteux pendant toute la durée de vie d'une application.

Essentiellement, quelque chose comme :

var work$ = Observable.create((o) => {
  const expensive = doSomethingExpensive();
  o.next(expensive);
  observer.complete();
})
.publishReplay(1)
.refCount();

Cela fonctionne bien et fait exactement ce que je veux, à l'exception d'une chose : si tous les abonnés se désabonnent, lorsque le suivant s'abonne, mon travail coûteux recommence. Je veux le conserver.

maintenant, je pourrais utiliser un sujet, ou je pourrais supprimer le refCount() et utiliser connect manuellement (et ne jamais me déconnecter). Mais cela ferait en sorte que le travail coûteux se produise au moment où je me connecte, et non la première fois qu'un abonné essaie de consommer work$.

Essentiellement, je veux quelque chose de semblable à refCount qui ne regarde que le premier abonnement à se connecter, et ne se déconnecte jamais. Un "lazy connect".

Une telle chose est-elle possible ?

3voto

Mark Points 859

Comment publishReplay() fonctionnent réellement

Il crée en interne un ReplaySubject et le rend multicast compatible. La valeur de relecture minimale de ReplaySubject est de 1 émission. Il en résulte ce qui suit :

  • La première souscription déclenchera le publishReplay(1) pour s'abonner en interne au flux source et acheminer toutes les émissions par l'intermédiaire de l'outil de gestion des émissions. ReplaySubject en mettant effectivement en cache le dernier n (=1) émissions
  • Si un deuxième abonnement est lancé alors que la source est toujours active, le programme multicast() nous reliera au même replaySubject et nous recevrons toutes les prochaines émissions jusqu'à ce que le flux source soit complet.
  • Si un abonnement est lancé alors que la source est déjà terminée, le replaySubject a mis en cache la dernière version de la source. n et il ne recevra que celles-ci avant de terminer.

    const source = Rx.Observable.from([1,2]) .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100)) .do(null,null,() => console.log('source stream completed')) .publishReplay(1) .refCount();

    // two subscriptions which are both in time before the stream completes source.subscribe(val => console.log(sub1:${val}), null, () => console.log('sub1 completed')); source.subscribe(val => console.log(sub2:${val}), null, () => console.log('sub2 completed'));

    // new subscription after the stream has completed already setTimeout(() => { source.subscribe(val => console.log(sub_late-to-the-party:${val}), null, () => console.log('sub_late-to-the-party completed')); }, 500);

    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X