Le comportement que vous observez est entièrement délibéré.
Fondamental à Rx est sa grammaire qui déclare qu'un flux est défini comme une séquence de zéro ou plusieurs appels OnNext
suivis éventuellement d'un OnError
ou d'un OnCompleted
.
En particulier, la grammaire de Rx stipule que chacun de ces messages est délivré séquentiellement pour un abonné donné.
Donc ce que vous observez est le comportement correct - aucune exécution concurrente des gestionnaires OnNext
. Étant donné cette contrainte délibérée, créer un nouveau thread pour chaque OnNext
serait assez coûteux.
Sous-jacent, si vous tracez suffisamment loin le code, vous verrez que le NewThreadScheduler
utilise un EventLoopScheduler
spécifiquement pour réutiliser le thread pour chaque abonné. Le nom NewThreadScheduler
parle en réalité du fait que chaque abonné obtient un nouveau thread, pas chaque événement.
Pour voir cela, modifiez votre code pour avoir deux abonnés fonctionnant à des vitesses différentes. Vous verrez que chacun obtient son propre thread et progresse à sa propre vitesse, le plus rapide étant non entravé par le plus lent :
var planificateur = NewThreadScheduler.Default;
var énumérable = Enumerable.Range(0, 100);
var xs = énumérable
.ToObservable(planificateur)
.SubscribeOn(planificateur);
xs.Subscribe(item =>
{
Console.WriteLine("Consommation lente de {0} sur le Thread : {1}",
item, Thread.CurrentThread.ManagedThreadId);
// simuler une opération longue plus lente
Thread.Sleep(1000);
});
xs.Subscribe(item =>
{
Console.WriteLine("Consommation rapide de {0} sur le Thread : {1}",
item, Thread.CurrentThread.ManagedThreadId);
// simuler une opération longue plus rapide
Thread.Sleep(500);
});
Console.ReadKey();
Vous pourriez trouver utile de lire les Directives de conception de Rx.
Le désir de permettre le traitement concurrent des événements chez un abonné suggère qu'une file d'attente avec plusieurs consommateurs pourrait correspondre à ce que vous recherchez - et pour cela vous pourriez regarder en dehors de Rx, par exemple une ConcurrentQueue
de la BCL. Il est également possible de projeter des messages en appels asynchrones et de rassembler les résultats à la fin sans violer les contraintes de la grammaire Rx.
Par exemple, voici un code similaire qui traite aléatoirement chaque nombre du flux pendant un temps différent. Vous pouvez voir les résultats arriver dans le désordre, sans être perturbés les uns par les autres. Ce n'est pas un code génial, mais il fait passer le message. Il pourrait être vraiment utile si le travail asynchrone était lié à l'E/S. Notez également l'utilisation de Observable.Range
qui évite l'utilisation de la combinaison Enumerable.Range().ToObservable()
. Testé sur .NET Core 2.0 :
var aléatoire = new Random();
// empêcher le threadpool de nous ralentir en grandissant
ThreadPool.SetMinThreads(100, 1);
Observable.Range(0, 100)
.SelectMany(x => Observable.Start(() =>
{
Console.WriteLine($"Démarré {x}");
Thread.Sleep(aléatoire.Next(1, 10) * 1000);
return x;
}))
.Subscribe(item =>
{
Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
});
Console.ReadKey();