235 votes

Foreach parallèle avec lambda asynchrone

Je voudrais gérer une collection en parallèle, mais j'ai du mal à le mettre en œuvre et j'espère donc avoir de l'aide.

Le problème se pose si je veux appeler une méthode marquée async en C#, à l'intérieur du lambda de la boucle parallèle. Par exemple :

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Le problème se pose lorsque le nombre est égal à 0, car tous les threads créés ne sont en fait que des threads d'arrière-plan et l'option Parallel.ForEach L'appel n'attend pas l'achèvement. Si je supprime le mot clé async, la méthode ressemble à ceci :

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Cela fonctionne, mais cela désactive complètement l'intelligence d'attente et je dois faire un peu de gestion manuelle des exceptions (Supprimé pour des raisons de brièveté).

Comment puis-je mettre en œuvre un Parallel.ForEach qui utilise le mot clé await dans la boucle lambda ? Est-ce possible ?

Le prototype de la méthode Parallel.ForEach prend un Action<T> comme paramètre, mais je veux qu'il attende mon lambda asynchrone.

284voto

Stephen Cleary Points 91731

Si vous voulez un parallélisme simple, vous pouvez le faire :

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

Si vous avez besoin de quelque chose de plus complexe, consultez Stephen Toub ForEachAsync poste .

127voto

Serge Semenov Points 2621

Vous pouvez utiliser le ParallelForEachAsync méthode d'extension de Paquet NuGet AsyncEnumerator :

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Avertissement : je suis l'auteur de la bibliothèque AsyncEnumerator, qui est open source et sous licence MIT, et je publie ce message uniquement pour aider la communauté.

58voto

Libertad Points 147

L'une des nouvelles API de .NET 6 est la suivante Parallèle.ForEachAsync La méthode de planification du travail asynchrone qui permet de contrôler le degré de parallélisme :

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

Un autre exemple dans le blog de Scott Hanselman .

La source pour référence.

32voto

Felipe l Points 419

Avec SemaphoreSlim vous pouvez réaliser le contrôle du parallélisme.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

12voto

jitbit Points 8072

Méthode d'extension la plus simple possible compilée à partir d'autres réponses et de l'article référencé par l'auteur de la réponse acceptée :

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

MISE À JOUR : voici une modification simple qui supporte également un jeton d'annulation comme demandé dans les commentaires (non testé).

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X