66 votes

Scala Futures - délai d'attente intégré ?

Il y a un aspect des futures que je ne comprends pas exactement à partir du tutoriel officiel réf. http://docs.scala-lang.org/overviews/core/futures.html

Les contrats à terme en Scala ont-ils un mécanisme de temporisation intégré ? Disons que l'exemple ci-dessous était un fichier texte de 5 gigaoctets... est-ce que la portée implicite de "Implicits.global" fait que onFailure se déclenche de manière non bloquante ou est-ce que cela peut être défini ? Et sans une sorte de délai d'attente par défaut, cela ne signifierait-il pas qu'il est possible que ni le succès ni l'échec ne se déclenchent jamais ?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

0 votes

Voir awaitAll

4 votes

Gardez à l'esprit qu'aucune de ces solutions n'arrêtera réellement l'épidémie. Future de la course. Le seul endroit où vous pouvez arrêter un Future est de l'intérieur.

1 votes

@NikitaVolkov Votre lien ne fonctionne plus. J'ai essayé de trouver le lien correct mais j'ai échoué.

73voto

cmbaxter Points 15136

Vous n'obtenez un comportement de dépassement de délai que si vous utilisez le blocage pour obtenir les résultats de la méthode Future . Si vous voulez utiliser les callbacks non bloquants onComplete , onSuccess o onFailure vous devrez alors mettre en place votre propre gestion des délais d'attente. Akka a intégré la gestion des délais d'attente pour les requêtes/réponses ( ? ) entre acteurs, mais vous n'êtes pas certain de vouloir commencer à utiliser Akka. FWIW, dans Akka, pour la gestion du timeout, ils composent deux Futures ensemble via Future.firstCompletedOf L'un représente la tâche asynchrone actuelle et l'autre le délai d'attente. Si le délai d'attente (via un HashedWheelTimer ) pops d'abord, vous obtenez un échec sur le callback asynchrone.

Un exemple très simplifié de l'utilisation d'un rouleau de papier pourrait ressembler à ceci. Tout d'abord, un objet pour planifier les délais d'attente :

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise:Promise[_], after:Duration) = {
    timer.newTimeout(new TimerTask{
      def run(timeout:Timeout){              
        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}

Puis une fonction pour prendre un futur et lui ajouter un comportement de temporisation :

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

Notez que le HashedWheelTimer J'utilise ici est de Netty.

3 votes

Merci beaucoup ! Pouvez-vous me donner un conseil général sur la façon de gérer les futures (après coup). Je suis en train de lire sur Akka, et divers paquets HTTP pour Scala qui utilisent des futures. Il semble qu'à un moment donné, pour utiliser un Future, un événement bloquant doit se produire à ce moment-là ou abandonner le processus mais de nombreux tutoriels semblent se concentrer sur l'appel non bloquant plutôt que de faire quelque chose de pratique avec lui après coup ?

0 votes

Il est tout à fait possible de construire une logique autour de l'utilisation non bloquante des Futures, et je suggère de pencher dans cette direction car les performances sont nettement meilleures. Par exemple, nous utilisons Unfiltered pour notre couche HTTP/REST. Les appels entrent et vont vers les acteurs Akka pour être servis. Nous utilisons la méthode non-bloquante onComplete sur le futur renvoyé par l'appel à l'acteur et ensuite compléter la requête HTTP asynchrone Netty non filtrée. Ce n'est qu'un exemple (bien que peu détaillé) de l'utilisation des callbacks non bloquants pour quelque chose de réel.

0 votes

@cmbaxter - merci pour cette information. J'ai essayé de le mettre en œuvre (BTW - problèmes de licence de fourmis ici) mais cela ne semble pas fonctionner. J'essaie d'utiliser avec des futurs liés en utilisant la carte sur le premier futur. 1. Puis-je avoir 2 délais d'attente ? J'ai essayé d'appeler quelque chose comme ceci mais cela ne semble rien faire : TimeoutScheduler.withTimeout(createVMFuture)(global,2 secondes).recover{ case ex:TimeoutException => { logError("create vm timed out") }

27voto

justinhj Points 5060

Toutes ces réponses nécessitent des dépendances supplémentaires. J'ai décidé d'écrire une version utilisant java.util.Timer qui est un moyen efficace d'exécuter une fonction dans le futur, dans ce cas pour déclencher un timeout.

Blog post avec plus de détails ici

En utilisant ceci avec Promise de Scala, nous pouvons créer un Future avec timeout comme suit :

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run() : Unit = {
            p.tryFailure(new TimeoutException())
        }
      }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}

0 votes

J'ai upvoted la réponse. Une chose mineure est que p n'a pas besoin d'être mutable. A val suffirait.

0 votes

Merci, j'ai fait le changement ici.

0 votes

Cette solution empêche-t-elle réellement le Future en arrière-plan de s'exécuter après le délai d'attente ? .

23voto

Pablo Fernandez Points 32003

Je viens de créer un TimeoutFuture pour un collègue de travail :

TimeoutFuture

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}

Utilisation

val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}

Notes :

  • Suppose le cadre Play ! (mais il est assez facile de l'adapter).
  • Chaque morceau de code s'exécute dans le même ExecutionContext ce qui n'est pas forcément idéal.

0 votes

Un problème que j'ai rencontré avec cette implémentation était que si le bloc jetait une exception, et que je faisais un Await.result(future, 5 secondes), une TimeoutException serait lancée plutôt que l'exception sous-jacente. J'exécute cette implémentation sous scala 2.11, donc je ne sais pas pourquoi le bloc de réussite de la promesse ne compléterait pas le Future si le bloc lançait une exception. La façon dont j'ai contourné ce problème a été de faire un try-catch sur le bloc prom success, et de faire prom failure e dans le catch handler.

0 votes

En y regardant de plus près, je pense que le problème est que si le bloc lève une exception, elle n'est pas écrite dans le journal.

0 votes

@pablo Comme vous l'avez mentionné, cela nécessite le framework play, et si je l'inclus directement dans une application scala, le message "There is no started application" est lancé, ce qui m'oblige à démarrer une application play, etc (ce que je ne veux pas faire). Est-ce qu'il existe un autre programmateur disponible pour exécuter ceci, car j'ai besoin de la fonctionnalité de futur avec timeout dans le code scala. Je suis assez novice en Scala.

6voto

Kir Points 61

Le framework Play contient Promise.timeout, vous pouvez donc écrire un code comme le suivant

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}

6voto

Raul Points 571

Je suis assez surpris que cela ne soit pas standard en Scala. Mes versions sont courtes et n'ont pas de dépendances

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
  }

}

Exemple d'utilisation

import FutureTimeout._
Future { /* do smth */ } withTimeout

1 votes

N'oubliez pas que cela n'empêchera PAS le futur sous-jacent de s'exécuter ! Cela ne peut se faire qu'à l'intérieur du futur.

1 votes

@ScalaWilliam des suggestions ? Le blocage à l'intérieur d'un futur est considéré comme sûr.

0 votes

La réponse de Pablo Fernandez est la bonne façon de procéder.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X