Node.js数据stream与观测数据

了解了Observables之后 ,我发现它们与Node.jsstream非常相似。 每当有新数据到达时,都有通知消费者的机制,发生错误或没有更多的数据(EOF)。

我很想了解两者之间的概念/function差异。 谢谢!

Observable和node.js的Streams都可以解决相同的底层问题:asynchronous处理一系列值。 我认为,两者之间的主要区别在于促成其外观的背景。 这一背景反映在术语和API中。

在Obserbles方面,你有一个EcmaScript的扩展,引入了被动编程模型。 它试图填补ObserverObserver的极简主义和可组合概念之间的价值差异和asynchronous性之间的差距。

在node.js和Streams端,您希望为networkingstream和本地文件的asynchronous和高性能处理创build一个接口。 术语来源于最初的上下文,你可以得到pipechunkencodingflushDuplexBuffer等等。通过一个实用的方法,为特定的用例提供明确的支持,你就失去了编写一些东西的能力,因为它不是统一的。 例如,在Readablestream上使用push ,并在WritableWritable尽pipe在概念上,您正在做同样的事情:发布一个值。

因此,在实践中,如果您查看这些概念,并且使用了{ objectMode: true }选项,则可以将ObservableReadablestream和ObserverWritablestream进行匹配。 你甚至可以在两个模型之间创build一些简单的适配器。

 var Readable = require('stream').Readable; var Writable = require('stream').Writable; var util = require('util'); var Observable = function(subscriber) { this.subscribe = subscriber; } var Subscription = function(unsubscribe) { this.unsubscribe = unsubscribe; } Observable.fromReadable = function(readable) { return new Observable(function(observer) { function nop() {}; var nextFn = observer.next ? observer.next.bind(observer) : nop; var returnFn = observer.return ? observer.return.bind(observer) : nop; var throwFn = observer.throw ? observer.throw.bind(observer) : nop; readable.on('data', nextFn); readable.on('end', returnFn); readable.on('error', throwFn); return new Subscription(function() { readable.removeListener('data', nextFn); readable.removeListener('end', returnFn); readable.removeListener('error', throwFn); }); }); } var Observer = function(handlers) { function nop() {}; this.next = handlers.next || nop; this.return = handlers.return || nop; this.throw = handlers.throw || nop; } Observer.fromWritable = function(writable, shouldEnd, throwFn) { return new Observer({ next: writable.write.bind(writable), return: shouldEnd ? writable.end.bind(writable) : function() {}, throw: throwFn }); } 

您可能已经注意到,我更改了一些名称,并使用了ObserverSubscription的简单概念(在这里介绍),以避免由GeneratorObservable完成的权限过载。 基本上, Subscription允许您取消订阅他Observable 。 无论如何,用上面的代码,你可以有一个pipe

 Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout)); 

process.stdin.pipe(process.stdout)相比,您拥有的是组合,过滤和转换也适用于其他任何数据序列的stream的方法。 您可以使用ReadableTransformWritablestream实现它,但是API支持子类化,而不是链接Readable和应用函数。 在Observable模型上,例如,变换值对应于对stream应用变换函数。 它不需要Transform的新子types。

 Observable.just = function(/*... arguments*/) { var values = arguments; return new Observable(function(observer) { [].forEach.call(values, function(value) { observer.next(value); }); observer.return(); return new Subscription(function() {}); }); }; Observable.prototype.transform = function(transformer) { var source = this; return new Observable(function(observer) { return source.subscribe({ next: function(v) { observer.next(transformer(v)); }, return: observer.return.bind(observer), throw: observer.throw.bind(observer) }); }); }; Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify) .subscribe(Observer.fromWritable(process.stdout)) 

结论? 很容易在任何地方引入反应模型和Observable概念。 围绕这个概念实施整个图书馆是比较困难的。 所有这些小function都需要一致的工作。 毕竟, ReactiveX项目仍在继续。 但是,如果你真的需要将文件内容发送到客户端,请处理编码,然后在NodeJS中对其进行压缩,然后将其压缩,然后运行得非常好。