Utilisation de RxJS 5.0.0-rc.1
J'essaie de communiquer mon Observer
y Observable
d'une manière similaire à comment fonctionnent les générateurs et les itérateurs en échangeant des données en utilisant yield
y .next()
. L'objectif est de ce qu'un appel à .subscribe
renvoie à et modifier/mettre à jour les valeurs suivantes dans mon flux d'observables en fonction de cela.
Je ne suis pas tout à fait sûr que ce soit possible. Bien que, j'ai découvert que vous puede attraper les exceptions lancées sur .subscribe
les rappels. Les extraits suivants s'impriment "Boom!"
:
var source = Observable.create((observer) => {
try {
observer.next(42);
} catch (e) {
// This will catch the Error
// thrown on the subscriber
console.log(e.message);
}
observer.complete();
});
source.subscribe(() => {
throw new Error('Boom!');
});
Alors, et si au lieu de lancer, l'abonné retournait une valeur ? Existe-t-il un moyen pour le Observable
pour le récupérer ? Peut-être ai-je une mauvaise approche. Si oui, quelle est la façon "réactive" de faire les choses dans ce scénario ?
Merci beaucoup.
EDIT
Une solution possible que j'ai trouvée consiste à fournir une fonction de rappel pour chaque élément du flux. Quelque chose comme :
var source = Observable.create((observer) => {
// This will print "{ success: true }"
observer.next({ value: 42, reply: console.log });
observer.complete();
});
source.subscribe(({ value, reply }) => {
console.log('Got', value);
return reply({ success: true });
});
D'autres idées ?
EDIT 2
Comme ma question initiale a suscité une certaine confusion quant à ce que j'essayais de réaliser, je vais décrire mon scénario réel. Je suis en train d'écrire l'API d'un module de gestion de messages par le biais de files d'attente (un peu comme un mécanisme AMQP-RPC simplifié, en mémoire) et j'ai pensé que RxJS serait un bon choix.
Cela fonctionne comme prévu : un Publisher
envoie des messages dans une file d'attente, qui sont remis à une Consumer
. A terme, le Consumer
peut répondre à la Publisher
qui peut écouter cette réponse si elle est intéressée.
Dans un scénario idéal, l'API ressemblerait à quelque chose comme ceci :
Consumer().consume('some.pattern')
.subscribe(function(msg) {
// Do something with `msg`
console.log(msg.foo);
return { ok: true };
});
Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer
Cet exemple imprimerait 42
.
La logique pour répondre à la Publisher
se trouve dans le Consumer
fonction. Mais la réponse réelle vient de la .subscribe()
le rappel. Ce qui m'amène à ma question initiale : comment dois-je procéder pour récupérer la valeur renvoyée par le créateur du flux ?
Pensez à Consumer#consume()
comme :
/**
* Returns an async handler that gets invoked every time
* a new message matching the pattern of this consumer
* arrives.
*/
function waitOnMessage(observer) {
return function(msg) {
observer.next(msg);
// Conceptually, I'd like the returned
// object from `.subscribe()` to be available
// in this scope, somehow.
// That would allow me to go like:
// `sendToQueue(pubQueue, response);`
}
}
return Observable.create((observer) => {
queue.consume(waitOnMessage(observer));
});
Est-ce que ça a plus de sens ?