216 votes

En attente d'une liste de futurs

J'ai une méthode qui renvoie un List des contrats à terme

List<Future<O>> futures = getFutures();

Je veux maintenant attendre que tous les futures soient traités avec succès ou que l'une des tâches dont la sortie est renvoyée par un future lève une exception. Même si une tâche lève une exception, il est inutile d'attendre les autres futures.

Une approche simple consisterait à

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Mais le problème est que si, par exemple, le quatrième futur lance une exception, j'attendrai inutilement que les trois premiers futurs soient disponibles.

Comment résoudre ce problème ? Est-ce que le count down latch peut aider d'une manière ou d'une autre ? Je ne peux pas utiliser Future isDone parce que la documentation java dit

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

0voto

Druckles Points 435

Pour tous ceux qui utilisent le logiciel Vavr Future Vous pouvez soit les attendre tous de cette manière :

static <T> Optional<Future<T>> waitForAll(Collection<Future<T>> futures) {
  return futures.stream()
      .reduce((last, next) -> last.flatMap(ignored -> next));

Ou si vous avez une valeur par défaut au cas où il n'y aurait pas de futur dans la collection :

static <T> Future<T> waitForAll(Collection<Future<T>> futures, T defaultValue) {
  return futures.stream()
      .reduce(Future.successful(defaultValue), (last, next) -> last.flatMap(ignored -> next));
}

Cette procédure s'applique à tous les avenirs, qu'il y ait eu ou non un échec.


Pour revenir dès qu'il y a une défaillance, changez la fonction de votre accumulateur en :

(last, next) -> Future.firstCompletedOf(List.of(last, next))
    .flatMap(v -> last.flatMap(ignored -> next));

Comme nous n'avons que deux éléments dans notre fonction de fusion, nous pouvons attendre que l'un ou l'autre se termine (la fonction de fusion de Vavr firstCompletedOf ). S'il a échoué, il sera ignoré flatMap et renvoyer le futur qui a échoué. S'il a réussi (quel qu'il soit), il tombe dans le premier avenir flatMap où l'on attend aussi que l'autre se termine.

Cela fonctionne quelle que soit la longueur de la collection de contrats à terme, car l'accumulateur les associe tous :

accumulate(accumulate(accumulate(1, 2), 3), 4)

donde accumulate fait le "attendre les deux à moins que l'un d'eux ne soit défaillant".

Avertissement : Cela n'arrêtera pas l'exécution sur les autres threads.

0voto

Peut-être que ceci pourrait aider (rien ne remplacerait le fil brut, ouais !) Je suggère d'exécuter chaque Future avec un thread séparé (ils fonctionnent en parallèle), puis lorsque l'un d'entre eux obtient une erreur, il signale simplement au manager( Handler ).

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

Je dois dire que le code ci-dessus serait une erreur (je n'ai pas vérifié), mais j'espère que j'ai pu expliquer la solution.

0voto

fl0w Points 21

Le CompletionService prendra vos Callables avec la méthode .submit() et vous pourrez récupérer les futures calculés avec la méthode .take().

Une chose que vous ne devez pas oublier est de mettre fin au service d'exécution en appelant la méthode .shutdown(). De plus, vous ne pouvez appeler cette méthode que si vous avez sauvegardé une référence au service d'exécution ; veillez donc à en conserver une.

Exemple de code - Pour un nombre fixe d'éléments de travail à traiter en parallèle :

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

Exemple de code - Pour un nombre dynamique d'éléments de travail à traiter en parallèle :

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}

0voto

Mohamed.Abdo Points 85
 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X