4 votes

Tester unitaire des méthodes asynchrones C#

J'ai un problème

Je cherche à reproduire quelque chose comme plusieurs appels de procédure stockée (sp) dans mon code c#, mais je veux le faire de manière asynchrone.

Exemple TSQL : (Execute sp @key = 15072000173475; Execute sp @key = 15072000173571; ... Execute sp @key = n;)

[TestClass]
public class UnitTestNomenclature {
    [TestMethod]
    public void ParallelSQLMethod() {
        Task scropeTasks = null;
        //le véritable nombre est supérieur à 1500
        long[] keys = new long[] {15072000173475, 15072000173571 ... n };

        try {
            var tasks = keys.Select(i => Task.Run(async () => { await RunStoredProc(i); }));
            scropeTasks = Task.WhenAll(tasks);

            scropeTasks.Wait();
        } catch (Exception ex) {
            Debug.WriteLine("Exception : " + ex.Message);

            Debug.WriteLine("IsFaulted : " + scropeTasks.IsFaulted);
            foreach (var inx in scropeTasks.Exception.InnerExceptions) {
                Debug.WriteLine("Détails : " + inx.Message);
            }
        }

        Assert.AreEqual(1, 1);
    }

    public async Task RunStoredProc(long scollNumbParam) {
        const string strStoredProcName = @"[dbo].[sp]";
        using (SqlConnection conn = new SqlConnection(@"source de données=SERVEUR;catalogue initial=Db;sécurité intégrée=True;Trusted_Connection=Oui;")) {
            await conn.OpenAsync();
            Debug.WriteLine("============================================ La connexion est ouverte : ==============================================");

            // info
            Debug.WriteLine(String.Format("Connexion : {0}", conn.ClientConnectionId));
            Debug.WriteLine(String.Format("État : {0}", conn.State.ToString()));

            using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure }) {

                SqlParameter scrParam = new SqlParameter() {
                    ParameterName = "@KEYKRT",
                    Value = scollNumbParam,
                    SqlDbType = SqlDbType.BigInt
                };
                cmd.Parameters.Add(scrParam);

                Debug.WriteLine("Début du traitement : " + scollNumbParam);
                await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
                Debug.WriteLine("Fin du traitement : " + scollNumbParam);

            }
        }

        Debug.WriteLine("============================================ La connexion est fermée : ==============================================");
    }
}

Voici ce que j'obtiens dans la fenêtre de sortie :

========== La connexion est ouverte : ========
Connexion : 5be9c681-6eb5-422f-a22c-b49689a2d912
État : Ouvert
Début du traitement : 15072000173475
========== La connexion est ouverte : ==========
Connexion : cfb66041-6646-4b56-be1c-2afb26a18cb8
État : Ouvert
Début du traitement : 15072000173571
.....
Fin du traitement : 15072000173475
=========== La connexion est fermée : =========
Fin du traitement : 15072000173571
=========== La connexion est fermée : =========

....

Un délai d'attente s'est produit en attendant des ressources de mémoire pour exécuter la requête dans le groupe de ressources 'par défaut' (2). Relancez la requête.
Numéro d'erreur réel : 8645
Numéro de ligne réel : 98

Le débogage indique également que le pool de connexions déborde Je pense que la principale raison est que la connexion n'est pas correctement libérée, mais comment puis-je y parvenir avec async ?

Si j'essaie simplement d'ouvrir une connexion avant la déclaration des tâches asynchrones et de la passer à ma méthode RunStoredProc, alors j'obtiens la connexion ne prend pas en charge MultipleActiveResultSets

using (SqlConnection conn = new SqlConnection(@"source de données=SERVEUR;catalogue initial=Db;sécurité intégrée=True;Trusted_Connection=Oui;)) {

                    conn.OpenAsync();
                    var tasks = keys.Select(i => Task.Run(async () => { await RunStoredProc(i, conn); }));
                    scropeTasks = Task.WhenAll(tasks);

                    scropeTasks.Wait();
                }

                Debug.WriteLine("========== La connexion est fermée : ==========");

Voici ce que j'obtiens dans la fenêtre de sortie :

Connexion : 5be9c681-6eb5-422f-a22c-b49689a2d912
État : Ouvert
Début du traitement : 15072000173475
======= La connexion est ouverte : =============
Connexion : cfb66041-6646-4b56-be1c-2afb26a18cb8
État : Ouvert
Début du traitement : 15072000173571
========= La connexion est ouverte : =========

6voto

Nkosi Points 95895

Vous avez environ 1500 tâches qui s'exécutent toutes en même temps et qui mélangent également des appels asynchrones et bloquants (comme .Wait) ce qui peut causer des blocages.

Rendez le test asynchrone et essayez d'éviter async void sauf s'il s'agit d'un gestionnaire d'événements.

Essayez de les itérer en séquence. Cela prendra plus de temps mais au moins les connexions seront correctement fermées afin de ne pas surcharger les ressources. Vous pourriez également envisager de les faire par lots de taille raisonnable.

[TestMethod]
public async Task ParallelSQLMethod() {
    //le nombre réel est supérieur à 1500
    var keys = new long[] { 
        15072000173475, 
        15072000173571, 
        //....., n
    };
    var tasks = keys.Select(i => RunStoredProc(i));
    var batchSize = 50; //Ou plus petit

    //exécuter les tâches par lots
    var sequence = tasks;
    while (sequence.Any()) {
        var batch = sequence.Take(batchSize);
        sequence = sequence.Skip(batchSize);

        await Task.WhenAll(batch);
    }
}

2voto

ipavlu Points 1148

Je crains de voir des problèmes classiques avec async/await/concurrence/threading ici. Le test comporte plusieurs problèmes, je vais essayer de les passer en revue un par un.

1) Architecture du cas de test. Vous ne précisez pas si le test unitaire que vous écrivez et le serveur SQL sont sur la même machine ou sur des machines différentes.

Si c'est sur la même machine, je choisirais Max(n_cores/2, 1) connexions.

Si c'est sur des machines différentes, je choisirais quelque chose comme 1 à 3 connexions.

Et ces chiffres pourraient être ajustés à la hausse ou à la baisse en fonction du comportement de la procédure stockée, du calcul long/court, de la quantité de données transférées, de la vitesse de la connexion, etc.

2) Problème de concurrence de connexion SQL. Vous ne pouvez pas ouvrir une connexion, puis essayer d'envoyer 1500 requêtes en même temps via cette connexion. En fait, même pas deux en même temps.

C'est ce qu'il vous a dit: la connexion ne prend pas en charge MultipleActiveResultSets.

Vous devez utiliser une connexion ouverte pour une seule requête à la fois.

Mais ! Vous n'êtes pas obligé de l'utiliser uniquement pour une seule requête et la fermer, vous pouvez après l'achèvement de la première requête exécuter la requête suivante et cela serait beaucoup plus rapide que de fermer et de créer une nouvelle connexion. Il vous suffit d'exécuter ces requêtes à travers chaque connexion séquentiellement...

3) Donc, l'architecture correcte du cas de test ressemblerait à ceci :

  • une méthode de test asynchrone,
  • qui pousserait toutes les clés dans la file queue ConcurrentQueue;
  • puis un tableau Task[] de taille basée sur le nombre de connexions requises,
  • démarrer chaque tâche et les stocker dans le tableau,
  • attendre Task.WhenAll(tasks);

J'aime beaucoup jouer avec du code concurrent/parallèle, mais créer de plus en plus de tâches sans les coordonner n'aide pas à accélérer les choses, c'est plutôt une perte de ressources...

4) Exemple :

[TestClass]
public class UnitTestNomenclature
{
    [TestMethod]
    public async Task ParallelSQLMethod()
    {
        long[] keys = new long[] { 15072000173475, 15072000173571 };

        ConcurrentQueue queue = new ConcurrentQueue(keys);

        int connections = Math.Max(1, Environment.ProcessorCount / 2);

        Task[] tasks =
        Enumerable
        .Range(0, connections)
        .Select(i => Task.Run(() => RunConnection(i, queue)).Unwrap())
        .ToArray()
        ;

        await Task.WhenAll(tasks);
    }

    public async Task RunConnection(int connection, ConcurrentQueue queue)
    {
        using (SqlConnection conn = new SqlConnection(@"data source=SERVER;initial catalog=Db;integrated security=True;Trusted_Connection=Yes;"))
        {
            await conn.OpenAsync();
            Debug.WriteLine($"====== Connection[{connection}] is open: ======");

            Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
            Debug.WriteLine($"Connection[{connection}].State: {conn.State}");

            long scollNumbParam;

            while (queue.TryDequeue(out scollNumbParam))
            {
                await RunStoredProc(conn, connection, scollNumbParam);
                Debug.WriteLine($"Connection[{connection}]: {conn.ClientConnectionId}");
                Debug.WriteLine($"Connection[{connection}].State: {conn.State}");
            }
        }

        Debug.WriteLine($"====== Connection[{connection}] is closed  ======");
    }

    public async Task RunStoredProc(SqlConnection conn, int connection, long scollNumbParam)
    {
        const string strStoredProcName = @"[dbo].[sp]";

        using (SqlCommand cmd = new SqlCommand(strStoredProcName, conn) { CommandTimeout = 120, CommandType = CommandType.StoredProcedure })
        {
            SqlParameter scrParam = new SqlParameter()
            {
                ParameterName = "@KEYKRT",
                Value = scollNumbParam,
                SqlDbType = SqlDbType.BigInt
            };
            cmd.Parameters.Add(scrParam);

            Debug.WriteLine($"Connection[{connection}] Start of Proccesing: " + scollNumbParam);
            await cmd.ExecuteNonQueryAsync();
            Debug.WriteLine($"Connection[{connection}] End of Proccesing: " + scollNumbParam);
        }
    }
}

1voto

AllmanTool Points 381

J'ai fait des expériences avec mon code et obtenu un résultat approprié (traitement asynchrone). J'ai modifié le lien de connexion (ajouté : Max Pool Size=250;Connection Timeout=60;Connection Lifetime=0;MultipleActiveResultSets=true) c'est-à-dire j'ai augmenté la taille du pool de connexions et la durée de la connexion.

  • Taille maximale du pool de connexions (Max Pool Size)

  • Nombre minimum de connexions dans un pool de connexions (Min Pool Size)

  • Nombre de secondes de maintien des connexions dans un pool de connexions (Durée de vie de la connexion) (0 est la valeur maximale)

Conseil : Une quantité excessive de pools Max Pool Size (par défaut 100) peut bloquer votre serveur (comme je l'ai fait :))

J'ai également remarqué que je n'ai pas eu d'exception 'la connexion ne prend pas en charge MultipleActiveResultSets' avec 'MultipleActiveResultSets=true' dans ma chaîne de connexion, mais le traitement était synchronisé. Vous pouvez en savoir plus à ce sujet (MARS) sur

Conclusion : L'exécution parallèle sur le serveur n'est pas une fonctionnalité de MARS et les opérations MARS ne sont pas sécurisées pour les threads. MARS n'est pas conçu pour supprimer toutes les exigences de connexions multiples dans une application. Si une application a besoin d'une véritable exécution parallèle de commandes contre un serveur, il convient d'utiliser des connexions multiples. C'est généralement utilisé pour de telles raisons

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X