47 votes

Lecture/écriture de flux asynchrone .NET

J'ai essayé de résoudre cet exercice d'examen "Programmation concurrente" (en C#) :

Sachant que Stream la classe contient int Read(byte[] buffer, int offset, int size) y void Write(byte[] buffer, int offset, int size) implémenter en C# les méthodes NetToFile qui copie toutes les données reçues de NetworkStream net à l'instance FileStream file instance. Pour effectuer le transfert, utilisez des lectures asynchrones et des écritures synchrones, en évitant qu'un thread soit bloqué pendant les opérations de lecture. Le transfert se termine lorsque le net L'opération de lecture renvoie la valeur 0. Pour simplifier, il n'est pas nécessaire de prendre en charge l'annulation contrôlée de l'opération.

void NetToFile(NetworkStream net, FileStream file);

J'ai essayé de résoudre cet exercice, mais j'ai du mal à répondre à une question liée à la question elle-même. Mais d'abord, voici mon code :

public static void NetToFile(NetworkStream net, FileStream file) {
    byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
    int offset = 0; // read/write offset
    int nBytesRead = 0; // number of bytes read on each cycle

    IAsyncResult ar;
    do {
        // read partial content of net (asynchronously)
        ar = net.BeginRead(buffer,offset,buffer.Length,null,null);
        // wait until read is completed
        ar.AsyncWaitHandle.WaitOne();
        // get number of bytes read on each cycle
        nBytesRead = net.EndRead(ar);

        // write partial content to file (synchronously)
        fs.Write(buffer,offset,nBytesRead);
        // update offset
        offset += nBytesRead;
    }
    while( nBytesRead > 0);
}

La question que je me pose est que, dans l'énoncé de la question, il est dit :

Pour effectuer le transfert, utilisez la méthode asynchrone. lectures asynchrones et des écritures synchrones, en évitant qu'un thread soit bloqué pendant les opérations

Je ne suis pas vraiment sûr que ma solution accomplisse ce qui est recherché dans cet exercice, parce que j'utilise AsyncWaitHandle.WaitOne() pour attendre que la lecture asynchrone soit terminée.

D'un autre côté, je ne comprends pas vraiment ce que l'on entend par solution "non bloquante" dans ce scénario, étant donné que la fonction FileStream write est censé être fait de manière synchrone... et pour cela, je dois attendre que NetworkStream la lecture est terminée pour procéder à la FileStream l'écriture, n'est-ce pas ?

Pouvez-vous, s'il vous plaît, m'aider avec ça ? Merci d'avance pour votre collaboration.


[EDIT 1] Utilisation de rappel solution

Ok, si j'ai compris ce que Mitchel Sellers y willvv J'ai reçu une réponse (merci les gars), on m'a conseillé d'utiliser une méthode de rappel pour transformer cela en une solution "non bloquante". Voici donc mon code...

byte[] buffer; // buffer

public static void NetToFile(NetworkStream net, FileStream file) {
    // buffer with same dimension as file stream data
    buffer = new byte[file.Length];
    //start asynchronous read
    net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}

//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
    //NetworkStream retrieve
    NetworkStream net = (NetworkStream) ar.IAsyncState;
    //get number of bytes read
    int nBytesRead = net.EndRead(ar);

    //write content to file
    //... and now, how do I write to FileStream instance without
    //having its reference??
    //fs.Write(buffer,0,nBytesRead);
}

Comme vous l'avez peut-être remarqué, je suis bloqué au niveau de la méthode de rappel, car je n'ai pas de référence à l'objet de l'appel. FileStream où je veux invoquer la méthode "Write(...)".

En outre, il ne s'agit pas d'une solution sûre du point de vue des threads, car la fonction byte[] est exposé et peut être partagé entre des utilisateurs concurrents NetToFile invocations. Je ne sais pas comment résoudre ce problème sans exposer cette byte[] dans l'outer-scope... et je suis presque sûr qu'il ne peut pas être exposé de cette façon.

Je ne veux pas utiliser une solution de méthode lambda ou anonyme, car cela ne fait pas partie du programme du cours de "programmation simultanée".

52voto

Nicholas Carey Points 24614

Même si cela va à l'encontre de l'idée d'aider les gens à faire leurs devoirs, étant donné que cela date de plus d'un an, voici la bonne façon de procéder. Tout ce dont vous avez besoin chevauchement vos opérations de lecture/écriture - aucune création de threads supplémentaires, ni rien d'autre n'est nécessaire.

public static class StreamExtensions
{
    private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
    public static void CopyTo( this Stream input , Stream output )
    {
        input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
        return ;
    }
    public static void CopyTo( this Stream input , Stream output , int bufferSize )
    {
        if ( !input.CanRead ) throw new InvalidOperationException(   "input must be open for reading"  );
        if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );

        byte[][]     buf   = { new byte[bufferSize] , new byte[bufferSize] } ;
        int[]        bufl  = { 0 , 0 }                                       ;
        int          bufno = 0 ;
        IAsyncResult read  = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
        IAsyncResult write = null ;

        while ( true )
        {

            // wait for the read operation to complete
            read.AsyncWaitHandle.WaitOne() ; 
            bufl[bufno] = input.EndRead(read) ;

            // if zero bytes read, the copy is complete
            if ( bufl[bufno] == 0 )
            {
                break ;
            }

            // wait for the in-flight write operation, if one exists, to complete
            // the only time one won't exist is after the very first read operation completes
            if ( write != null )
            {
                write.AsyncWaitHandle.WaitOne() ;
                output.EndWrite(write) ;
            }

            // start the new write operation
            write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;

            // toggle the current, in-use buffer
            // and start the read operation on the new buffer.
            //
            // Changed to use XOR to toggle between 0 and 1.
            // A little speedier than using a ternary expression.
            bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
            read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;

        }

        // wait for the final in-flight write operation, if one exists, to complete
        // the only time one won't exist is if the input stream is empty.
        if ( write != null )
        {
            write.AsyncWaitHandle.WaitOne() ;
            output.EndWrite(write) ;
        }

        output.Flush() ;

        // return to the caller ;
        return ;
    }
}

A la vôtre.

18voto

John Leidegren Points 21951

Je doute que ce soit le code le plus rapide (il y a une certaine surcharge due à l'abstraction des tâches .NET) mais je pense que c'est une nettoyant de l'approche de la copie asynchrone.

J'avais besoin d'un CopyTransformAsync où je pourrais passer un délégué pour faire quelque chose lorsque les morceaux sont passés par l'opération de copie, par exemple calculer un résumé de message pendant la copie. C'est pourquoi je me suis intéressé à l'élaboration de ma propre option.

Les résultats :

  • CopyToAsync bufferSize est sensible (un grand tampon est requis)
  • FileOptions.Asynchronous -> terriblement lent (je ne sais pas exactement pourquoi)
  • Le bufferSize des objets FileStream peut être plus petit (ce n'est pas très important).
  • Le site Serial est sans doute le test le plus rapide et le plus gourmand en ressources.

Voici ce que j'ai trouvé et le code source complet pour le programme que j'ai utilisé pour tester ceci. Sur ma machine, ces tests ont été effectués sur un disque SSD et c'est l'équivalent d'une copie de fichier. Normalement, vous ne voudriez pas l'utiliser pour copier simplement des fichiers, mais plutôt lorsque vous avez un flux réseau (ce qui est mon cas), c'est là que vous voudriez utiliser quelque chose comme ça.

4K buffer

Serial...                                in 0.474s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    timed out
CopyTransformAsync (Asynchronous)...     timed out

8K buffer

Serial...                                in 0.344s
CopyToAsync...                           timed out
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 1.116s
CopyTransformAsync (Asynchronous)...     timed out

40K buffer

Serial...                                in 0.195s
CopyToAsync...                           in 0.624s
CopyToAsync (Asynchronous)...            timed out
CopyTransformAsync...                    in 0.378s
CopyTransformAsync (Asynchronous)...     timed out

80K buffer

Serial...                                in 0.190s
CopyToAsync...                           in 0.355s
CopyToAsync (Asynchronous)...            in 1.196s
CopyTransformAsync...                    in 0.300s
CopyTransformAsync (Asynchronous)...     in 0.886s

160K buffer

Serial...                                in 0.432s
CopyToAsync...                           in 0.252s
CopyToAsync (Asynchronous)...            in 0.454s
CopyTransformAsync...                    in 0.447s
CopyTransformAsync (Asynchronous)...     in 0.555s

Vous pouvez voir ici le graphique de performance de Process Explorer pendant l'exécution du test. En fait, chaque top (dans le plus bas des trois graphiques) est le début du test en série. Vous pouvez clairement voir comment le débit augmente de façon spectaculaire à mesure que la taille du tampon augmente. Il semblerait qu'il soit planifié quelque part autour de 80K, ce qui est ce que le framework .NET CopyToAsync utilise la méthode, en interne.

Performance Graph

Ce qui est bien ici, c'est que la mise en œuvre finale n'était pas si compliquée :

static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
    , Stream outputStream
    , Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
    )
{
    var temp = new byte[bufferSize];
    var temp2 = new byte[bufferSize];

    int i = 0;

    var readTask = inputStream
        .ReadAsync(temp, 0, bufferSize)
        .ConfigureAwait(false);

    var writeTask = CompletedTask.ConfigureAwait(false);

    for (; ; )
    {
        // synchronize read
        int read = await readTask;
        if (read == 0)
        {
            break;
        }

        if (i++ > 0)
        {
            // synchronize write
            await writeTask;
        }

        var chunk = new ArraySegment<byte>(temp, 0, read);

        // do transform (if any)
        if (!(transform == null))
        {
            chunk = transform(chunk);
        }

        // queue write
        writeTask = outputStream
            .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
            .ConfigureAwait(false);

        // queue read
        readTask = inputStream
            .ReadAsync(temp2, 0, bufferSize)
            .ConfigureAwait(false);

        // swap buffer
        var temp3 = temp;
        temp = temp2;
        temp2 = temp3;
    }

    await writeTask; // complete any lingering write task
}

Cette méthode d'entrelacement des lectures/écritures malgré les énormes tampons est quelque part entre 18% plus rapide que la BCL CopyToAsync .

Par curiosité, j'ai remplacé les appels asynchrones par des appels asynchrones typiques de type début/fin et cela n'a pas amélioré la situation, mais l'a empirée. Malgré tout ce que j'aime dire sur l'abstraction des tâches, elles font des choses intéressantes lorsque vous écrivez votre code avec les mots-clés async/await et il est beaucoup plus agréable de lire ce code !

12voto

bendewey Points 25437

Vous allez devoir utiliser le rappel de la lecture de NetStream pour gérer cela. Et franchement, il pourrait être plus facile d'envelopper la logique de copie dans sa propre classe afin de pouvoir maintenir l'instance des flux actifs.

Voici comment j'aborderais la question (non testé) :

public class Assignment1
{
    public static void NetToFile(NetworkStream net, FileStream file) 
    {
        var copier = new AsyncStreamCopier(net, file);
        copier.Start();
    }

    public static void NetToFile_Option2(NetworkStream net, FileStream file) 
    {
        var completedEvent = new ManualResetEvent(false);

        // copy as usual but listen for completion
        var copier = new AsyncStreamCopier(net, file);
        copier.Completed += (s, e) => completedEvent.Set();
        copier.Start();

        completedEvent.WaitOne();
    }

    /// <summary>
    /// The Async Copier class reads the input Stream Async and writes Synchronously
    /// </summary>
    public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private byte[] buffer = new byte[4096];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            if (Completed != null)
            {
                Completed(this, EventArgs.Empty);
            }
        }
    }
}

11voto

Kipp Points 404

Wow, elles sont toutes très complexes ! Voici ma solution asynchrone, et c'est juste une fonction. Les fonctions Read() et BeginWrite() s'exécutent en même temps.

/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
///  and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If no data can be read for 60 seconds, the copy will time-out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
    // This stream copy supports a source-read happening at the same time
    // as target-write.  A simpler implementation would be to use just
    // Write() instead of BeginWrite(), at the cost of speed.

    byte[] readbuffer = new byte[4096];
    byte[] writebuffer = new byte[4096];
    IAsyncResult asyncResult = null;

    for (; ; )
    {
        // Read data into the readbuffer.  The previous call to BeginWrite, if any,
        //  is executing in the background..
        int read = source.Read(readbuffer, 0, readbuffer.Length);

        // Ok, we have read some data and we're ready to write it, so wait here
        //  to make sure that the previous write is done before we write again.
        if (asyncResult != null)
        {
            // This should work down to ~0.01kb/sec
            asyncResult.AsyncWaitHandle.WaitOne(60000);
            target.EndWrite(asyncResult); // Last step to the 'write'.
            if (!asyncResult.IsCompleted) // Make sure the write really completed.
                throw new IOException("Stream write failed.");
        }

        if (read <= 0)
            return; // source stream says we're done - nothing else to read.

        // Swap the read and write buffers so we can write what we read, and we can
        //  use the then use the other buffer for our next read.
        byte[] tbuf = writebuffer;
        writebuffer = readbuffer;
        readbuffer = tbuf;

        // Asynchronously write the data, asyncResult.AsyncWaitHandle will
        // be set when done.
        asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
    }
}

9voto

Shrike Points 2594

C'est étrange que personne n'ait mentionné TPL.
Ici Le très bon article de l'équipe PFX (Stephen Toub) sur l'implémentation de la copie de flux asynchrone simultanée. L'article contient des références périmées à des exemples, voici donc la référence actuelle :
Obtenez Extensions parallèles Extras de code.msdn puis

var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();

Pensez également à utiliser la méthode de J.Richer AsyncEnumerator .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X