2 votes

Akka Streams ActorRefSource ordre des messages

Je veux construire une séquence d'éléments en utilisant l'ActorRefSource d'Akka Streams. Cette source est alimentée en continu avec des données. Une fois le calcul terminé, le Stream est terminé avec une pilule empoisonnée.

L'exemple simplifié suivant montre mon intention :

val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
    .mapMaterializedValue{ ref =>
      for(i <- 1 to 1000) {
        ref ! i
      }

      ref ! PoisonPill
    }

    source.runWith(Sink.seq).foreach(s => println("count: "+s.size))

Je m'attendais à ce que le flux traite les 1000 éléments, puis se termine en raison de la réception de la pilule empoisonnée. Malheureusement, le Stream se termine généralement beaucoup plus tôt. Voici quelques exemples de sorties :

count: 24

Si l'on attend un certain temps avant d'envoyer la pilule empoisonnée, par exemple 1000 ms, tous les numéros seront traités.

Toute idée sur la façon de s'assurer que tous les articles ont été traités avant la réception de la pilule empoisonnée serait très appréciée.

2voto

Roland Kuhn Points 7589

Véase la documentation de Source.actorRef : PoisonPill ne vide pas le tampon avant de terminer le flux.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X