Dans une bibliothèque utilisant Monitor.PulseAll() pour la synchronisation des threads, j'ai remarqué que la latence entre le moment où PulseAll(...) est appelé et le moment où un thread est réveillé semble suivre une distribution en "escalier" -- avec des pas extrêmement grands. Les threads réveillés ne font presque aucun travail et retournent presque immédiatement à l'attente du moniteur. Par exemple, sur une machine à 12 cœurs avec 24 threads attendant sur un moniteur (2x Xeon5680/Gulftown ; 6 cœurs physiques par processeur ; HT désactivée), la latence entre l'impulsion et le réveil d'un thread est la suivante :
Les 12 premiers threads (notez que nous avons 12 cœurs) prennent entre 30 et 60 microsecondes pour répondre. Ensuite, nous commençons à avoir des sauts très importants, avec des plateaux autour de 700, 1300, 1900, et 2600 microsecondes.
J'ai pu recréer avec succès ce comportement indépendamment de la bibliothèque tierce en utilisant le code ci-dessous. Ce code lance un grand nombre de threads (modifiez le paramètre numThreads) qui attendent simplement un moniteur, lisent un horodatage, l'enregistrent dans un ConcurrentSet, puis retournent immédiatement à l'attente. Une fois par seconde, PulseAll() réveille tous les threads. Elle effectue cette opération 20 fois, et signale les latences de la 10e itération à la console.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace PulseAllTest
{
class Program
{
static long LastTimestamp;
static long Iteration;
static object SyncObj = new object();
static Stopwatch s = new Stopwatch();
static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>();
static void Main(string[] args)
{
long numThreads = 32;
for (int i = 0; i < numThreads; ++i)
{
Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning);
}
s.Start();
for (int i = 0; i < 20; ++i)
{
lock (SyncObj)
{
++Iteration;
LastTimestamp = s.Elapsed.Ticks;
Monitor.PulseAll(SyncObj);
}
Thread.Sleep(TimeSpan.FromSeconds(1));
}
Console.WriteLine(String.Join("\n",
from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond));
Console.Read();
}
static void ReadLastTimestampAndPublish()
{
while(true)
{
lock(SyncObj)
{
Monitor.Wait(SyncObj);
}
IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
}
}
}
}
En utilisant le code ci-dessus, voici un exemple de latences sur une boîte avec 8 cores /w hyperthreading activé (i.e. 16 cores dans le Task Manager) et 32 threads (*2x Xeon5550/Gainestown ; 4 cores physiques par processeur ; HT Enabled) :
EDIT : Pour essayer d'éliminer NUMA de l'équation, voici un graphique exécutant le programme d'exemple avec 16 threads sur un Core i7-3770 (Ivy Bridge) ; 4 cœurs physiques ; HT activée :
Quelqu'un peut-il expliquer pourquoi Monitor.PulseAll() se comporte de cette façon ?
EDIT2 :
Pour essayer de montrer que ce comportement n'est pas inhérent au réveil d'un tas de threads en même temps, j'ai reproduit le comportement du programme de test en utilisant des événements ; et au lieu de mesurer la latence de PulseAll(), je mesure la latence de ManualResetEvent.Set(). Le code crée un certain nombre de fils de travail puis attend un événement ManualResetEvent.Set() sur le même objet ManualResetEvent. Lorsque l'événement est déclenché, ils prennent une mesure de latence puis attendent immédiatement leur propre AutoResetEvent individuel par thread. Bien avant l'itération suivante (500 ms avant), le ManualResetEvent est Reset() et ensuite chaque AutoResetEvent est Set() afin que les threads puissent retourner attendre le ManualResetEvent partagé.
J'ai hésité à poster ceci parce que cela pourrait être une audience rouge géante (je ne prétends pas que les événements et les moniteurs se comportent de manière similaire) plus il utilise certaines pratiques absolument terribles pour qu'un événement se comporte comme un moniteur (j'aimerais/déteste voir ce que mes collègues feraient si je soumettais ceci à une revue de code) ; mais je pense que les résultats sont éclairants.
Ce test a été effectué sur la même machine que le test original ; un 2xXeon5680/Gulftown ; 6 cœurs par processeur (12 cœurs au total) ; Hyperthreading désactivé.
Si vous ne voyez pas à quel point cette méthode est radicalement différente de Monitor.PulseAll, voici le premier graphique superposé au dernier :
Le code utilisé pour générer ces mesures est ci-dessous :
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace MRETest
{
class Program
{
static long LastTimestamp;
static long Iteration;
static ManualResetEventSlim MRES = new ManualResetEventSlim(false);
static List<ReadLastTimestampAndPublish> Publishers =
new List<ReadLastTimestampAndPublish>();
static Stopwatch s = new Stopwatch();
static ConcurrentBag<Tuple<long, long>> IterationToTicks =
new ConcurrentBag<Tuple<long, long>>();
static void Main(string[] args)
{
long numThreads = 24;
s.Start();
for (int i = 0; i < numThreads; ++i)
{
AutoResetEvent ares = new AutoResetEvent(false);
ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish(
new AutoResetEvent(false));
Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning);
Publishers.Add(spinner);
}
for (int i = 0; i < 20; ++i)
{
++Iteration;
LastTimestamp = s.Elapsed.Ticks;
MRES.Set();
Thread.Sleep(500);
MRES.Reset();
foreach (ReadLastTimestampAndPublish publisher in Publishers)
{
publisher.ARES.Set();
}
Thread.Sleep(500);
}
Console.WriteLine(String.Join("\n",
from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond));
Console.Read();
}
class ReadLastTimestampAndPublish
{
public AutoResetEvent ARES { get; private set; }
public ReadLastTimestampAndPublish(AutoResetEvent ares)
{
this.ARES = ares;
}
public void Spin()
{
while (true)
{
MRES.Wait();
IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
ARES.WaitOne();
}
}
}
}
}