Node.js将相同的可读streampipe道化成多个(可写)目标

我需要运行两个需要从同一个stream中读取数据的命令。 在将一个stream传输到另一个stream之后,缓冲区被清空,所以我无法再从该stream读取数据,所以这不起作用:

var spawn = require('child_process').spawn; var fs = require('fs'); var request = require('request'); var inputStream = request('http://placehold.it/640x360'); var identify = spawn('identify',['-']); inputStream.pipe(identify.stdin); var chunks = []; identify.stdout.on('data',function(chunk) { chunks.push(chunk); }); identify.stdout.on('end',function() { var size = getSize(Buffer.concat(chunks)); //width var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']); inputStream.pipe(convert.stdin); convert.stdout.pipe(fs.createWriteStream('half.png')); }); function getSize(buffer){ return parseInt(buffer.toString().split(' ')[2].split('x')[0]); } 

要求抱怨这个

 Error: You cannot pipe after data has been emitted from the response. 

并改变inputStreamfs.createWriteStream当然产生相同的问题。 我不想写入文件,而是以某种方式重用 请求产生的stream(或任何其他)。

有一种方法可以重复使用一个可读的stream一旦完成pipe道? 完成类似上述例子的最好方法是什么?

您不能重新使用已经发送的pipe道数据。 而且在“结束”之后你不能pipestream。 所以你不能两次处理同一个stream,并且需要两个stream。 您必须通过将streampipe道到两个stream来创buildstream的重复。 您可以使用PassThroughstream创build一个简单的stream,它只是将input传递给输出。

 spawn = require('child_process').spawn; pass = require('stream').PassThrough; a = spawn('echo', ['hi user']); b = new pass; c = new pass; a.stdout.pipe(b); a.stdout.pipe(c); count = 0; b.on('data', function(chunk) { count += chunk.length; }); b.on('end', function() { console.log(count); c.pipe(process.stdout); }); 

产量

 8 hi user 

第一个答案只有在stream处理数据的时间大致相同时才有效。 如果花费的时间更长,则更快的请求新数据,从而覆盖仍然被较慢数据使用的数据(在使用重复stream尝试解决该问题后,我遇到了这个问题)。

以下模式对我来说工作得非常好。 它使用基于Stream2stream,Streamz和Promises的库来通过callback同步asynchronousstream。 使用第一个答案中熟悉的例子:

 spawn = require('child_process').spawn; pass = require('stream').PassThrough; streamz = require('streamz').PassThrough; var Promise = require('bluebird'); a = spawn('echo', ['hi user']); b = new pass; c = new pass; a.stdout.pipe(streamz(combineStreamOperations)); function combineStreamOperations(data, next){ Promise.join(b, c, function(b, c){ //perform n operations on the same data next(); //request more } count = 0; b.on('data', function(chunk) { count += chunk.length; }); b.on('end', function() { console.log(count); c.pipe(process.stdout); }); 

对于一般问题,下面的代码工作正常

 var PassThrough = require('stream').PassThrough a=PassThrough() b1=PassThrough() b2=PassThrough() a.pipe(b1) a.pipe(b2) b1.on('data', function(data) { console.log('b1:', data.toString()) }) b2.on('data', function(data) { console.log('b2:', data.toString()) }) a.write('text') 

怎么处理两个或更多的stream不是在同一时间?

例如 :

 var PassThrough = require('stream').PassThrough; var mybiraryStream = stream.start(); //never ending audio stream var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'}) var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'}) var mypass = PassThrough mybinaryStream.pipe(mypass) mypass.pipe(file1) setTimeout(function(){ mypass.pipe(file2); },2000) 

上面的代码不会产生任何错误,但是file2是空的

我有一个不同的解决scheme来同时写入两个stream,自然,写入的时间将是两次的添加,但我用它来响应下载请求,在那里我想保留下载的文件的副本上我的服务器(实际上我使用S3备份,所以我在本地caching最常用的文件,以避免多个文件传输)

 /** * A utility class made to write to a file while answering a file download request */ class TwoOutputStreams { constructor(streamOne, streamTwo) { this.streamOne = streamOne this.streamTwo = streamTwo } setHeader(header, value) { if (this.streamOne.setHeader) this.streamOne.setHeader(header, value) if (this.streamTwo.setHeader) this.streamTwo.setHeader(header, value) } write(chunk) { this.streamOne.write(chunk) this.streamTwo.write(chunk) } end() { this.streamOne.end() this.streamTwo.end() } } 

然后,您可以将其用作常规的OutputStream

 const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream) 

并传递给你的方法,就好像它是一个响应或fileOutputStream