RxJS:我将如何“手动”更新Observable?

我认为我必须误解一些基本的东西,因为在我看来,这应该是一个可观察的最基本的情况,但是对于我的生活,我无法弄清楚如何从文档中做到这一点。

基本上,我想能够做到这一点:

// create a dummy observable, which I would update manually var eventObservable = rx.Observable.create(function(observer){}); var observer = eventObservable.subscribe( function(x){ console.log('next: ' + x); } ... var my_function = function(){ eventObservable.push('foo'); //'push' adds an event to the datastream, the observer gets it and prints // next: foo } 

但是我一直没能find像push这样的方法。 我使用这个为一个点击处理程序,我知道他们有Observable.fromEvent的,但我试图用它与React,我宁愿能够简单地更新数据stream在callback,而不是使用一个完全不同的事件处理系统。 所以基本上我想这个:

 $( "#target" ).click(function(e) { eventObservable.push(e.target.text()); }); 

我得到的最接近的是使用observer.onNext('foo') ,但这似乎并没有实际的工作,这是在观察员,这似乎并不正确。 观察者应该是对数据stream作出反应的东西,而不是改变它,对吗?

我只是不理解观察者/可观察的关系吗?

在RX中,Observer和Observable是不同的实体。 观察者订阅Observable。 Observable通过调用观察者的方法向观察者发射物品。 如果您需要调用Observable.create()范围以外的观察者方法,则可以使用Subject,它是一个代理,同时充当观察者和Observable。

你可以这样做:

 var eventStream = new Rx.Subject(); var subscription = eventStream.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); var my_function = function() { eventStream.onNext('foo'); } 

你可以在这里find更多关于科目的信息:

我相信Observable.create()不会将观察者视为callback参数,而是一个发射器。 所以,如果你想添加一个新的值到你的Observable,请尝试这个:

 var emitter; var observable = Rx.Observable.create(e => emitter = e); var observer = { next: function(next) { console.log(next); }, error: function(error) { console.log(error); }, complete: function() { console.log("done"); } } observable.subscribe(observer); emitter.next('foo'); emitter.next('bar'); emitter.next('baz'); emitter.complete(); //console output //"foo" //"bar" //"baz" //"done" 

是Subject使得它更简单,在同一个对象中提供Observable和Observer,但它不完全相同,因为Subject允许您在多个观察者只向最后订阅的观察者发送数据时订阅多个观察者到同一个观察者,所以有意识地使用它。 这是一个JsBin,如果你想修补它。

Interesting Posts