216 votes

En attente d'une liste de futurs

J'ai une méthode qui renvoie un List des contrats à terme

List<Future<O>> futures = getFutures();

Je veux maintenant attendre que tous les futures soient traités avec succès ou que l'une des tâches dont la sortie est renvoyée par un future lève une exception. Même si une tâche lève une exception, il est inutile d'attendre les autres futures.

Une approche simple consisterait à

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

Mais le problème est que si, par exemple, le quatrième futur lance une exception, j'attendrai inutilement que les trois premiers futurs soient disponibles.

Comment résoudre ce problème ? Est-ce que le count down latch peut aider d'une manière ou d'une autre ? Je ne peux pas utiliser Future isDone parce que la documentation java dit

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

3voto

djechelon Points 8092

Si vous utilisez Java 8 et que vous ne souhaitez pas manipuler les fichiers CompletableFuture j'ai écrit un outil permettant de récupérer les résultats d'un List<Future<T>> en utilisant la diffusion en continu. L'essentiel est que vous n'ayez pas le droit de map(Future::get) qu'il lance.

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

Cela nécessite un AggregateException qui fonctionne comme la fonction

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

Ce composant agit exactement comme le composant Task.WaitAll . Je travaille sur une variante qui fait la même chose que CompletableFuture.allOf (équivalent à Task.WhenAll )

La raison pour laquelle j'ai fait cela est que j'utilise la fonction ListenableFuture et ne veulent pas porter sur CompletableFuture bien qu'il s'agisse d'une méthode plus standard

1voto

Brixomatic Points 183

J'ai une classe utilitaire qui contient ces éléments :

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

Une fois que vous avez obtenu cela, en utilisant une importation statique, vous pouvez simplement attendre tous les futurs comme ceci :

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

vous pouvez également collecter tous leurs résultats de cette manière :

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

Je reviens sur mon ancien message et je remarque que vous avez eu un autre chagrin :

Mais le problème est que si, par exemple, le quatrième futur lance une exception, j'attendrai inutilement que les trois premiers futurs soient disponibles.

Dans ce cas, la solution la plus simple est de procéder en parallèle :

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

De cette façon, la première exception, bien qu'elle n'arrête pas le futur, interrompt l'instruction forEach, comme dans l'exemple en série, mais comme toutes les instructions attendent en parallèle, il n'est pas nécessaire d'attendre que les trois premières soient terminées.

1voto

Farhad Baghirov Points 67
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Stack2 {   
    public static void waitFor(List<Future<?>> futures) {
        List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains features for which status has not been completed
        while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks
            Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
            while (futureCopiesIterator.hasNext()) {
                Future<?> future = futureCopiesIterator.next();
                if (future.isDone()) {//already done
                    futureCopiesIterator.remove();
                    try {
                        future.get();// no longer waiting
                    } catch (InterruptedException e) {
                        //ignore
                        //only happen when current Thread interrupted
                    } catch (ExecutionException e) {
                        Throwable throwable = e.getCause();// real cause of exception
                        futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
                        return;
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Runnable runnable1 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
            }
        };
        Runnable runnable2 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                }
            }
        };

        Runnable fail = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(1000);
                    throw new RuntimeException("bla bla bla");
                } catch (InterruptedException e) {
                }
            }
        };

        List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
                .map(executorService::submit)
                .collect(Collectors.toList());

        double start = System.nanoTime();
        waitFor(futures);
        double end = (System.nanoTime()-start)/1e9;
        System.out.println(end +" seconds");

    }
}

0voto

Ohad Bitton Points 85

C'est ce que j'utilise pour attendre un certain moment sur une liste de contrats à terme. Je pense que c'est plus propre.

CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// Some parallel work
        for (Something tp : somethings) {
            completionService.submit(() -> {
                try {
                   work(something)
                } catch (ConnectException e) {
                } finally {
                    countDownLatch.countDown();
                }
            });
        }    
  try {
        if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)){
        }
    } catch (InterruptedException e) {
    }

0voto

typeracer Points 1

Une solution basée sur Guava peut être mise en œuvre en utilisant Futures.FutureCombiner .

Voici l'exemple de code donné dans la javadoc :

 final ListenableFuture<Instant> loginDateFuture =
     loginService.findLastLoginDate(username);
 final ListenableFuture<List<String>> recentCommandsFuture =
     recentCommandsService.findRecentCommands(username);
 ListenableFuture<UsageHistory> usageFuture =
     Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
         .call(
             () ->
                 new UsageHistory(
                     username,
                     Futures.getDone(loginDateFuture),
                     Futures.getDone(recentCommandsFuture)),
             executor);

Pour plus d'informations, voir le site L'avenir écoutable expliqué du guide de l'utilisateur.

Si vous êtes curieux de savoir comment cela fonctionne sous le capot, je vous suggère de consulter cette partie du code source : AggregateFuture.java#L127-L186

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X