50 votes

Assurer l'ordre d'exécution des tâches dans le pool de threads

J'ai lu sur le thread du pool de modèle et je n'arrive pas à trouver la solution habituelle pour le problème suivant.

J'ai parfois envie de tâches doivent être exécutées en série. Par exemple, j'ai lu des morceaux de texte à partir d'un fichier et pour une raison que j'ai besoin de le des morceaux à être traitées dans l'ordre. Donc, fondamentalement, je veux éliminer la simultanéité de certaines tâches.

Considérons ce scénario où les tâches avec * doivent être traitées dans l'ordre où ils ont été poussés dans. Les autres tâches peuvent être traitées dans n'importe quel ordre.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

Dans le contexte d'un thread-piscine, sans cette contrainte, une seule file d'attente des tâches fonctionne très bien mais clairement ici qu'il ne l'est pas.

J'ai pensé avoir des threads fonctionnent sur un thread spécifique de la file d'attente et les autres sur le "global" de la file d'attente. Ensuite, pour l'exécution de certaines tâches en série, je n'ai qu'à pousser sur une file d'attente où un seul thread semble. Il ne semble un peu maladroit.

Donc, la vraie question dans cette longue histoire: comment voulez-vous résoudre ce problème ? Comment vous assurez-vous de ces tâches sont ordonnées?

MODIFIER

Comme un problème plus général, supposons que le scénario ci-dessus devient

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

Ce que je veux dire, c'est que les tâches au sein d'un groupe doivent être exécutées de manière séquentielle, mais les groupes eux-mêmes peuvent se mélanger. Vous pouvez donc avoir 3-2-5-4-7 par exemple.

Une autre chose à noter est que je n'ai pas accès à toutes les tâches d'un groupe à l'avance (et je ne peux pas attendre pour tous d'arriver avant le départ du groupe).

Je vous remercie pour votre temps.

17voto

Tim Lloyd Points 23571

Quelque chose comme ce qui suit va permettre de série et en parallèle les tâches en file d'attente, où la série tâches seront exécutées l'une après l'autre, et en parallèle les tâches seront exécutées dans n'importe quel ordre, mais en parallèle. Cela vous donne la possibilité de sérialiser des tâches, le cas échéant, ont des tâches parallèles, mais de faire ce que les tâches sont reçus c'est à dire que vous n'avez pas besoin de savoir à propos de l'ensemble de la séquence, en exécution de l'ordre est maintenu de façon dynamique.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Mise à jour

Pour gérer des groupes de travail avec des en série et en parallèle des tâches de mélanges, un GroupedTaskQueue peut gérer un TaskQueue pour chaque groupe. Encore une fois, vous n'avez pas besoin de savoir à propos des groupes, il en est tout géré dynamiquement, comme les tâches sont reçus.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

13voto

Anthony Williams Points 28904

Les pools de threads sont bons pour les cas où l'ordre relatif des tâches n'a pas d'importance, à condition de tout faire. En particulier, il doit être OK pour eux tous d'être menés en parallèle.

Si vos tâches doivent être réalisées dans un ordre précis, alors qu'ils ne sont pas adaptés pour le parallélisme, donc un pool de threads n'est pas approprié.

Si vous souhaitez déplacer cette série de tâches le thread principal, puis à un seul thread d'arrière-plan avec une tâche de la file d'attente serait approprié pour ces tâches. Vous pouvez continuer à utiliser un pool de threads pour le reste des tâches qui sont appropriés pour le parallélisme.

Oui, cela signifie que vous avez à décider où envoyer la tâche selon qu'il s'agit d'une commande de la tâche ou un "peut être parallélisée" de la tâche, mais ce n'est pas une grosse affaire.

Si vous avez des groupes qui doivent être sérialisé, mais qui peuvent s'exécuter en parallèle avec d'autres tâches, alors vous avez plusieurs choix:

  1. Créer une tâche unique pour chaque groupe, ce qui n'est le groupe de tâches dans l'ordre, et après cette tâche pour le pool de threads.
  2. Demandez à chaque tâche dans un groupe explicitement attendre pour la tâche précédente dans le groupe, et les publier sur le pool de threads. Cela nécessite que votre pool de threads peut gérer le cas où un thread est en attente d'un pas-encore-tâche planifiée sans interblocage.
  3. Avoir un thread dédié pour chaque groupe, et le poste de tâches de groupe sur la file d'attente de messages.

8voto

pvoosten Points 1684

Fondamentalement, il existe un certain nombre de tâches en attente. Certaines de ces tâches peuvent être effectuées uniquement lorsqu'un ou plusieurs autres tâches en attente ont fini de s'exécuter.

Les tâches en attente peut être modélisé dans un graphe de dépendance:

  • "la tâche 1 -> task2" signifie "la tâche 2 ne peut être exécuté qu'après la tâche 1 est terminé." les flèches pointent dans la direction de la commande d'exécution.
  • le indegree d'une tâche (le nombre de tâches en le montrant du doigt) détermine si la tâche est prêt pour l'exécution. Si le indegree est de 0, elle peut être exécutée.
  • parfois, une tâche doit attendre plusieurs tâches pour terminer, le indegree est donc >1.
  • si une tâche n'a pas à attendre pour d'autres tâches pour terminer plus (sa indegree est égale à zéro), il peut être soumis à la pool de threads avec les threads de travail, ou de la file d'attente de tâches en attente d'être ramassé par un thread de travail. Vous connaissez le soumis tâche ne sera pas provoquer un blocage, parce que la tâche n'est pas d'attente pour rien. Comme l'optimisation, vous pouvez utiliser une file d'attente de priorité, par exemple dans les tâches que plus de tâches dans le graphe de dépendance dépendent sera exécuté en premier. Ceci peut également pas provoquer de blocage, car toutes les tâches dans le pool de threads peut être exécutée. Il peut provoquer la famine, cependant.
  • Si une tâche se termine l'exécution, il peut être retiré à partir du graphe de dépendance, voire à réduire le indegree d'autres tâches, ce qui peut à son tour être soumis à la pool de threads de travail.

Donc, il y a (au moins) un thread utilisé pour ajouter/supprimer des tâches en attente, et il y a un thread du pool de threads de travail.

Lorsqu'une tâche est ajoutée au graphe de dépendance, vous devez vérifier:

  • la façon dont la tâche est connecté dans le graphe de dépendance: quelles sont les tâches que doit-on attendre de terminer et de ce que les tâches que doit attendre qu'elle se termine? Établir des liens à partir et à la nouvelle tâche en conséquence.
  • une fois les connexions sont tirés: les nouvelles connexions cause tous les cycles dans le graphe de dépendance? Si oui, il y a une situation de blocage.

Performance:

  • ce modèle est plus lent que l'exécution séquentielle si l'exécution en parallèle, est en fait rarement possible, car vous avez besoin de plus de l'administration de tout faire presque de manière séquentielle, de toute façon.
  • ce modèle est rapide si de nombreuses tâches peuvent être effectuées simultanément dans la pratique.

Hypothèses:

Comme vous avez pu le lire entre les lignes, vous devez concevoir les tâches de sorte qu'ils n'interfèrent pas avec d'autres tâches. Aussi, il doit y avoir un moyen de déterminer la priorité des tâches. La priorité de la tâche doit inclure les données manipulées par chaque tâche. Deux tâches ne peuvent pas modifier le même objet en même temps; l'une des tâches doit obtenir la priorité sur l'autre à la place, ou les opérations effectuées sur l'objet doit être thread-safe.

6voto

Martin Points 3187

Pour faire ce que vous voulez faire avec un pool de threads, vous pourriez avoir à créer une sorte de planificateur.

Quelque chose comme ça:

TaskQueue -> Planification -> File -> Pool De Threads

Le planificateur s'exécute dans son propre thread, en gardant les pistes de dépendances entre les tâches. Quand un travail est prêt à être réalisé, le planificateur pousse juste dans la file d'attente pour le pool de threads.

Le pool de threads qui peuvent avoir à envoyer des signaux à l'Ordonnanceur pour indiquer quand un travail est fait pour que l'ordonnanceur peut mettre les emplois selon que le travail dans la File d'attente.

Dans votre cas, les dépendances pourrait probablement être stockés dans une liste chaînée.

Disons que vous avez les dépendances suivantes: 3 -> 4 -> 6 -> 8

Job 3 est en cours d'exécution sur le pool de threads, vous n'avez toujours pas d'idées de job 8 existe.

Job 3 se termine. Vous enlevez les 3 à partir de la liste liée, vous mettez de l'emploi 4 sur la file d'attente pour le pool de threads.

Job 8 arrive. Vous le mettez à la fin de la liste chaînée.

Les seules constructions qui doivent être parfaitement synchronisés sont les Files d'attente avant et après le planificateur.

5voto

Matt Points 4917

Si je comprends bien le problème correctement, le jdk exécuteurs n'ont pas cette capacité, mais il est facile de rouler votre propre. En gros, vous avez besoin

  • un pool de threads de travail, dont chacun a une file
  • certains abstraction au-dessus de ces files d'attente à qui vous offrez le travail (c.f. l' ExecutorService)
  • un algorithme qui sélectionne de façon déterministe une file d'attente spécifique pour chaque élément de travail
  • chaque pièce de travail, puis s'offre à la droite de la file d'attente et donc est traitée dans le bon ordre

La différence pour le jdk exécuteurs testamentaires, c'est qu'ils ont 1 file d'attente avec les n threads, mais vous voulez n files d'attente et les m fils (où n peut ou peut ne pas correspondre à m)

* edit après avoir lu que chaque tâche a une des clés *

Dans un peu plus en détail

  • écrire du code qui transforme une clé dans un index (un int) dans une plage donnée (0-n où n est le nombre de threads que vous voulez), cela pourrait être aussi simple que d' key.hashCode() % n ou peut-être certains de faire le mappage statique de connues les valeurs de clé de threads ou ce que vous voulez
  • au démarrage
    • créez n files d'attente, les mettre dans une structure indexée (tableau, liste quoi que ce soit)
    • start n threads, chaque thread juste un blocage de prendre de la file d'attente
    • lorsqu'il reçoit un peu de travail, il sait comment exécuter des travaux spécifiques à la tâche/l'événement (vous pouvez évidemment en avoir la cartographie de tâches à des actions si vous avez hétérogène événements)
  • stockez ce qui se cache derrière certaines façade qui accepte les éléments de travail
  • lorsqu'une tâche arrive, la main à la façade
    • la façade trouve la bonne file d'attente pour la tâche basée sur la touche, l'offre de la file d'attente

il est plus facile suffisant pour ajouter auto redémarrage de threads à ce régime, vous devez le thread de travail pour enregistrer avec certains gestionnaire de l'état "je possède cette file d'attente" et puis un peu de ménage autour de + détection d'erreurs dans le thread (ce qui signifie qu'il annule l'inscription de la propriété de la file d'attente de retour de la file d'attente à un pool libre de files d'attente qui est un déclencheur pour démarrer un nouveau thread)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X