16 votes

Comment (et si) écrire une file d'attente à consommateur unique en utilisant le TPL ?

J'ai entendu récemment un grand nombre de podcasts sur la TPL dans .NET 4.0. La plupart d'entre eux décrivent des activités en arrière-plan, comme le téléchargement d'images ou un calcul, en utilisant des tâches pour que le travail n'interfère pas avec un thread de l'interface graphique.

La plupart du code sur lequel je travaille a plutôt une saveur de producteur multiple / consommateur unique, où les éléments de travail provenant de plusieurs sources doivent être mis en file d'attente et ensuite traités dans l'ordre. Un exemple serait la journalisation, où les lignes de journal de plusieurs threads sont séquentialisées dans une seule file d'attente pour une éventuelle écriture dans un fichier ou une base de données. Tous les enregistrements d'une même source doivent rester dans l'ordre, et les enregistrements d'un même moment doivent être "proches" les uns des autres dans le résultat final.

Ainsi, plusieurs threads ou tâches ou autres invoquent tous une file d'attente :

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

Et un fil de travail dédié traite la file d'attente :

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

Il a toujours semblé raisonnable de dédier un thread de travail pour le côté consommateur de ces tâches. Devrais-je écrire mes futurs programmes en utilisant plutôt une construction de la TPL ? Laquelle ? Pourquoi ?

13voto

Ade Miller Points 7750

Vous pouvez utiliser une tâche à long terme pour traiter les éléments d'une BlockingCollection, comme le suggère Wilka. Voici un exemple qui répond à peu près aux exigences de votre application. Vous obtiendrez un résultat semblable à celui-ci :

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

Non pas que les sorties de A, B, C et D semblent aléatoires car elles dépendent de l'heure de début des fils, mais B apparaît toujours avant B1.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}

5voto

Barry Points 448

Je sais que cette réponse a environ un an de retard, mais jetez un coup d'oeil à MSDN .

qui montre comment créer un LimitedConcurrencyLevelTaskScheduler à partir de la classe TaskScheduler. En limitant la simultanéité à une seule tâche, ce dernier devrait alors traiter vos tâches dans l'ordre où elles sont mises en file d'attente :

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});

3voto

FrenchData Points 430

Je ne suis pas sûr que TPL soit adéquat dans votre cas d'utilisation. D'après ce que j'ai compris, la principale utilisation de TPL est de diviser une tâche énorme en plusieurs tâches plus petites qui peuvent être exécutées côte à côte. Par exemple, si vous avez une grande liste et que vous voulez appliquer la même transformation sur chaque élément. Dans ce cas, vous pouvez avoir plusieurs tâches qui appliquent la transformation sur un sous-ensemble de la liste.

Le cas que vous décrivez ne me semble pas correspondre à cette image. Dans votre cas, vous n'avez pas plusieurs tâches qui font la même chose en parallèle. Vous avez plusieurs tâches différentes qui font chacune leur propre travail (les producteurs) et une tâche qui consomme. Peut-être que TPL pourrait être utilisé pour la partie consommateur si vous voulez avoir plusieurs consommateurs parce que dans ce cas, chaque consommateur fait le même travail (en supposant que vous trouviez une logique pour appliquer la cohérence temporelle que vous recherchez).

Bien sûr, ce n'est que mon point de vue personnel sur le sujet.

Vivre longtemps et prospérer

2voto

Wilka Points 13239

On dirait que BlocageCollection serait pratique pour vous. Ainsi, pour votre code ci-dessus, vous pourriez utiliser quelque chose comme (en supposant que _queue est un BlockingCollection instance) :

// for your producers 
_queue.Add(some_work);

Un fil de travail dédié qui traite la file d'attente :

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

Note : lorsque tous vos producteurs ont fini de produire des trucs, vous devrez appeler CompleteAdding() sur _queue sinon votre consommateur sera bloqué dans l'attente d'autres travaux.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X