3 votes

Rx - consommer chaque élément sur un nouveau thread

Supposons que j'ai un tel code :

static void Main (string[] args) 
    { 
        var scheduler = NewThreadScheduler.Default; 
        var enumerable = Enumerable.Range (0, 100);

        enumerable
        .ToObservable (scheduler)
        .SubscribeOn (scheduler)
        .Subscribe (item => 
            { 
                Console.WriteLine ("Consommation de {0} sur le thread : {1}", item, Thread.CurrentThread.ManagedThreadId);

                // simuler une opération longue
                Thread.Sleep(1000); 
            }); 

        Console.ReadKey(); 
    } 

Comme vous le voyez, je convertis IEnumerable en IObservable. Ensuite, je veux consommer chaque élément sur un nouveau thread, donc j'utilise SubscribeOn (scheduler). Malheureusement, chaque itération fonctionne sur le même thread, donc une itération bloque la suivante.

Le résultat est :

Consommation de 0 sur le thread : 4 
Consommation de 1 sur le thread : 4 
Consommation de 2 sur le thread : 4 
Consommation de 3 sur le thread : 4 
Consommation de 4 sur le thread : 4 
....

Est-il possible de forcer un tel comportement ?

4voto

James World Points 9394

Le comportement que vous observez est entièrement délibéré.

Fondamental à Rx est sa grammaire qui déclare qu'un flux est défini comme une séquence de zéro ou plusieurs appels OnNext suivis éventuellement d'un OnError ou d'un OnCompleted.

En particulier, la grammaire de Rx stipule que chacun de ces messages est délivré séquentiellement pour un abonné donné.

Donc ce que vous observez est le comportement correct - aucune exécution concurrente des gestionnaires OnNext. Étant donné cette contrainte délibérée, créer un nouveau thread pour chaque OnNext serait assez coûteux.

Sous-jacent, si vous tracez suffisamment loin le code, vous verrez que le NewThreadScheduler utilise un EventLoopScheduler spécifiquement pour réutiliser le thread pour chaque abonné. Le nom NewThreadScheduler parle en réalité du fait que chaque abonné obtient un nouveau thread, pas chaque événement.

Pour voir cela, modifiez votre code pour avoir deux abonnés fonctionnant à des vitesses différentes. Vous verrez que chacun obtient son propre thread et progresse à sa propre vitesse, le plus rapide étant non entravé par le plus lent :

var planificateur = NewThreadScheduler.Default;
var énumérable = Enumerable.Range(0, 100);

var xs = énumérable
    .ToObservable(planificateur)
    .SubscribeOn(planificateur);

xs.Subscribe(item =>
{
    Console.WriteLine("Consommation lente de {0} sur le Thread : {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simuler une opération longue plus lente
    Thread.Sleep(1000);
});

xs.Subscribe(item =>
{
    Console.WriteLine("Consommation rapide de {0} sur le Thread : {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simuler une opération longue plus rapide
    Thread.Sleep(500);
});

Console.ReadKey();

Vous pourriez trouver utile de lire les Directives de conception de Rx.

Le désir de permettre le traitement concurrent des événements chez un abonné suggère qu'une file d'attente avec plusieurs consommateurs pourrait correspondre à ce que vous recherchez - et pour cela vous pourriez regarder en dehors de Rx, par exemple une ConcurrentQueue de la BCL. Il est également possible de projeter des messages en appels asynchrones et de rassembler les résultats à la fin sans violer les contraintes de la grammaire Rx.

Par exemple, voici un code similaire qui traite aléatoirement chaque nombre du flux pendant un temps différent. Vous pouvez voir les résultats arriver dans le désordre, sans être perturbés les uns par les autres. Ce n'est pas un code génial, mais il fait passer le message. Il pourrait être vraiment utile si le travail asynchrone était lié à l'E/S. Notez également l'utilisation de Observable.Range qui évite l'utilisation de la combinaison Enumerable.Range().ToObservable(). Testé sur .NET Core 2.0 :

var aléatoire = new Random();

// empêcher le threadpool de nous ralentir en grandissant
ThreadPool.SetMinThreads(100, 1);

Observable.Range(0, 100)
.SelectMany(x => Observable.Start(() =>
{
    Console.WriteLine($"Démarré {x}");
    Thread.Sleep(aléatoire.Next(1, 10) * 1000);
    return x;
}))
.Subscribe(item =>
{
    Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
});

Console.ReadKey();

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X