105 votes

ExecutorService qui interrompt les tâches après un délai d'attente

Je suis à la recherche d'un ExecutorService de mise en œuvre qui peut être fourni avec un délai d'attente. Les tâches qui sont soumis à l'ExecutorService sont interrompus si ils prennent plus de temps que le délai pour s'exécuter. L'application d'un tel engin n'est pas une tâche difficile, mais je me demandais si quelqu'un sait d'une mise en œuvre.

Voici ce que j'ai trouvé basée sur quelques éléments de la discussion ci-dessous. Tous les commentaires?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}

94voto

John Vint Points 19804

Vous pouvez utiliser un ScheduledExecutorService pour cela. D'abord vous devez soumettre qu'une seule fois pour commencer immidiatly et de conserver l'avenir qui est créé. Après cela, vous pouvez soumettre une nouvelle tâche qui permettrait d'annuler l'oubli de l'avenir après une certaine période de temps.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Cela permettra de exécuter votre gestionnaire (principale fonctionnalité à être interrompue) pendant 10 secondes, puis annuler (c'est à dire d'interruption) qui tâche spécifique.

6voto

Flavio Points 5369

Malheureusement, la solution est erronée. Il y a une sorte de bug avec ScheduledThreadPoolExecutor, a également signalé dans cette question: l'annulation d'une tâche de ne pas libérer les ressources de la mémoire associé à la tâche; les ressources sont publiées uniquement lorsque la tâche expire.

Si vous avez donc créer un TimeoutThreadPoolExecutor avec un assez long temps d'expiration (une utilisation typique), et de soumettre des tâches assez rapidement, vous vous retrouvez de remplissage de la mémoire - même si les tâches effectivement accomplies avec succès.

Vous pouvez voir le problème avec la suivante (très rudimentaire) programme de test:

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

Le programme épuise la mémoire disponible, bien qu'il attend pour le pondu Runnables à compléter.

J'ai bien sur ce sujet pendant un certain temps, mais malheureusement je ne pouvais pas arriver à une bonne solution.

EDIT: J'ai découvert ce problème a été signalé comme JDK bug 6602600, et semble avoir été résolu très récemment.

5voto

ZZ Coder Points 36990

Emballez la tâche dans FutureTask et vous pouvez spécifier un délai d'expiration pour FutureTask. Regardez l'exemple dans ma réponse à cette question,

http://stackoverflow.com/questions/1247390/java-native-process- timeout/1249984#1249984

1voto

Giovanni Botta Points 2674

Pourquoi ne pas utiliser la méthode ExecutorService.shutDownNow() décrite dans http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Cela semble être la solution la plus simple.

1voto

Sergey Points 1

Il semble que le problème n'est pas dans le JDK bug 6602600 ( il a été résolu à 2010-05-22), mais dans appel incorrect de sommeil(10) en forme de cercle. Outre noter que le Thread principal doit donner directement la CHANCE à d'autres threads pour réaliser leurs tâches par invoquer le SOMMEIL(0) CHAQUE branche du cercle extérieur. C'est mieux, je pense, pour utiliser le Thread.rendement() au lieu de Fil.sleep(0)

Le résultat corrigé partie du problème précédent code comme ceci:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Il fonctionne correctement avec le montant de l'extérieur du compteur jusqu'à 150 000 000 testé cercles.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X