219 votes

Comment utiliser Async avec ForEach ?

Est-il possible d'utiliser Async quand on utilise ForEach ? Voici le code que j'essaie d'utiliser :

using (DataContext db = new DataLayer.DataContext())
{
    db.Groups.ToList().ForEach(i => async {
        await GetAdminsFromGroup(i.Gid);
    });
}

Je reçois l'erreur :

Le nom 'Async' n'existe pas dans le contexte actuel.

La méthode dans laquelle se trouve l'instruction using est définie comme asynchrone.

1voto

superlogical Points 5129

Ajouter cette méthode d'extension

public static class ForEachAsyncExtension
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop) 
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ConfigureAwait(false);
            }));
    }
}

Et puis utiliser comme ça :

Task.Run(async () =>
{
    var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
    var buckets = await s3.ListBucketsAsync();

    foreach (var s3Bucket in buckets.Buckets)
    {
        if (s3Bucket.BucketName.StartsWith("mybucket-"))
        {
            log.Information("Bucket => {BucketName}", s3Bucket.BucketName);

            ListObjectsResponse objects;
            try
            {
                objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
                continue;
            }

            // ForEachAsync (4 is how many tasks you want to run in parallel)
            await objects.S3Objects.ForEachAsync(4, async s3Object =>
            {
                try
                {
                    log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
                    await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
                }
                catch
                {
                    log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
                }
            });

            try
            {
                await s3.DeleteBucketAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
            }
        }
    }
}).Wait();

1voto

ElConrado Points 766

Si vous utilisez EntityFramework.Core il y a une méthode d'extension ForEachAsync .

L'exemple d'utilisation ressemble à ceci :

using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;

public class Example
{
    private readonly DbContext _dbContext;
    public Example(DbContext dbContext)
    {
        _dbContext = dbContext;
    }
    public async void LogicMethod()
    {

        await _dbContext.Set<dbTable>().ForEachAsync(async x =>
        {
            //logic
            await AsyncTask(x);
        });
    }

    public async Task<bool> AsyncTask(object x)
    {
        //other logic
        return await Task.FromResult<bool>(true);
    }
}

-1voto

James Jeffery Points 1620

Le problème était que le async doit apparaître avant le lambda, et non avant le corps :

db.Groups.ToList().ForEach(async (i) => {
    await GetAdminsFromGroup(i.Gid);
});

-2voto

Luk164 Points 199

Je voudrais ajouter qu'il existe un Classe parallèle avec la fonction ForEach intégrée qui peut être utilisée à cette fin.

-3voto

Shoter Points 440

C'est la méthode que j'ai créée pour gérer les scénarios asynchrones avec ForEach .

  • Si l'une des tâches échoue, les autres tâches poursuivent leur exécution.
  • Vous avez la possibilité d'ajouter une fonction qui sera exécutée à chaque exception.
  • Les exceptions sont collectées comme aggregateException à la fin et sont disponibles pour vous.
  • Peut gérer CancellationToken

    public static class ParallelExecutor { /// <summary> /// Executes asynchronously given function on all elements of given enumerable with task count restriction. /// Executor will continue starting new tasks even if one of the tasks throws. If at least one of the tasks throwed exception then <see cref="AggregateException"/> is throwed at the end of the method run. /// </summary> /// <typeparam name="T">Type of elements in enumerable</typeparam> /// <param name="maxTaskCount">The maximum task count.</param> /// <param name="enumerable">The enumerable.</param> /// <param name="asyncFunc">asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param> /// <param name="onException">Acton that will be executed on every exception that would be thrown by asyncFunc. CAN be thread unsafe.</param> /// <param name="cancellationToken">The cancellation token.</param> public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default) { using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);

            // This `lockObject` is used only in `catch { }` block.
            object lockObject = new object();
            var exceptions = new List<Exception>();
            var tasks = new Task[enumerable.Count()];
            int i = 0;
    
            try
            {
                foreach (var t in enumerable)
                {
                    await semaphore.WaitAsync(cancellationToken);
                    tasks[i++] = Task.Run(
                        async () =>
                        {
                            try
                            {
                                await asyncFunc(t);
                            }
                            catch (Exception e)
                            {
                                if (onException != null)
                                {
                                    lock (lockObject)
                                    {
                                        onException.Invoke(e);
                                    }
                                }
    
                                // This exception will be swallowed here but it will be collected at the end of ForEachAsync method in order to generate AggregateException.
                                throw;
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        }, cancellationToken);
    
                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException e)
            {
                exceptions.Add(e);
            }
    
            foreach (var t in tasks)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }
    
                // Exception handling in this case is actually pretty fast.
                // https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
                try
                {
                    await t;
                }

    pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.

                catch (Exception e)

    pragma warning restore CA1031 // Do not catch general exception types

                {
                    exceptions.Add(e);
                }
            }
    
            if (exceptions.Any())
            {
                throw new AggregateException(exceptions);
            }
        }
    }

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X