309 votes

Foreach parallèle avec lambda asynchrone

Je voudrais gérer une collection en parallèle, mais j'ai du mal à le mettre en œuvre et j'espère donc avoir de l'aide.

Le problème se pose si je veux appeler une méthode marquée async en C#, à l'intérieur du lambda de la boucle parallèle. Par exemple :

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Le problème se pose lorsque le nombre est égal à 0, car tous les threads créés ne sont en fait que des threads d'arrière-plan et l'option Parallel.ForEach L'appel n'attend pas l'achèvement. Si je supprime le mot clé async, la méthode ressemble à ceci :

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Cela fonctionne, mais cela désactive complètement l'intelligence d'attente et je dois faire un peu de gestion manuelle des exceptions (Supprimé pour des raisons de brièveté).

Comment puis-je mettre en œuvre un Parallel.ForEach qui utilise le mot clé await dans la boucle lambda ? Est-ce possible ?

Le prototype de la méthode Parallel.ForEach prend un Action<T> comme paramètre, mais je veux qu'il attende mon lambda asynchrone.

346voto

Stephen Cleary Points 91731

Si vous voulez un parallélisme simple, vous pouvez le faire :

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

Si vous avez besoin de quelque chose de plus complexe, consultez Stephen Toub ForEachAsync poste .

160voto

Libertad Points 147

L'une des nouvelles API de .NET 6 est la suivante Parallèle.ForEachAsync La méthode de planification du travail asynchrone qui permet de contrôler le degré de parallélisme :

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

Un autre exemple dans le blog de Scott Hanselman .

La source pour référence.

157voto

Serge Semenov Points 2621

Vous pouvez utiliser le ParallelForEachAsync méthode d'extension de Paquet NuGet AsyncEnumerator :

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Avertissement : je suis l'auteur de la bibliothèque AsyncEnumerator, qui est open source et sous licence MIT, et je publie ce message uniquement pour aider la communauté.

38voto

Felipe l Points 419

Avec SemaphoreSlim vous pouvez réaliser le contrôle du parallélisme.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

17voto

jitbit Points 8072

Méthode d'extension la plus simple possible compilée à partir d'autres réponses et de l'article référencé par l'auteur de la réponse acceptée :

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

MISE À JOUR : voici une modification simple qui supporte également un jeton d'annulation comme demandé dans les commentaires (non testé).

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X