140 votes

Comment limiter le nombre d'opérations d'E/S asynchrones simultanées ?

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Voici le problème, il lance plus de 1000 requêtes web simultanées. Existe-t-il un moyen simple de limiter le nombre de ces requêtes http asynchrones ? De sorte que pas plus de 20 pages web soient téléchargées à un moment donné. Comment le faire de la manière la plus efficace possible ?

207voto

Theo Yaung Points 1593

Vous pouvez certainement le faire dans les dernières versions d'async pour .NET, en utilisant .NET 4.5 Beta. L'article précédent de 'usr' renvoie à un bon article écrit par Stephen Toub, mais la nouvelle moins annoncée est que le sémaphore asynchrone a été intégré à la version bêta de .NET 4.5.

Si vous regardez notre bien-aimé SemaphoreSlim (que vous devriez utiliser car elle est plus performante que la classe originale Semaphore ), il peut désormais se targuer d'être le WaitAsync(...) une série de surcharges, avec tous les arguments attendus - intervalles de temps, jetons d'annulation, tous vos amis habituels de la planification :)

Stephen a également écrit un article de blog plus récent sur les nouvelles fonctionnalités de .NET 4.5 qui sont apparues avec la version bêta. Nouveautés pour Parallelism in .NET 4.5 Beta .

Enfin, voici un exemple de code sur l'utilisation de SemaphoreSlim pour la limitation des méthodes asynchrones :

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Enfin, une solution qui utilise l'ordonnancement basé sur le TPL mérite d'être mentionnée. Vous pouvez créer des tâches liées à un délégué sur le TPL qui n'ont pas encore été lancées, et permettre à un planificateur de tâches personnalisé de limiter la concurrence. En fait, il y a un exemple MSDN pour cela ici :

Voir aussi TaskScheduler .

23voto

Dogu Arslan Points 1571

Si vous avez une IEnumerable (c'est-à-dire des chaînes d'URL) et que vous voulez effectuer une opération d'entrée/sortie avec chacune d'entre elles (c'est-à-dire faire une requête http asynchrone) simultanément ET, éventuellement, vous voulez également définir le nombre maximum de requêtes d'entrée/sortie simultanées en temps réel, voici comment vous pouvez le faire. De cette manière, vous n'utilisez pas de pool de threads et autres, la méthode utilise des sémaphores limités pour contrôler le nombre maximum de demandes d'E/S simultanées, de manière similaire à un modèle de fenêtre glissante : une demande se termine, quitte le sémaphore et la suivante y entre.

l'usage :

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }

10voto

Serge Semenov Points 2621

Il y a beaucoup de pièges et l'utilisation directe d'un sémaphore peut être délicate dans les cas d'erreur, je suggérerais donc d'utiliser Paquet NuGet AsyncEnumerator au lieu de réinventer la roue :

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

6voto

usr Points 74796

Malheureusement, il manque au Framework .NET les combinateurs les plus importants pour orchestrer des tâches asynchrones parallèles. Il n'y a rien de tel d'intégré.

Regardez le AsyncSemaphore construit par le très respectable Stephen Toub. Ce que vous voulez s'appelle un sémaphore, et vous avez besoin d'une version asynchrone de celui-ci.

4voto

Jay Shah Points 1303

SemaphoreSlim peut être très utile ici. Voici la méthode d'extension que j'ai créée.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Utilisation de l'échantillon :

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X