102 votes

Node.js Piping the same readable stream into multiple (writable) targets

J'ai besoin d'exécuter deux commandes en série qui doivent lire des données à partir du même flux. Après avoir redirigé un flux dans un autre, le tampon est vidé, donc je ne peux pas lire à nouveau des données à partir de ce flux, donc cela ne fonctionne pas :

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Request se plaint de cela

Erreur : Vous ne pouvez pas rediriger après que des données aient été émises de la réponse.

et changer le inputStream en fs.createWriteStream entraîne bien sûr le même problème. Je ne veux pas écrire dans un fichier mais réutiliser en quelque sorte le flux que request produit (ou tout autre d'ailleurs).

Existe-t-il un moyen de réutiliser un flux lisible une fois qu'il a fini d'être redirigé ? Quel serait le meilleur moyen d'accomplir quelque chose comme l'exemple ci-dessus ?

99voto

user568109 Points 21253

Vous devez créer une copie du flux en le renvoyant vers deux flux. Vous pouvez créer un flux simple avec un flux PassThrough, il transfère simplement l'entrée à la sortie.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

Sortie:

8
hi user

12voto

artikas Points 198

La première réponse ne fonctionne que si les flux prennent à peu près le même temps pour traiter les données. Si l'un prend significativement plus de temps, celui qui est plus rapide demandera de nouvelles données, écrasant ainsi les données encore utilisées par le plus lent (j'ai eu ce problème après avoir essayé de le résoudre en utilisant un double flux).

Le schéma suivant a très bien fonctionné pour moi. Il utilise une bibliothèque basée sur les flux Stream2, Streamz, et des Promises pour synchroniser les flux asynchrones via un rappel. En utilisant l'exemple familier de la première réponse :

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;   

a.stdout.pipe(streamz(combineStreamOperations)); 

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ //effectuer n opérations sur les mêmes données
  next(); //demander plus de données
}

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });

6voto

levansuper Points 233

Vous pouvez utiliser ce petit package npm que j'ai créé :

readable-stream-clone

Avec cela, vous pouvez réutiliser des flux lisibles autant de fois que vous le souhaitez

5voto

Jake Points 457

Pour un problème général, le code suivant fonctionne bien

var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')

2voto

Juan Points 619

Si vous avez des opérations asynchrones sur les flux PassThrough, les réponses postées ici ne fonctionneront pas. Une solution qui fonctionne pour les opérations asynchrones consiste à mettre en mémoire tampon le contenu du flux, puis à créer des flux à partir du résultat mis en mémoire tampon.

  1. Pour mettre en mémoire tampon le résultat, vous pouvez utiliser concat-stream

    const Promise = require('bluebird');
    const concat = require('concat-stream');
    const getBuffer = function(stream){
        return new Promise(function(resolve, reject){
            var gotBuffer = function(buffer){
                resolve(buffer);
            }
            var concatStream = concat(gotBuffer);
            stream.on('error', reject);
            stream.pipe(concatStream);
        });
    }
  2. Pour créer des flux à partir du tampon, vous pouvez utiliser:

    const { Readable } = require('stream');
    const getBufferStream = function(buffer){
        const stream = new Readable();
        stream.push(buffer);
        stream.push(null);
        return Promise.resolve(stream);
    }

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X