90 votes

Comment attendre plusieurs Futures ?

Supposons que j'ai plusieurs contrats à terme et que je doive attendre jusqu'à ce que soit l'un d'entre eux échoue o tous réussissent.

Par exemple : Disons qu'il y a 3 contrats à terme : f1 , f2 , f3 .

  • Si f1 réussit et f2 échoue je n'attends pas f3 (et retour échec au client).

  • Si f2 échoue alors que f1 y f3 sont toujours en cours d'exécution, je ne les attends pas (et je retourne à la maison). échec )

  • Si f1 réussit et ensuite f2 réussit, je continue à attendre f3 .

Comment la mettriez-vous en œuvre ?

88voto

cmbaxter Points 15136

À la place, vous pourriez utiliser un for-comprehension comme suit :

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

Dans cet exemple, les futures 1, 2 et 3 sont lancées en parallèle. Ensuite, dans la compréhension du for, on attend que les résultats 1 puis 2 puis 3 soient disponibles. Si 1 ou 2 échoue, on n'attendra plus 3. Si les 3 réussissent, alors le aggFut val contiendra un tuple avec 3 slots, correspondant aux résultats des 3 futures.

Maintenant, si vous avez besoin du comportement où vous voulez arrêter d'attendre si fut2 échoue d'abord, les choses deviennent un peu plus délicates. Dans l'exemple ci-dessus, vous devriez attendre que le fut1 se termine avant de réaliser que le fut2 a échoué. Pour résoudre ce problème, vous pourriez essayer quelque chose comme ceci :

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

Maintenant, cela fonctionne correctement, mais le problème vient du fait de savoir quelle Future à retirer de la Map lorsque l'une d'entre elles a été menée à bien. Tant que vous avez un moyen de corréler correctement un résultat avec le Future qui a engendré ce résultat, alors quelque chose comme ça fonctionne. Il suffit de retirer récursivement de la carte les Futures terminés, puis d'appeler Future.firstCompletedOf sur les autres Futures jusqu'à ce qu'il n'y en ait plus, en recueillant les résultats en cours de route. Ce n'est pas très joli, mais si vous avez vraiment besoin du comportement dont vous parlez, alors ceci, ou quelque chose de similaire, pourrait fonctionner.

36voto

gourlaysama Points 5137

Vous pouvez utiliser une promesse, et lui envoyer soit le premier échec, soit le succès final agrégé :

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

Alors vous pouvez Await sur ce résultat Future si vous voulez bloquer, ou simplement map en quelque chose d'autre.

La différence avec for comprehension est qu'ici vous obtenez l'erreur de la première à échouer, alors qu'avec for comprehension vous obtenez la première erreur dans l'ordre de traversée de la collection d'entrée (même si une autre a échoué en premier). Par exemple :

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

Y:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)

8voto

FranklinChen Points 101

Voici une solution sans utiliser les acteurs.

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}

5voto

Robin Green Points 12926

Pour ce faire, j'utiliserais un acteur Akka. Contrairement au for-comprehension, il échoue dès que l'un des futurs échoue, il est donc un peu plus efficace dans ce sens.

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

Ensuite, créez l'acteur, envoyez-lui un message (pour qu'il sache où envoyer sa réponse) et attendez une réponse.

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}

5voto

Rex Kerr Points 94401

Vous pouvez le faire uniquement avec les futurs. Voici une implémentation. Notez qu'elle ne met pas fin à l'exécution de manière anticipée ! Dans ce cas, vous devez faire quelque chose de plus sophistiqué (et probablement implémenter l'interruption vous-même). Mais si vous ne voulez pas continuer à attendre quelque chose qui ne va pas fonctionner, la clé est de continuer à attendre que la première chose se termine, et de s'arrêter quand il ne reste plus rien ou quand vous rencontrez une exception :

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

Voici un exemple en action lorsque tout fonctionne bien :

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

Mais quand quelque chose va mal :

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X