216 votes

En attente d'une liste de futurs

J'ai une méthode qui renvoie un List des contrats à terme

List<Future<O>> futures = getFutures();

Je veux maintenant attendre que tous les futures soient traités avec succès ou que l'une des tâches dont la sortie est renvoyée par un future lève une exception. Même si une tâche lève une exception, il est inutile d'attendre les autres futures.

Une approche simple consisterait à

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Mais le problème est que si, par exemple, le quatrième futur lance une exception, j'attendrai inutilement que les trois premiers futurs soient disponibles.

Comment résoudre ce problème ? Est-ce que le count down latch peut aider d'une manière ou d'une autre ? Je ne peux pas utiliser Future isDone parce que la documentation java dit

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

165voto

dcernahoschi Points 7214

Vous pouvez utiliser un Service d'achèvement pour recevoir les futures dès qu'ils sont prêts et, si l'un d'entre eux lève une exception, annuler le traitement. Quelque chose comme ça :

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

Je pense qu'il est possible d'améliorer encore le système en annulant toutes les tâches en cours d'exécution si l'une d'entre elles génère une erreur.

157voto

Andrejs Points 4235

Si vous utilisez Java 8 vous pouvez le faire plus facilement avec CompletableFuture et CompletableFuture.allOf qui n'applique le rappel qu'une fois que tous les CompletableFutures fournis sont terminés.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

81voto

sendon1982 Points 610

Utiliser un CompletableFuture dans Java 8

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

19voto

jmiserez Points 1507

Vous pouvez utiliser un ExecutorCompletionService . La documentation contient même un exemple pour votre cas d'utilisation exact :

Supposons plutôt que vous souhaitiez utiliser le premier résultat non nul de l'ensemble des tâches, en ignorant celles qui rencontrent des exceptions et en annulant toutes les autres tâches lorsque la première est prête :

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

La chose importante à noter ici est que ecs.take() obtiendra le premier achevé et pas seulement la première tâche soumise. Vous devez donc les obtenir dans l'ordre de la fin de l'exécution (ou de la levée d'une exception).

3voto

Bohao LI Points 113

Si vous souhaitez combiner une liste de Futures Complétables, vous pouvez procéder comme suit :

List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

Pour plus de détails sur Future & CompletableFuture, liens utiles :
1. L'avenir : https://www.baeldung.com/java-future
2. L'avenir est achevable : https://www.baeldung.com/java-completablefuture
3. L'avenir achevable : https://www.callicoder.com/java-8-completablefuture-tutorial/

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X