Une mise en garde importante Je n'ai jamais utilisé cette bibliothèque auparavant, et mes connaissances de bas niveau de certains concepts sont un peu... insuffisantes. Je suis surtout en train de lire le tutoriel . Je suis sûr que tous ceux qui ont fait du travail asynchrone vont lire ceci et en rire, mais cela peut être un point de départ utile pour d'autres personnes. Caveat emptor !
Commençons par quelque chose d'un peu plus simple, en démontrant comment une Stream
travaux. Nous pouvons convertir un itérateur de Result
dans un flux :
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
Cela nous montre une façon de consommer le flux. Nous utilisons and_then
pour faire quelque chose à chaque charge utile (ici juste l'imprimer) et ensuite for_each
pour convertir le Stream
de retour dans un Future
. Nous pouvons alors exécuter le futur en appelant l'étrangement nommé forget
método .
Il s'agit ensuite d'intégrer la bibliothèque Redis dans le mélange, en ne traitant qu'un seul message. Puisque la bibliothèque get_message()
est bloquante, nous devons introduire quelques fils dans le mélange. Ce n'est pas une bonne idée d'effectuer une grande quantité de travail dans ce type de système asynchrone, car tout le reste sera bloqué. Par exemple :
A moins qu'il n'en soit disposé autrement, il convient de s'assurer que les implémentations de cette fonction se terminent très rapidement .
Dans un monde idéal, la caisse de redis serait construite sur une bibliothèque comme futures et exposerait tout cela de manière native.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Ma compréhension devient plus floue ici. Dans un thread séparé, nous bloquons pour le message et le poussons dans le canal quand nous l'obtenons. Ce que je ne comprends pas, c'est pourquoi nous devons nous accrocher à l'identifiant du thread. Je m'attendrais à ce que foo.forget
se bloquerait, attendant que le flux soit vide.
Dans une connexion telnet au serveur Redis, envoyez ceci :
publish rust awesome
Et vous verrez que ça marche. L'ajout de déclarations d'impression montre que le (pour moi) le foo.forget
est exécutée avant que le thread ne soit créé.
Les messages multiples sont plus délicats. Le site Sender
se consomme lui-même pour éviter que le côté générateur ne prenne trop d'avance sur le côté consommateur. Ceci est accompli en retournant un autre futur de send
! Nous devons le sortir de là pour le réutiliser lors de la prochaine itération de la boucle :
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Je suis sûr qu'il y aura davantage d'écosystèmes pour ce type d'interopérabilité au fil du temps. Par exemple, le futures-cpupool La caisse pourrait probablement être étendu pour prendre en charge un cas d'utilisation similaire à celui-ci.