235 votes

Foreach parallèle avec lambda asynchrone

Je voudrais gérer une collection en parallèle, mais j'ai du mal à le mettre en œuvre et j'espère donc avoir de l'aide.

Le problème se pose si je veux appeler une méthode marquée async en C#, à l'intérieur du lambda de la boucle parallèle. Par exemple :

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Le problème se pose lorsque le nombre est égal à 0, car tous les threads créés ne sont en fait que des threads d'arrière-plan et l'option Parallel.ForEach L'appel n'attend pas l'achèvement. Si je supprime le mot clé async, la méthode ressemble à ceci :

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Cela fonctionne, mais cela désactive complètement l'intelligence d'attente et je dois faire un peu de gestion manuelle des exceptions (Supprimé pour des raisons de brièveté).

Comment puis-je mettre en œuvre un Parallel.ForEach qui utilise le mot clé await dans la boucle lambda ? Est-ce possible ?

Le prototype de la méthode Parallel.ForEach prend un Action<T> comme paramètre, mais je veux qu'il attende mon lambda asynchrone.

5voto

nicolas2008 Points 46

Mon implémentation légère de ParallelForEach async.

Caractéristiques :

  1. Étranglement (degré maximal de parallélisme).
  2. Gestion des exceptions (l'exception d'agrégation sera levée à la fin du processus).
  3. Mémoire efficace (pas besoin de stocker la liste des tâches).

    public static class AsyncEx { public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10) { var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism); var tcs = new TaskCompletionSource<object>(); var exceptions = new ConcurrentBag<Exception>(); bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();
    
                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }
    
                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }
    
        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }

    }

Exemple d'utilisation :

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

1voto

Jay Shah Points 1303

J'ai créé une méthode d'extension pour cela qui utilise SemaphoreSlim et permet également de définir le degré maximal de parallélisme.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Utilisation de l'échantillon :

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

1voto

Tom Points 326

Dans la réponse acceptée, le ConcurrentBag n'est pas nécessaire. Voici une implémentation qui ne l'utilise pas :

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

Tous les éléments "//quelques trucs avant" et "//quelques trucs après" peuvent être intégrés dans l'implémentation de GetData (ou une autre méthode qui appelle GetData).

En plus d'être plus court, il n'y a pas d'utilisation d'un lambda "async void", qui est un anti-modèle.

0voto

Caleb Holt Points 176

Les éléments suivants sont configurés pour fonctionner avec IAsyncEnumerable mais peut être modifié pour utiliser IEnumerable en changeant simplement le type et en supprimant le "await" sur l'étiquette. foreach . C'est beaucoup plus approprié pour les grands ensembles de données que de créer d'innombrables tâches parallèles et de les attendre toutes.

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }

0voto

Mahesh Bongani Points 122

Avec l'introduction de .Net 6 Parallèle.ForEachAsync est maintenant disponible.

using System.Net.Http.Headers;
using System.Net.Http.Json;

var userHandlers = new []
{
    "users/okyrylchuk",
    "users/shanselman",
    "users/jaredpar",
    "users/davidfowl"
};

using HttpClient client = new()
{
    BaseAddress = new Uri("https://api.github.com"),
};
client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("DotNet", "6"));

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};

await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);

    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});

public class GitHubUser
{
    public string Name { get; set; }
    public string  Bio { get; set; }
}

Le suivi complet du numéro sur github Et certains exemples d'utilisation ici par SCOTT HANSELMAN

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X