可观察vs可stream动rxJava2

我一直在寻找新的Rx的Java 2,我不太确定我理解backpressure的想法了…

我知道,我们有Observable没有backpressure支持和Flowable有它。

所以基于实例,可以说我有flowableinterval

  Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // do smth } }); 

这将会在128个值后崩溃,这很明显,我比消费项目慢。

但是我们跟Observable

  Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // do smth } }); 

即使我在延迟使用它仍然有效时,这一切都不会崩溃。 为了使onBackpressureDrop工作可以说我把onBackpressureDrop操作符,崩溃消失,但并不是所有的价值都发出。

所以我现在头脑中找不到答案的基本问题是为什么我应该关心backpressure时候我可以使用普通的Observable仍然接收所有的值而不pipe理buffer ? 或者从另一方面来说, backpressure什么好处可以帮助我pipe理和处理消费?

什么背压在实践中体现的是有界的缓冲区, Flowable.observeOn有128个元素的缓冲区,可以像下游stream一样快地排空。 您可以单独增加缓冲区大小来处理突发源,所有背压pipe理实践仍然适用于1.x. Observable.observeOn有一个无限制的缓冲区,不断收集元素,你的应用程序可能会耗尽内存。

你可以使用Observable例如:

  • 处理GUI事件
  • 使用短序列(总共less于1000个元素)

你可以使用Flowable例如:

  • 冷和非定时来源
  • 发电机像来源
  • networking和数据库访问器

背压是指当你的观察者(出版商)创造的事件多于你的订阅者可以处理的时候。 所以你可以让订阅者失去事件,或者你可以得到一个巨大的事件队列,最终导致内存不足。 考虑到FlowableObservable不。 而已。

事实上,你的Flowable在发射128个值后没有背压处理的情况下坠毁并不意味着它会在128个值后总是崩溃:有时它会在10之后崩溃,有时甚至不会崩溃。 我相信这是当你用Observable尝试这个例子时发生的事情 – 碰巧没有背压,所以你的代码正常工作,下次可能不会。 RxJava 2的不同之处在于Observable没有背压的概念,也没有办法处理它。 如果您正在devise可能需要明确反压处理的反应序列,那么Flowable是您的最佳select。