4 votes

Obtenir la valeur de retour de la sous-carte sur Observable

Utilisation de RxJS 5.0.0-rc.1 J'essaie de communiquer mon Observer y Observable d'une manière similaire à comment fonctionnent les générateurs et les itérateurs en échangeant des données en utilisant yield y .next() . L'objectif est de ce qu'un appel à .subscribe renvoie à et modifier/mettre à jour les valeurs suivantes dans mon flux d'observables en fonction de cela.

Je ne suis pas tout à fait sûr que ce soit possible. Bien que, j'ai découvert que vous puede attraper les exceptions lancées sur .subscribe les rappels. Les extraits suivants s'impriment "Boom!" :

var source = Observable.create((observer) => {
  try {
    observer.next(42);
  } catch (e) {
    // This will catch the Error
    // thrown on the subscriber
    console.log(e.message);
  }
  observer.complete();
});

source.subscribe(() => {
  throw new Error('Boom!');
});

Alors, et si au lieu de lancer, l'abonné retournait une valeur ? Existe-t-il un moyen pour le Observable pour le récupérer ? Peut-être ai-je une mauvaise approche. Si oui, quelle est la façon "réactive" de faire les choses dans ce scénario ?

Merci beaucoup.


EDIT

Une solution possible que j'ai trouvée consiste à fournir une fonction de rappel pour chaque élément du flux. Quelque chose comme :

var source = Observable.create((observer) => {
  // This will print "{ success: true }"
  observer.next({ value: 42, reply: console.log });
  observer.complete();
});

source.subscribe(({ value, reply }) => {
  console.log('Got', value);
  return reply({ success: true });
});

D'autres idées ?


EDIT 2

Comme ma question initiale a suscité une certaine confusion quant à ce que j'essayais de réaliser, je vais décrire mon scénario réel. Je suis en train d'écrire l'API d'un module de gestion de messages par le biais de files d'attente (un peu comme un mécanisme AMQP-RPC simplifié, en mémoire) et j'ai pensé que RxJS serait un bon choix.

Cela fonctionne comme prévu : un Publisher envoie des messages dans une file d'attente, qui sont remis à une Consumer . A terme, le Consumer peut répondre à la Publisher qui peut écouter cette réponse si elle est intéressée.

Dans un scénario idéal, l'API ressemblerait à quelque chose comme ceci :

Consumer().consume('some.pattern')
  .subscribe(function(msg) {
    // Do something with `msg`
    console.log(msg.foo);
    return { ok: true };
  });

Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer

Cet exemple imprimerait 42 .

La logique pour répondre à la Publisher se trouve dans le Consumer fonction. Mais la réponse réelle vient de la .subscribe() le rappel. Ce qui m'amène à ma question initiale : comment dois-je procéder pour récupérer la valeur renvoyée par le créateur du flux ?

Pensez à Consumer#consume() comme :

/**
 * Returns an async handler that gets invoked every time
 * a new message matching the pattern of this consumer
 * arrives.
 */
function waitOnMessage(observer) {
  return function(msg) {
    observer.next(msg);
    // Conceptually, I'd like the returned
    // object from `.subscribe()` to be available
    // in this scope, somehow.
    // That would allow me to go like: 
    // `sendToQueue(pubQueue, response);`
  }
}

return Observable.create((observer) => {
  queue.consume(waitOnMessage(observer));
});

Est-ce que ça a plus de sens ?

2voto

user3743222 Points 12564

Il existe en effet des similitudes entre les générateurs et les observables. Comme vous pouvez le constater aquí Les observables (séquence asynchrone de valeurs) sont la version asynchrone des itérables (séquence synchrone de valeurs).

Maintenant, un générateur est une fonction qui renvoie un Iterable . Cependant, Rxjs Observable englobe à la fois un générateur - alias producteur (que vous exécutez/démarrez en appelant subscribe ) et la séquence de valeurs asynchrones produite (que vous observez en passant une commande Observer objet). Et le subscribe renvoie un Disposable qui vous permet de ne plus recevoir de valeurs (déconnexion). Ainsi, si les générateurs et les observables sont des concepts doubles, les API permettant de les utiliser diffèrent.

Vous ne pouvez pas effectuer de communication bidirectionnelle par défaut avec l'API observable rxjs. Vous pouvez probablement y parvenir en construisant vous-même le canal de retour par le biais de sujets (notez que vous DEVEZ avoir une valeur initiale pour lancer le cycle).

var backChannel = Rx.Subject();
backChannel.startWith(initialValue).concatMap(generateValue)
  .subscribe(function observer(value){
  // Do whatever
  // pass a value through the backChannel
  backChannel.next(someValue)
})
// generateValue is a function which takes a value from the back channel 
// and returns a promise with the next value to be consumed by the observer.

Vous pourriez envisager d'envelopper cela avec :

function twoWayObsFactory (yield, initialValue) {
  var backChannel = Rx.BehaviorSubject(initialValue);
  var next = backChannel.next.bind(backChannel);
  return {
    subscribe : function (observer) {
      var disposable = backChannel.concatMap(yield)
        .subscribe(function(x) {
           observer(next, x);
        });
      return {
        dispose : function (){disposable.dispose(); backChannel.dispose();}
      }
    }
  }
}

// Note that the observer is now taking an additional parameter in its signature
// for instance
// observer = function (next, yieldedValue) {
//              doSomething(yieldedValue);
//              next(anotherValue);
//            }
// Note also that `next` is synchronous, as such you should avoir sequences
// of back-and-forth communication that is too long. If your `yield` function
// would be synchronous, you might run into stack overflow errors.
// All the same, the `next` function call should be the last line, so order of
// execution in your program is the same independently of the synchronicity of
// the `yield` function

Sinon, le comportement que vous décrivez semble être celui d'un générateur asynchrone. Je n'en ai jamais utilisé, mais comme il s'agit d'une proposition pour une future version de javascript, je pense que vous pouvez déjà commencer à l'essayer avec Babel (cf. https://github.com/tc39/proposal-async-iteration ).

EDIT :

Si vous recherchez un mécanisme de bouclage (approche moins générale mais qui pourrait très bien s'adapter à votre cas d'utilisation, si ce que vous voulez faire est assez simple), la fonction expand L'opérateur pourrait aider. Pour comprendre son comportement, veuillez consulter le doc et les réponses suivantes sur SO pour des exemples d'utilisation dans des contextes concrets :

En gros, expand vous permet à la fois d'émettre une valeur en aval et de renvoyer cette valeur en même temps dans votre producteur.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X