7 votes

Comment implémenter un flux de futures pour un appel bloquant en utilisant futures.rs et Redis PubSub ?

J'essaie de créer un système par lequel mon application peut recevoir des données en continu à partir d'un canal Redis PubSub et les traiter. Le site Pilote Redis que j'utilise, ainsi que tous les autres pilotes Redis pour Rust que j'ai vus, utilisent une opération de blocage pour obtenir des données du canal qui ne renvoie une valeur que lorsqu'il reçoit des données :

let msg = match pubsub.get_message() {
        Ok(m) => m,
        Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

Je voulais utiliser le futures-rs pour envelopper cet appel de fonction bloquant dans un futur, de sorte que je puisse effectuer d'autres tâches dans mon application tout en attendant une entrée.

J'ai lu le tutoriel pour les contrats à terme et a essayé de créer un Stream qui signalerait la réception de données par le PubSub, mais je n'arrive pas à trouver comment le faire.

Comment puis-je créer schedule y poll fonctions pour le blocage pubsub.get_message() fonction ?

11voto

Shepmaster Points 1732

Une mise en garde importante Je n'ai jamais utilisé cette bibliothèque auparavant, et mes connaissances de bas niveau de certains concepts sont un peu... insuffisantes. Je suis surtout en train de lire le tutoriel . Je suis sûr que tous ceux qui ont fait du travail asynchrone vont lire ceci et en rire, mais cela peut être un point de départ utile pour d'autres personnes. Caveat emptor !


Commençons par quelque chose d'un peu plus simple, en démontrant comment une Stream travaux. Nous pouvons convertir un itérateur de Result dans un flux :

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Cela nous montre une façon de consommer le flux. Nous utilisons and_then pour faire quelque chose à chaque charge utile (ici juste l'imprimer) et ensuite for_each pour convertir le Stream de retour dans un Future . Nous pouvons alors exécuter le futur en appelant l'étrangement nommé forget método .


Il s'agit ensuite d'intégrer la bibliothèque Redis dans le mélange, en ne traitant qu'un seul message. Puisque la bibliothèque get_message() est bloquante, nous devons introduire quelques fils dans le mélange. Ce n'est pas une bonne idée d'effectuer une grande quantité de travail dans ce type de système asynchrone, car tout le reste sera bloqué. Par exemple :

A moins qu'il n'en soit disposé autrement, il convient de s'assurer que les implémentations de cette fonction se terminent très rapidement .

Dans un monde idéal, la caisse de redis serait construite sur une bibliothèque comme futures et exposerait tout cela de manière native.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Ma compréhension devient plus floue ici. Dans un thread séparé, nous bloquons pour le message et le poussons dans le canal quand nous l'obtenons. Ce que je ne comprends pas, c'est pourquoi nous devons nous accrocher à l'identifiant du thread. Je m'attendrais à ce que foo.forget se bloquerait, attendant que le flux soit vide.

Dans une connexion telnet au serveur Redis, envoyez ceci :

publish rust awesome

Et vous verrez que ça marche. L'ajout de déclarations d'impression montre que le (pour moi) le foo.forget est exécutée avant que le thread ne soit créé.


Les messages multiples sont plus délicats. Le site Sender se consomme lui-même pour éviter que le côté générateur ne prenne trop d'avance sur le côté consommateur. Ceci est accompli en retournant un autre futur de send ! Nous devons le sortir de là pour le réutiliser lors de la prochaine itération de la boucle :

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Je suis sûr qu'il y aura davantage d'écosystèmes pour ce type d'interopérabilité au fil du temps. Par exemple, le futures-cpupool La caisse pourrait probablement être étendu pour prendre en charge un cas d'utilisation similaire à celui-ci.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X