52 votes

Pourquoi Monitor.PulseAll entraîne-t-il un modèle de latence en "escalier" dans les threads signalés ?

Dans une bibliothèque utilisant Monitor.PulseAll() pour la synchronisation des threads, j'ai remarqué que la latence entre le moment où PulseAll(...) est appelé et le moment où un thread est réveillé semble suivre une distribution en "escalier" -- avec des pas extrêmement grands. Les threads réveillés ne font presque aucun travail et retournent presque immédiatement à l'attente du moniteur. Par exemple, sur une machine à 12 cœurs avec 24 threads attendant sur un moniteur (2x Xeon5680/Gulftown ; 6 cœurs physiques par processeur ; HT désactivée), la latence entre l'impulsion et le réveil d'un thread est la suivante :

Latency using Monitor.PulseAll(); 3rd party library

Les 12 premiers threads (notez que nous avons 12 cœurs) prennent entre 30 et 60 microsecondes pour répondre. Ensuite, nous commençons à avoir des sauts très importants, avec des plateaux autour de 700, 1300, 1900, et 2600 microsecondes.

J'ai pu recréer avec succès ce comportement indépendamment de la bibliothèque tierce en utilisant le code ci-dessous. Ce code lance un grand nombre de threads (modifiez le paramètre numThreads) qui attendent simplement un moniteur, lisent un horodatage, l'enregistrent dans un ConcurrentSet, puis retournent immédiatement à l'attente. Une fois par seconde, PulseAll() réveille tous les threads. Elle effectue cette opération 20 fois, et signale les latences de la 10e itération à la console.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace PulseAllTest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static object SyncObj = new object();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 32;

            for (int i = 0; i < numThreads; ++i)
            {
                Task.Factory.StartNew(ReadLastTimestampAndPublish, TaskCreationOptions.LongRunning);
            }

            s.Start();
            for (int i = 0; i < 20; ++i)
            {
                lock (SyncObj)
                {
                    ++Iteration;
                    LastTimestamp = s.Elapsed.Ticks;
                    Monitor.PulseAll(SyncObj);
                }
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2 
                    select ((decimal)n.Item2)/TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        static void ReadLastTimestampAndPublish()
        {
            while(true)
            {
                lock(SyncObj)
                {
                    Monitor.Wait(SyncObj);
                }
                IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
            }
        }
    }
}

En utilisant le code ci-dessus, voici un exemple de latences sur une boîte avec 8 cores /w hyperthreading activé (i.e. 16 cores dans le Task Manager) et 32 threads (*2x Xeon5550/Gainestown ; 4 cores physiques par processeur ; HT Enabled) :

Latency using Monitor.PulseAll(), sample code

EDIT : Pour essayer d'éliminer NUMA de l'équation, voici un graphique exécutant le programme d'exemple avec 16 threads sur un Core i7-3770 (Ivy Bridge) ; 4 cœurs physiques ; HT activée :

Latency using Monitor.PulseAll(), sample code, no NUMA

Quelqu'un peut-il expliquer pourquoi Monitor.PulseAll() se comporte de cette façon ?

EDIT2 :

Pour essayer de montrer que ce comportement n'est pas inhérent au réveil d'un tas de threads en même temps, j'ai reproduit le comportement du programme de test en utilisant des événements ; et au lieu de mesurer la latence de PulseAll(), je mesure la latence de ManualResetEvent.Set(). Le code crée un certain nombre de fils de travail puis attend un événement ManualResetEvent.Set() sur le même objet ManualResetEvent. Lorsque l'événement est déclenché, ils prennent une mesure de latence puis attendent immédiatement leur propre AutoResetEvent individuel par thread. Bien avant l'itération suivante (500 ms avant), le ManualResetEvent est Reset() et ensuite chaque AutoResetEvent est Set() afin que les threads puissent retourner attendre le ManualResetEvent partagé.

J'ai hésité à poster ceci parce que cela pourrait être une audience rouge géante (je ne prétends pas que les événements et les moniteurs se comportent de manière similaire) plus il utilise certaines pratiques absolument terribles pour qu'un événement se comporte comme un moniteur (j'aimerais/déteste voir ce que mes collègues feraient si je soumettais ceci à une revue de code) ; mais je pense que les résultats sont éclairants.

Ce test a été effectué sur la même machine que le test original ; un 2xXeon5680/Gulftown ; 6 cœurs par processeur (12 cœurs au total) ; Hyperthreading désactivé.

ManualResetEventLatency

Si vous ne voyez pas à quel point cette méthode est radicalement différente de Monitor.PulseAll, voici le premier graphique superposé au dernier :

ManualResetEventLatency vs. Monitor Latency

Le code utilisé pour générer ces mesures est ci-dessous :

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace MRETest
{
    class Program
    {
        static long LastTimestamp;
        static long Iteration;
        static ManualResetEventSlim MRES = new ManualResetEventSlim(false);
        static List<ReadLastTimestampAndPublish> Publishers = 
            new List<ReadLastTimestampAndPublish>();
        static Stopwatch s = new Stopwatch();
        static ConcurrentBag<Tuple<long, long>> IterationToTicks = 
            new ConcurrentBag<Tuple<long, long>>();

        static void Main(string[] args)
        {
            long numThreads = 24;
            s.Start();

            for (int i = 0; i < numThreads; ++i)
            {
                AutoResetEvent ares = new AutoResetEvent(false);
                ReadLastTimestampAndPublish spinner = new ReadLastTimestampAndPublish(
                    new AutoResetEvent(false));
                Task.Factory.StartNew(spinner.Spin, TaskCreationOptions.LongRunning);
                Publishers.Add(spinner);
            }

            for (int i = 0; i < 20; ++i)
            {
                ++Iteration;
                LastTimestamp = s.Elapsed.Ticks;
                MRES.Set();
                Thread.Sleep(500);
                MRES.Reset();
                foreach (ReadLastTimestampAndPublish publisher in Publishers)
                {
                    publisher.ARES.Set();
                }
                Thread.Sleep(500);
            }

            Console.WriteLine(String.Join("\n",
                from n in IterationToTicks where n.Item1 == 10 orderby n.Item2
                    select ((decimal)n.Item2) / TimeSpan.TicksPerMillisecond));
            Console.Read();
        }

        class ReadLastTimestampAndPublish
        {
            public AutoResetEvent ARES { get; private set; }

            public ReadLastTimestampAndPublish(AutoResetEvent ares)
            {
                this.ARES = ares;
            }

            public void Spin()
            {
                while (true)
                {
                    MRES.Wait();
                    IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
                    ARES.WaitOne();
                }
            }
        }
    }
}

1voto

Mitch Points 5201

Pour commencer, ceci n'est pas une réponse, simplement mes notes en regardant le SSCLI pour trouver ce qui se passe exactement. La plupart de ces informations dépassent largement mes compétences, mais elles sont néanmoins intéressantes.

Le voyage dans le terrier du lapin commence par un appel à Monitor.PulseAll qui est implémenté en C# :

clr\src\bcl\system\threading\monitor.cs :

namespace System.Threading
{
    public static class Monitor 
    {
        // other methods omitted

        [MethodImplAttribute(MethodImplOptions.InternalCall)]
        private static extern void ObjPulseAll(Object obj);

        public static void PulseAll(Object obj)
        {
            if (obj==null) {
                throw new ArgumentNullException("obj");
            }

            ObjPulseAll(obj);
        } 
    }
}

Les méthodes InternalCall sont acheminées dans clr\src\vm\ecall.cpp :

FCFuncStart(gMonitorFuncs)
    FCFuncElement("Enter", JIT_MonEnter)
    FCFuncElement("Exit", JIT_MonExit)
    FCFuncElement("TryEnterTimeout", JIT_MonTryEnter)
    FCFuncElement("ObjWait", ObjectNative::WaitTimeout)
    FCFuncElement("ObjPulse", ObjectNative::Pulse)
    FCFuncElement("ObjPulseAll", ObjectNative::PulseAll)
    FCFuncElement("ReliableEnter", JIT_MonReliableEnter)
FCFuncEnd()

ObjectNative vit dans clr\src\vm\comobject.cpp :

FCIMPL1(void, ObjectNative::PulseAll, Object* pThisUNSAFE)
{
    CONTRACTL
    {
        MODE_COOPERATIVE;
        DISABLED(GC_TRIGGERS);  // can't use this in an FCALL because we're in forbid gc mode until we setup a H_M_F.
        THROWS;
        SO_TOLERANT;
    }
    CONTRACTL_END;

    OBJECTREF pThis = (OBJECTREF) pThisUNSAFE;
    HELPER_METHOD_FRAME_BEGIN_1(pThis);
    //-[autocvtpro]-------------------------------------------------------

    if (pThis == NULL)
        COMPlusThrow(kNullReferenceException, L"NullReference_This");

    pThis->PulseAll();

    //-[autocvtepi]-------------------------------------------------------
    HELPER_METHOD_FRAME_END();
}
FCIMPLEND

OBJECTREF est un peu de magie saupoudrée sur le dessus de Object (le -> est surchargé), donc OBJECTREF->PulseAll() est en fait Object->PulseAll() qui est mis en œuvre dans clr\src\vm\object.h et transmet simplement l'appel à ObjHeader->PulseAll :

class Object
{
  // snip   
  public:
  // snip
    ObjHeader   *GetHeader()
    {
        LEAF_CONTRACT;
        return PTR_ObjHeader(PTR_HOST_TO_TADDR(this) - sizeof(ObjHeader));
    }
  // snip
    void PulseAll()
    {
        WRAPPER_CONTRACT;
        GetHeader()->PulseAll();
    }
  // snip
}

ObjHeader::PulseAll récupère le SyncBlock qui utilise AwareLock para Enter et Exit le verrouillage de l'objet. AwareLock ( clr\src\vm\syncblk.cpp ) utilise un CLREvent ( clr\src\vm\synch.cpp ) créé en tant que MonitorEvent ( CLREvent::CreateMonitorEvent(SIZE_T) ), qui appelle UnsafeCreateEvent ( clr\src\inc\unsafe.h ) ou les méthodes de synchronisation de l'environnement d'hébergement.

clr\src\vm\syncblk.cpp :

void ObjHeader::PulseAll()
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        THROWS;
        GC_TRIGGERS;
        MODE_ANY;
        INJECT_FAULT(COMPlusThrowOM(););
    }
    CONTRACTL_END;

    //  The following code may cause GC, so we must fetch the sync block from
    //  the object now in case it moves.
    SyncBlock *pSB = GetBaseObject()->GetSyncBlock();

    // GetSyncBlock throws on failure
    _ASSERTE(pSB != NULL);

    // make sure we own the crst
    if (!pSB->DoesCurrentThreadOwnMonitor())
        COMPlusThrow(kSynchronizationLockException);

    pSB->PulseAll();
}

void SyncBlock::PulseAll()
{
    CONTRACTL
    {
        INSTANCE_CHECK;
        NOTHROW;
        GC_NOTRIGGER;
        MODE_ANY;
    }
    CONTRACTL_END;

    WaitEventLink  *pWaitEventLink;

    while ((pWaitEventLink = ThreadQueue::DequeueThread(this)) != NULL)
        pWaitEventLink->m_EventWait->Set();
}

DequeueThread utilise un crst ( clr\src\vm\crst.cpp ) qui est une enveloppe autour des sections critiques. m_EventWait est un manuel CLREvent .

Ainsi, tout ceci utilise les primitives du système d'exploitation, à moins que l'hébergeur par défaut ne prenne le dessus sur certaines choses.

1voto

Michael Points 1827

Une différence entre ces versions est que dans le cas de PulseAll - les threads répètent immédiatement la boucle, verrouillant à nouveau l'objet.

Vous avez 12 cœurs, donc 12 threads sont en cours d'exécution, exécutent la boucle, et entrent à nouveau dans la boucle, verrouillant l'objet (l'un après l'autre) et entrant ensuite dans l'état d'attente. Pendant tout ce temps, les autres threads attendent. Dans le cas de ManualEvent, vous avez deux événements, donc les threads ne répètent pas immédiatement la boucle, mais sont bloqués sur les événements ARES à la place - cela permet aux autres threads de prendre possession du verrou plus rapidement.

J'ai simulé un comportement similaire dans PulseAll en ajoutant sleep à la fin de la boucle dans ReadLastTimestampAndPublish. Cela permet à l'autre thread de verrouiller syncObj plus rapidement et semble améliorer les chiffres que j'obtiens du programme.

static void ReadLastTimestampAndPublish()
{
    while(true)
    {
        lock(SyncObj)
        {
            Monitor.Wait(SyncObj);
        }
        IterationToTicks.Add(Tuple.Create(Iteration, s.Elapsed.Ticks - LastTimestamp));
        Thread.Sleep(TimeSpan.FromMilliseconds(100));   // <===
    }
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X