40 votes

Async avec d'énormes flux de données

Nous utilisons IEnumerables de retour d'énormes ensembles de données à partir de la base de données:

public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}

Maintenant, nous voulons utiliser des méthodes asynchrones à faire de même. Cependant il n'y a pas de IEnumerables pour async, donc nous avons besoin de recueillir des données dans une liste jusqu'à ce que l'ensemble du jeu de données est chargé:

public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}

Cela permettra de consommer une énorme quantité de ressources sur le serveur, car toutes les données doivent être dans la liste avant le retour. Quel est le meilleur et facile à utiliser asynchrone alternative pour IEnumerables de travailler avec de grands flux de données? Je voudrais éviter de stocker toutes les données en mémoire pendant le traitement.

26voto

I3arnon Points 9498

L'option la plus facile est d'utiliser TPL Dataflow. Tout ce que vous devez faire est de configurer un ActionBlock qui gère le traitement (en parallèle, si vous le souhaitez) et "envoie" les éléments un par un de manière asynchrone.
Je vous suggère aussi d'un réglage d' BoundedCapacity qui va augmenter la vitesse de la lecture de lecture à partir de la base de données lorsque le traitement ne peut pas gérer la vitesse.

var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
       await block.SendAsync(item);
    }
}

Vous pouvez également utiliser le Réactif Extensions, mais c'est un peu plus compliqué cadre solide et que vous avez probablement besoin d'.

11voto

Simon Mourier Points 49585

La plupart du temps lorsque vous traitez avec async/await méthodes, je trouve ça plus facile à faire tourner le problème autour de, et utiliser les fonctions (Func<...>) ou des actions (Action<...>) au lieu de ad-hoc code, surtout avec IEnumerable et yield.

En d'autres termes, quand je pense à "async", j'essaie d'oublier le vieux concept de la fonction "valeur de retour" qui est par ailleurs tellement évident et qui nous sont si familières.

Par exemple, si vous changez vous de la synchronisation initiale de code dans cette (processor est le code qui va finalement faire ce que vous faites avec un seul élément de Données):

public void Read(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            processor(item);
        }
    }
}

Ensuite, la version asynchrone est assez simple à écrire:

public async Task ReadAsync(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // note you can use connection.OpenAsync()
        // and command.ExecuteReaderAsync() here
        while(await reader.ReadAsync())
        {
            // ...
            processor(item);
        }
    }
}

Si vous pouvez changer votre code de cette façon, vous n'avez pas besoin de l'extension ou de la bibliothèque ou IAsyncEnumerable choses.

9voto

Noseratio Points 23840

Cela permettra de consommer une énorme quantité de ressources sur le serveur, parce que tous les les données doivent être dans la liste avant le retour. Quel est le meilleur et facile à utiliser asynchrone alternative pour IEnumerables de travailler avec des données de grande taille flux? Je voudrais éviter de stocker toutes les données en mémoire pendant la transformation.

Si vous ne voulez pas envoyer toutes les données du client à la fois, vous pouvez envisager d'utiliser des Reactive Extensions (Rx) (sur le client) et SignalR (à la fois client et serveur) pour gérer cela.

SignalR permettrait d'envoyer des données au client de façon asynchrone. Rx permettrait d'appliquer LINQ to asynchrone séquence d'éléments de données qu'ils sont arrivés sur le client. Ce serait toutefois changer l'ensemble du modèle de code des vous application client-serveur.

Exemple (un post de blog de Samuel Jack):

Liés à la question (si ce n'est un doublon):

7voto

Ned Stoyanov Points 3809

Comme certains des autres affiches ont mentionné cela peut être mis en œuvre avec des Rx. Avec Rx la fonction retournera un IObservable<Data> qui peut être souscrit et il pousse des données de l'abonné qu'il devient disponible. IObservable prend également en charge LINQ et ajoute quelques méthodes d'extension de son propre.

Mise à jour

J'ai ajouté un couple de génériques méthodes d'aide à rendre l'utilisation du lecteur réutilisables ainsi que le soutien pour l'annulation.

public static class ObservableEx
    {
        public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc)
        {
            return CreateFromSqlCommand(connectionString, command, readDataFunc, CancellationToken.None);
        }

        public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc, CancellationToken cancellationToken)
        {
            return Observable.Create<T>(
                async o =>
                {
                    SqlDataReader reader = null;

                    try
                    {                        
                        using (var conn = new SqlConnection(connectionString))
                        using (var cmd = new SqlCommand(command, conn))
                        {
                            await conn.OpenAsync(cancellationToken);
                            reader = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection, cancellationToken);

                            while (await reader.ReadAsync(cancellationToken))
                            {
                                var data = await readDataFunc(reader);
                                o.OnNext(data);                                
                            }

                            o.OnCompleted();
                        }
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }

                    return reader;
                });
        }
    }

La mise en œuvre de l' ReadData est maintenant grandement simplifiée.

     private static IObservable<Data> ReadData()
    {
        return ObservableEx.CreateFromSqlCommand(connectionString, "select * from Data", async r =>
        {
            return await Task.FromResult(new Data()); // sample code to read from reader.
        });
    }

L'utilisation de la

Vous pouvez vous abonner à la Observables en lui donnant un IObserver mais il y a aussi des surcharges qui prennent les lambdas. Que les données deviennent disponibles, l' OnNext de rappel est appelée. Si il y a une exception, l' OnError de rappel est appelée. Enfin, si il n'y a pas plus de données OnCompleted de rappel est appelée.

Si vous voulez annuler l'observable, il suffit de disposer de l'abonnement.

void Main()
{
   // This is an asyncrhonous call, it returns straight away
    var subscription = ReadData()
        .Skip(5)                        // Skip first 5 entries, supports LINQ               
        .Delay(TimeSpan.FromSeconds(1)) // Rx operator to delay sequence 1 second
        .Subscribe(x =>
    {
        // Callback when a new Data is read
        // do something with x of type Data
    },
    e =>
    {
        // Optional callback for when an error occurs
    },
    () =>
    {
        //Optional callback for when the sequenc is complete
    }
    );

    // Dispose subscription when finished
    subscription.Dispose();

    Console.ReadKey();
}

2voto

glopes Points 90

Je pense que Rx est certainement le chemin à parcourir dans ce scénario, étant donné observer une séquence est le formelle double d'une énumération un.

Comme mentionné dans une précédente réponse, vous pouvez réécrire votre séquence comme un observable à partir de zéro, mais il ya aussi un couple de façons de garder la rédaction de votre itérateur blocs, mais alors tout simplement se détendre entre eux de manière asynchrone.

1) il suffit de convertir le énumérable d'une observable comme suit:

using System.Reactive.Linq;
using System.Reactive.Concurrency;

var enumerable = Enumerable.Range(10);
var observable = enumerable.ToObservable();
var subscription = observable.Subscribe(x => Console.WriteLine(x));

Cela rendra votre énumérable se comporter comme un observable en poussant ses notifications en toute en aval des observateurs. Dans ce cas, lorsque vous Abonner est appelée, elle synchrone bloc jusqu'à ce que toutes les données ont été traitées. Si vous voulez qu'il soit totalement asynchrone, vous pouvez le configurer pour un thread différent, en utilisant:

var observable = enumerable.ToObservable().SubscribeOn(NewThreadScheduler.Default);

Maintenant, le déroulement de l'énumérable sera fait dans un nouveau thread et la méthode subscribe sera de retour immédiatement.

2) Déroulez le énumérable l'aide d'un autre asynchrone source de l'événement:

var enumerable = Enumerable.Range(10);
var observable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                           .Zip(enumerable, (t, x) => x);
var subscription = observable.Subscribe(x => Console.WriteLine(x));

Dans ce cas, j'ai configuré un compte à rebours pour le feu chaque seconde et chaque fois qu'il tire, il déplace l'itérateur de l'avant. Maintenant, le minuteur peut être facilement remplacé par une source de l'événement afin de contrôler exactement quand l'itérateur se déplace vers l'avant.

Je me retrouve en profitant de la syntaxe et de la sémantique de l'itérateur blocs (par exemple ce qui se passe avec try/finally blocs et d'en disposer), j'ai donc utiliser ces conceptions, parfois même lors de la conception des opérations asynchrones.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X