Je veux construire une séquence d'éléments en utilisant l'ActorRefSource d'Akka Streams. Cette source est alimentée en continu avec des données. Une fois le calcul terminé, le Stream est terminé avec une pilule empoisonnée.
L'exemple simplifié suivant montre mon intention :
val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
.mapMaterializedValue{ ref =>
for(i <- 1 to 1000) {
ref ! i
}
ref ! PoisonPill
}
source.runWith(Sink.seq).foreach(s => println("count: "+s.size))
Je m'attendais à ce que le flux traite les 1000 éléments, puis se termine en raison de la réception de la pilule empoisonnée. Malheureusement, le Stream se termine généralement beaucoup plus tôt. Voici quelques exemples de sorties :
count: 24
Si l'on attend un certain temps avant d'envoyer la pilule empoisonnée, par exemple 1000 ms, tous les numéros seront traités.
Toute idée sur la façon de s'assurer que tous les articles ont été traités avant la réception de la pilule empoisonnée serait très appréciée.