La contre-pression dans WebFlux
Pour comprendre comment fonctionne la contre-pression dans l'implémentation actuelle du cadre WebFlux, nous devons récapituler la couche de transport utilisée par défaut ici. Comme on s'en souvient, la communication normale entre le navigateur et le serveur (la communication de serveur à serveur est généralement la même) se fait par le biais d'une connexion TCP. WebFlux utilise également ce transport pour la communication entre un client et le serveur. Ensuite, afin d'obtenir la signification de l'élément contrôle de la contre-pression nous devons récapituler ce que signifie la rétropression du point de vue de la spécification Reactive Streams.
La sémantique de base définit comment la transmission des éléments du flux est régulée par la contre-pression.
Donc, à partir de cette déclaration, nous pouvons conclure que dans les flux réactifs, la contre-pression est un mécanisme qui régule la demande par la transmission (notification) du nombre d'éléments que le destinataire peut consommer ; et ici nous avons un point délicat. Le TCP a une abstraction d'octets plutôt qu'une abstraction d'éléments logiques. Ce que nous voulons habituellement en disant contrôle de la contre-pression est le contrôle du nombre d'éléments logiques envoyés/reçus vers/depuis le réseau. Même si le TCP possède son propre contrôle de flux (voir la signification de ici et l'animation là ), ce contrôle de flux concerne toujours les octets plutôt que les éléments logiques.
Dans l'implémentation actuelle du module WebFlux, la contre-pression est régulée par le contrôle du flux de transport, mais elle n'expose pas la demande réelle du destinataire. Afin de voir enfin le flux d'interaction, veuillez consulter le diagramme suivant :
Pour simplifier, le diagramme ci-dessus montre la communication entre deux microservices où celui de gauche envoie des flux de données, et celui de droite consomme ce flux. La liste numérotée suivante fournit une brève explication de ce diagramme :
- C'est le cadre WebFlux qui se charge de convertir les éléments logiques en octets et inversement et de les transférer/recevoir vers/depuis le TCP (réseau).
- C'est le début d'un traitement de longue haleine de l'élément qui demande les éléments suivants une fois le travail terminé.
- Ici, alors qu'il n'y a pas de demande de la part de la logique métier, le WebFlux met en file d'attente les octets qui proviennent du réseau sans leur acquittement (il n'y a pas de demande de la part de la logique métier).
- En raison de la nature du contrôle de flux TCP, le Service A peut toujours envoyer des données au réseau.
Comme nous pouvons le remarquer dans le diagramme ci-dessus, la demande exposée par le destinataire est différente de la demande de l'expéditeur (demande ici en éléments logiques). Cela signifie que la demande des deux est isolée et fonctionne seulement pour l'interaction WebFlux <-> Business logic (Service) et expose moins la contre-pression pour l'interaction Service A <-> Service B. Tout cela signifie que le contrôle de la contre-pression n'est pas aussi équitable dans WebFlux que nous l'attendons.
Tout cela signifie que le contrôle de la contre-pression n'est pas aussi équitable dans WebFlux que nous l'attendions.
Mais je veux toujours savoir comment contrôler la contre-pression.
Si nous voulons toujours avoir un contrôle injuste de la contre-pression dans WebFlux, nous pouvons le faire avec le soutien des opérateurs de Project Reactor tels que limitRate()
. L'exemple suivant montre comment nous pouvons utiliser cet opérateur :
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
return tweetService.process(tweetsFlux.limitRate(10))
.then();
}
Comme nous pouvons le voir dans l'exemple, limitRate()
permet de définir le nombre d'éléments à préempter en une seule fois. Cela signifie que même si l'abonné final demande Long.MAX_VALUE
les éléments limitRate
L'opérateur divise cette demande en morceaux et ne permet pas de consommer plus que cela en une seule fois. On peut faire la même chose avec le processus d'envoi d'éléments :
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetService.retreiveAll()
.limitRate(10);
}
L'exemple ci-dessus montre que, même si WebFlux demande plus de 10 éléments à la fois, le système de gestion de l'information de WebFlux ne peut pas les traiter. limitRate()
limite la demande à la taille du préfixe et empêche de consommer plus que le nombre spécifié d'éléments en une seule fois.
Une autre option consiste à mettre en œuvre sa propre Subscriber
ou de prolonger le BaseSubscriber
de Project Reactor. Par exemple, ce qui suit est un exemple naïf de la façon dont nous pouvons le faire :
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(T value) {
// do business logic there
consumed++;
if (consumed == limit) {
consumed = 0;
request(limit);
}
}
}
Contre-pression équitable avec le protocole RSocket
Afin de réaliser la rétropression des éléments logiques à travers les frontières du réseau, nous avons besoin d'un protocole approprié pour cela. Heureusement, il en existe un appelé Protocole RScoket . RSocket est un protocole de niveau application qui permet de transférer une demande réelle à travers les frontières du réseau. Il existe une implémentation RSocket-Java de ce protocole qui permet de mettre en place un serveur RSocket. Dans le cas d'une communication de serveur à serveur, la même bibliothèque RSocket-Java fournit également une implémentation client. Pour en savoir plus sur l'utilisation de RSocket-Java, veuillez consulter les exemples suivants ici . Pour la communication navigateur-serveur, il existe un RSocket-JS qui permet de câbler la communication en continu entre le navigateur et le serveur via WebSocket.
Cadres connus au-dessus de RSocket
Il existe aujourd'hui quelques cadres de travail basés sur le protocole RSocket.
Proteus
L'un de ces cadres est un projet Proteus qui propose des microservices à part entière construits au-dessus de RSocket. De plus, Proteus est bien intégré au framework Spring, ce qui nous permet d'obtenir un bon contrôle de la contre-pression (voir les exemples ci-dessous). là )
Autres lectures
0 votes
Comment d'avoir contre-pression ou comment pour faire face à vous avez déjà des mécanismes intégrés pour y faire face, alors voulez-vous les découvrir ?
0 votes
Si c'est le cas, consultez cette page
0 votes
Je ne savais pas que je l'avais déjà par défaut, alors peut-être que la façon de s'en occuper est meilleure. Je veux connaître le mécanisme et sa configuration dans Spring Web-Flux.
2 votes
@Andrew Tobilko RxJava est différent de Reactor qui est sous le capot de Spring WebFlux.