CompletableFuture,Future和RxJava的Observable之间的区别

我想知道CompletableFutureFutureObservable RxJava之间的区别。

我所知道的都是asynchronous的,但是

Future.get()阻塞线程

CompletableFuture给出了callback方法

RxJava Observable —与CompletableFuture类似,具有其他好处(不确定)

例如:如果客户端需要进行多个服务调用,并且当我们使用Futures (Java)时, Future.get()将会按顺序执行…想知道RxJava如何更好。

而文档http://reactivex.io/intro.html说

使用Futures来优化组合有条件的asynchronous执行stream是困难的(或者不可能,因为每个请求的延迟在运行时间上是不同的)。 这当然可以做到,但是它很快变得复杂(并且因此容易出错),或者它过早地阻塞了Future.get(),这消除了asynchronous执行的好处。

真的有兴趣知道如何RxJava解决这个问题。我发现很难从文档中理解。请帮助!

期货

Java 5(2004)引入了期货 。 它们是承诺在操作完成后保持操作结果的对象。 例如,当一个任务(即Runnable或Callable )被提交给执行者。 调用者可以使用未来对象来检查操作是否是() ,或者等待它完成使用get() 。

例:

 /** * A task that sleeps for a second, then returns 1 **/ public static class MyCallable implements Callable<Integer> { @Override public Integer call() throws Exception { Thread.sleep(1000); return 1; } } public static void main(String[] args) throws Exception{ ExecutorService exec = Executors.newSingleThreadExecutor(); Future<Integer> f = exec.submit(new MyCallable()); System.out.println(f.isDone()); //False System.out.println(f.get()); //Waits until the task is done, then prints 1 } 

CompletableFutures

CompletableFutures是在Java 8(2014)中引入的。 可实现的期货实际上是由谷歌的可听期货 ( Guava libary的一部分)启发的定期期货的演变。 他们是期货,也允许你在一个链中串起任务。 你可以用它们来告诉一些工作者线程“去做一些任务X,当你完成了,用X的结果去做这件事情”。 这是一个简单的例子:

 /** * A supplier that sleeps for a second, and then returns one **/ public static class MySupplier implements Supplier<Integer> { @Override public Integer get() { try { Thread.sleep(1000); } catch (InterruptedException e) { //Do nothing } return 1; } } /** * A (pure) function that adds one to a given Integer **/ public static class PlusOne implements Function<Integer, Integer> { @Override public Integer apply(Integer x) { return x + 1; } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newSingleThreadExecutor(); CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec); System.out.println(f.isDone()); // False CompletableFuture<Integer> f2 = f.thenApply(new PlusOne()); System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2 } 

RxJava

RxJava是由Netflix创build的反应式编程的完整库。 一目了然,它将看起来像Java 8的stream 。 它是,除了它更强大。

与Futures类似,RxJava可以用来将一堆同步或asynchronous操作串起来以产生一个或多个有意义的结果。 然而,不同于一次性使用的期货,RxJava可以处理零或多个物品stream,包括无限数量的物品。 由于有一组令人难以置信的丰富的操作员,它也更加灵活和stream畅。

另外,与Java 8的stream不同,RxJava有一个反压机制,允许它处理不同部分处理stream在不同线程中以不同速率运行的情况。

Rx的缺点是,尽pipe有很好的文档,但是由于涉及到范式转换,这是一个充满挑战的图书馆。 Rx代码也可能是debugging的噩梦,特别是涉及多个线程时,甚至更糟 – 如果需要反压的话。 如果你想进入它,官方网站上有一整页的各种教程,加上官方文档和Javadoc 。 你也可以看看一些video,比如这个给Rx简要介绍的video,还讨论了Rx和期货之间的区别。

奖金:Java 9react nativestream

Java 9的反应stream又称Flow API是由各种反应stream库实现的一组接口,如RxJava 2 , Akka Streams和Vertx 。 他们允许这些react native库互连,同时保留所有重要的背压。

从0.9开始,我一直在使用Rx Java,现在在1.3.2,很快就迁移到了2.x,我在一个已经工作了8年的私人项目中使用它。

没有这个图书馆,我不会再编程了。 在开始的时候我很怀疑,但是你需要创造一个完整的其他状态。 在一开始就困难重重。 我有时看着弹珠好几个小时..哈哈

这只是一个实践问题,真正了解stream动(也就是观察者和观察者的合同),一旦你到达那里,你就会讨厌这样做。

对我来说,这个图书馆没有什么坏处。

使用案例:我有一个监视器视图,包含9个仪表(CPU,MEM,networking等)。 启动视图时,视图将自己订阅到一个系统监视器类,该类将返回一个包含9米的所有数据的可观察(间隔)。 它将每秒向视图推送一个新的结果(所以不轮询!!!)。 该observable使用flatmap同时(asynchronous!)从9个不同的来源获取数据,并将结果压缩到一个新的模型,您的视图将在onNext()。

你怎么会用期货,补充等来做到这一点…祝你好运! 🙂

Rx Java为我解决了编程中的许多问题,并且使得它更容易…

优点:

  • 国家! (重要的是要提到,也许最重要的)
  • 线程pipe理开箱即用
  • 构build具有自己生命周期的序列
  • 一切都是可观察的,所以链接很容易
  • 编写的代码较less
  • classpath上的单个jar(非常轻量级)
  • 高度并发
  • 没有callback地狱了
  • 基于订户(消费者和生产者之间的紧密合同)
  • 背压策略(断路器一样)
  • 辉煌的error handling和恢复
  • 非常好的文档(弹珠<3)
  • 完全控制
  • 还有很多 …

缺点: – 很难testing

就这样 :)