2 votes

Autoriser une seule connexion avec Akka stream comme serveur tcp

J'essaie de construire un serveur tcp simple en utilisant les flux Akka.

 Tcp()
  .bind(props.host, props.port)
  .to(Sink.foreach(_.handleWith(handler)))
  .run()
  .onComplete {
    case Success(i) => logger.info(s"Server is bound at ${props.host}:${props.port}")
    case Failure(e) => logger.error("Server binding failure", e)
  }

Je veux autoriser au maximum une connexion à la fois. Pour y parvenir, j'ai ajouté la ligne suivante dans mon fichier application.conf archivo.

akka.io.tcp.max-channels = 2

Avec cette configuration, akka n'autorise qu'une seule connexion à la fois. Cependant, dès que la deuxième connexion est tentée, il rejette la demande et échoue lui-même avec le message suivant :

Could not register incoming connection since selector capacity limit is reached, closing connection

A partir de ce point, il n'est pas possible d'établir une quelconque connexion puisque le serveur Tcp est hors service.

Pregunta : Quelle est la bonne façon d'activer une seule connexion à la fois ? Le but principal est de répondre à la première demande de connexion et de rejeter les autres alors qu'elle est toujours en cours. Il devrait être à nouveau possible d'établir une autre connexion, après la fermeture de la précédente. Comme je l'ai mentionné, une seule connexion devrait être autorisée à la fois.

BONUS : Est-il possible de fournir une liste blanche pour qu'akka stream n'accepte que les connexions provenant de cette liste ? J'envisage d'autoriser uniquement les adresses ip connues à se connecter à mon serveur. Pour y parvenir, je pense qu'il est suffisant de connaître la bonne manière de rejeter une requête. Je peux donc comparer l'adresse IP de la connexion entrante avec une liste donnée et la rejeter si elle n'y figure pas. Mais toute meilleure solution est également appréciée.

1voto

maks Points 1770

Méthode de liaison de Tcp a un paramètre options qui accepte un Traversable d'options Socket. Vous pouvez passer quelque chose comme ceci à ce paramètre :

case class AllowedAddresses(addresses: Seq[InetAddress]) extends SocketOption {
    override def beforeConnect(s: Socket): Unit = {
      if (!addresses.contains(s.getInetAddress)) s.close()
    }
  }

donc votre code ressemblera à ceci :

Tcp()
  .bind(props.host, props.port, options = List(AllowedAddresses(listOfAddresses)))
  .to(Sink.foreach(_.handleWith(handler)))
  .run()
  .onComplete {
    case Success(i) => logger.info(s"Server is bound at ${props.host}:${props.port}")
    case Failure(e) => logger.error("Server binding failure", e)
  }

L'approche consistant à limiter le nombre de requêtes est la même, les méthodes d'enquête dans le cadre de l SocketOptions trait

PS. Je n'ai pas essayé de le faire fonctionner, je viens juste de le conclure après avoir étudié l'API de flux, donc merci de vérifier si c'est correct.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X