27 votes

Comment les E/S fonctionnent-elles dans Akka ?

Comment le modèle d'acteur (dans Akka) fonctionne-t-il lorsque vous devez effectuer des E/S (par exemple, une opération de base de données) ?

D'après ce que j'ai compris, une opération bloquante lèvera une exception (et ruinera essentiellement toute concurrence en raison de la nature événementielle de Netty, qu'Akka utilise). Par conséquent, je devrais utiliser un Future ou quelque chose de similaire - mais je ne comprends pas le modèle de concurrence.

  1. Un acteur peut-il traiter plusieurs messages simultanément ?
  2. Si un acteur fait un appel bloquant dans un fichier future (c'est-à-dire. future.get() ) bloque-t-il uniquement l'exécution de l'acteur actuel ou empêche-t-il l'exécution de tous les acteurs jusqu'à ce que l'appel bloquant soit terminé ?
  3. S'il bloque toute exécution, comment l'utilisation d'un futur favorise-t-elle la concurrence (c'est-à-dire que le fait d'invoquer des appels bloquants dans un futur ne revient-il pas à créer un acteur et à exécuter l'appel bloquant) ?
  4. Quelle est la meilleure façon de traiter un processus à plusieurs étapes (par exemple, lecture de la base de données, appel d'un service Web bloquant, lecture de la base de données, écriture dans la base de données) où chaque étape dépend de la précédente ?

Le contexte de base est le suivant :

  • J'utilise un serveur Websocket qui va maintenir des milliers de sessions.
  • Chaque session a un certain état (par exemple, les détails d'authentification, etc.) ;
  • Le client Javascript envoie un message JSON-RPC au serveur, qui le transmet à l'acteur de session approprié, qui l'exécute et renvoie un résultat.
  • L'exécution de l'appel RPC implique des entrées/sorties et des appels bloquants.
  • Il y aura un grand nombre de demandes simultanées (chaque utilisateur effectuera un nombre important de demandes sur la connexion WebSocket et il y aura beaucoup d'utilisateurs).

Y a-t-il un meilleur moyen d'y parvenir ?

27voto

Raymond Roestenburg Points 2542

Les opérations bloquantes ne lèvent pas d'exceptions dans Akka. Vous pouvez consulter peut bloquer les appels d'un acteur (ce que vous voulez probablement minimiser, mais c'est une autre histoire).

  1. non, une instance d'acteur ne peut pas.
  2. Il ne bloquera aucun autre acteur. Vous pouvez influencer cela en utilisant un Dispatcher spécifique. Les Futures utilisent le distributeur par défaut (celui qui est normalement piloté par les événements globaux) et s'exécutent donc sur un thread dans un pool. Vous pouvez choisir le distributeur que vous voulez utiliser pour vos acteurs (par acteur, ou pour tous). Je suppose que si vous voulez vraiment créer un problème, vous pourriez être en mesure de passer exactement le même distributeur (basé sur un thread) aux futures et aux acteurs, mais cela nécessiterait une certaine intention de votre part. Je suppose que si vous avez un grand nombre de futures bloquant indéfiniment et que le service d'exécution a été configuré pour un nombre fixe de threads, vous pourriez faire exploser le service d'exécution. Donc beaucoup de 'ifs'. Un f.get ne bloque que si le Future n'est pas encore terminé. Il bloque le "thread actuel" de l'Actor à partir duquel vous l'appelez (si vous l'appelez à partir d'un Actor, ce qui n'est d'ailleurs pas nécessaire).
  3. vous ne devez pas nécessairement bloquer. vous pouvez utiliser un callback au lieu de f.get. Vous pouvez même composer des Futures sans bloquer. Consultez l'exposé de Viktor sur " l'avenir prometteur d'akka " pour plus de détails : http://skillsmatter.com/podcast/scala/talk-by-viktor-klang
  4. J'utiliserais une communication asynchrone entre les étapes (si les étapes sont des processus significatifs en soi), donc utiliser un acteur pour chaque étape, où chaque acteur envoie un message oneway au suivant, éventuellement aussi des messages oneway à un autre acteur qui ne bloquera pas et qui pourra superviser le processus. De cette façon, vous pouvez créer des chaînes d'acteurs, dont vous pouvez faire beaucoup, en face de cela, vous pouvez mettre un acteur d'équilibrage de charge, de sorte que si un acteur se bloque dans une chaîne, un autre du même type pourrait ne pas le faire dans l'autre chaîne. Cela pourrait également fonctionner pour votre question de "contexte", en transmettant la charge de travail aux acteurs locaux, en les enchaînant derrière un acteur d'équilibrage de charge.

En ce qui concerne netty (et je suppose que vous voulez dire Remote Actors, car c'est la seule chose pour laquelle netty est utilisé dans Akka), passez votre travail dès que possible à un acteur local ou à un futur (avec callback) si vous êtes préoccupé par le timing ou si vous empêchez netty de faire son travail d'une manière ou d'une autre.

10voto

paradigmatic Points 20871

Les opérations de blocage ne lèveront généralement pas d'exceptions, mais l'attente d'un futur (par exemple en utilisant l'option !! ou !!! ) peut déclencher une exception de dépassement de délai. C'est pourquoi il faut s'en tenir autant que possible à la méthode "fire-and-forget", utiliser une valeur de time-out significative et préférer les callbacks lorsque c'est possible.

  1. Un acteur akka ne peut pas explicitement traiter plusieurs messages d'affilée, mais vous pouvez jouer avec l'option throughput via le fichier de configuration. L'acteur va alors traiter plusieurs messages (i.e. sa méthode de réception sera appelée plusieurs fois séquentiellement) si sa file d'attente de messages n'est pas vide : http://akka.io/docs/akka/1.1.3/scala/dispatchers.html#id5

  2. Le blocage des opérations à l'intérieur d'un acteur ne "bloquera" pas tous les acteurs, mais si vous partagez des threads entre les acteurs (usage recommandé), l'un des threads du distributeur sera bloqué jusqu'à ce que les opérations reprennent. Essayez donc de composer des futures autant que possible et faites attention à la valeur du time-out).

3 et 4. Je suis d'accord avec les réponses de Raymond.

1voto

aij Points 408

Ce que Raymond et paradigmatique ont dit, mais aussi, si vous voulez éviter d'affamer le pool de threads, vous devriez envelopper toutes les opérations bloquantes dans scala.concurrent.blocking .

Il est bien sûr préférable d'éviter les opérations bloquantes, mais il est parfois nécessaire d'utiliser une bibliothèque qui se bloque. Si vous enveloppez ledit code dans blocking il fera savoir au contexte d'exécution que vous risquez de bloquer ce thread afin qu'il puisse en allouer un autre si nécessaire.

Le problème est pire que ce que décrit le paradigme, car si vous avez plusieurs opérations bloquantes, vous pouvez finir par bloquer tous les threads du pool de threads et ne plus avoir de threads libres. Vous pourriez aboutir à un blocage si tous vos threads sont bloqués sur quelque chose qui ne se produira pas avant qu'un autre acteur/futur soit programmé.

Voici un exemple :

import scala.concurrent.blocking
...

Future {
  val image = blocking { load\_image\_from\_potentially\_slow\_media() }
  val enhanced = image.enhance()
  blocking {
    if (oracle.queryBetter(image, enhanced)) {
      write\_new\_image(enhanced)
    }
  }
  enhanced
}

La documentation est ici .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X