47 votes

Comment implémenter la priorisation des tâches à l'aide d'un ExecutorService dans Java 5?

Je suis d'exécution d'un thread mécanisme de mise en commun dans lequel j'aimerais exécuter des tâches de différentes priorités. J'aimerais avoir une belle mécanisme par lequel je peux soumettre une tâche hautement prioritaire pour le service avant de l'être programmées avant d'autres tâches. La priorité de la tâche est une propriété intrinsèque de la tâche elle-même (si j'exprime que la tâche est un Callable ou Runnable n'est pas important pour moi).

Maintenant, superficiellement, il semble que je pourrais utiliser un PriorityBlockingQueue que la tâche de la file d'attente dans mon ThreadPoolExecutor, mais que la file d'attente contient Runnable objets, qui peut ou peut ne pas être l' Runnable tâches que j'ai soumis. De plus, si je l'ai soumis Callable tâches, il n'est pas clair comment cela serait jamais la carte.

Est-il un moyen de faire cela? J'aurais vraiment préféré ne pas rouler mes propres pour cela, depuis que je suis beaucoup plus susceptibles de se tromper de cette façon.

(D'un côté, oui, je suis conscient de la possibilité de la famine pour des travaux prioritaires dans quelque chose comme cela. Des points supplémentaires (?!) pour des solutions qui ont une garantie raisonnable de l'équité)

16voto

Mike Points 6184

J'ai résolu ce problème d'une façon raisonnable, et je vais vous décrire ci-dessous pour référence future à moi-même et personne d'autre qui s'exécute ce problème avec Java Simultanées des bibliothèques.

À l'aide d'un PriorityQueue que les moyens de les accrocher à des tâches pour l'exécution ultérieure est en fait un mouvement dans la bonne direction. Le problème est que l' PriorityQueue doit être générique instancié pour contenir Runnable des cas, et il est impossible d'appeler compareTo (ou similaire) sur un Runnable interface.

Sur la résolution du problème. Lors de la création de l'Exécuteur testamentaire, il doit être donné une PriorityQueue. La file d'attente doit être donné une coutume Comparateur d'effectuer correctement en place le tri:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Maintenant, un coup d'oeil à l' CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Tout semble assez simple jusqu'à ce point. Cela devient un peu collant ici. Notre prochain problème est de traiter avec la création de FutureTasks de l'Exécuteur testamentaire. À l'Exécuteur, nous devons remplacer newTaskFor comme:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

c est le Callable de la tâche que nous essayons de nous exécuter. Maintenant, nous allons jeter un coup d'oeil à l' CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Avis de l' getTask méthode. Nous allons l'utiliser plus tard pour attraper la tâche d'origine de cette CustomFutureTask que nous avons créé.

Et enfin, nous allons modifier la tâche d'origine que nous avons tenté de l'exécuter:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Vous pouvez voir que nous mettons en oeuvre Comparable dans la tâche à déléguer à la réelle Comparator pour MyType.

Et là vous l'avez, personnalisé, de la hiérarchisation de l'Exécuteur testamentaire, en utilisant les bibliothèques Java! Il prend un peu de flexion, mais c'est le plus propre que j'ai été en mesure de venir avec. J'espère que c'est utile à quelqu'un!

8voto

Adam Jaskiewicz Points 7485

À première vue, il semblerait, vous pouvez définir une interface pour vos tâches qui s'étend au - Runnable ou Callable<T> et Comparable. Puis les envelopper d'un ThreadPoolExecutor avec un PriorityBlockingQueue que la file d'attente, et seulement accepter les tâches qui implémentent l'interface.

En prenant votre commentaire en compte, il ressemble à une option consiste à étendre ThreadPoolExecutor, et de remplacer l' submit() méthodes. Reportez-vous à l' AbstractExecutorService pour voir ce que ceux par défaut ressembler; tout ce qu'ils font est de conclure l' Runnable ou Callable en FutureTask et execute() il. Je serais probablement le faire par écrit au moyen d'un wrapper de la classe qui implémente ExecutorService et les délégués à un anonyme intérieure ThreadPoolExecutor. Les envelopper dans quelque chose qui est votre priorité, de sorte que votre Comparator pouvez obtenir à elle.

4voto

Vous pouvez utiliser ces classes d'assistance:

 public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}
 

ET

 public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}
 

ET cette méthode d'assistance:

 public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}
 

Et puis l'utiliser comme ça:

 class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
 

0voto

willcodejavaforfood Points 20365

Serait-il possible d'avoir un ThreadPoolExecutor pour chaque niveau de priorité? Un ThreadPoolExecutor peut être instanciée avec un ThreadFactory et vous pourriez avoir votre propre mise en œuvre d'un ThreadFactory pour définir les différents niveaux de priorité.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X