什么时候应该使用RxJava Observable,并在Android上进行简单的callback?

我正在为我的应用程序的networking工作。 所以我决定尝试Square的改造 。 我看到他们支持简单的Callback

 @GET("/user/{id}/photo") void getUserPhoto(@Path("id") int id, Callback<Photo> cb); 

和RxJava的Observable

 @GET("/user/{id}/photo") Observable<Photo> getUserPhoto(@Path("id") int id); 

乍一看,两者看起来非常相似,但是当它实现时,它变得有趣…

虽然简单的callback实现看起来类似于这样的:

 api.getUserPhoto(photoId, new Callback<Photo>() { @Override public void onSuccess() { } }); 

这很简单直接。 而随着Observable它迅速变得冗长和相当复杂。

 public Observable<Photo> getUserPhoto(final int photoId) { return Observable.create(new Observable.OnSubscribeFunc<Photo>() { @Override public Subscription onSubscribe(Observer<? super Photo> observer) { try { observer.onNext(api.getUserPhoto(photoId)); observer.onCompleted(); } catch (Exception e) { observer.onError(e); } return Subscriptions.empty(); } }).subscribeOn(Schedulers.threadPoolForIO()); } 

而那不是。 你仍然需要做这样的事情:

 Observable.from(photoIdArray) .mapMany(new Func1<String, Observable<Photo>>() { @Override public Observable<Photo> call(Integer s) { return getUserPhoto(s); } }) .subscribeOn(Schedulers.threadPoolForIO()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Photo>() { @Override public void call(Photo photo) { //save photo? } }); 

我在这里错过了什么? 或者这是一个错误的情况下使用Observable ? 什么时候/应该更喜欢简单的callbackObservable

更新

@Niels在他的回答或者杰克·沃顿(Jake Wharton)的例子U2020项目中表明,使用改造比上面的例子简单得多。 但是问题基本上是一样的 – 应该什么时候用这种方法呢?

对于简单的networking,RxJava比Callback的优势非常有限。 简单的getUserPhoto例子:

RxJava:

 api.getUserPhoto(photoId) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Photo>() { @Override public void call(Photo photo) { } }); 

回电话:

 api.getUserPhoto(photoId, new Callback<Photo>() { @Override public void onSuccess(Photo photo, Response response) { } }); 

RxJava变种并没有比Callback变种更好。 现在,让我们忽略error handling。 我们来看一张照片列表:

RxJava:

 api.getUserPhotos(userId) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<List<Photo>, Observable<Photo>>() { @Override public Observable<Photo> call(List<Photo> photos) { return Observable.from(photos); } }) .filter(new Func1<Photo, Boolean>() { @Override public Boolean call(Photo photo) { return photo.isPNG(); } }) .subscribe( new Action1<Photo>() { @Override public void call(Photo photo) { list.add(photo) } }); 

回电话:

 api.getUserPhotos(userId, new Callback<List<Photo>>() { @Override public void onSuccess(List<Photo> photos, Response response) { List<Photo> filteredPhotos = new ArrayList<Photo>(); for(Photo photo: photos) { if(photo.isPNG()) { filteredList.add(photo); } } } }); 

现在,RxJava变体仍然不小,尽pipe使用Lambdas它会更接近callback变体。 此外,如果您有权访问JSON订阅源,那么当您仅显示PNG时,检索所有照片会有些奇怪。 只要调整饲料,它只显示PNG。

第一个结论

当您加载一个简单的JSON时,它不会使您的代码库更小,而您准备使用正确的格式。

现在,让我们更有趣一些。 假设您不仅要检索userPhoto,而且要有一个Instagram克隆,并且要检索2个JSON:1. getUserDetails()2. getUserPhotos()

您想要并行加载这两个JSON,并且在加载这两个JSON时,应该显示该页面。 callback变体将变得更加困难:您必须创build2个callback,将数据存储在活动中,并且如果加载了所有数据,则显示该页面:

回电话:

 api.getUserDetails(userId, new Callback<UserDetails>() { @Override public void onSuccess(UserDetails details, Response response) { this.details = details; if(this.photos != null) { displayPage(); } } }); api.getUserPhotos(userId, new Callback<List<Photo>>() { @Override public void onSuccess(List<Photo> photos, Response response) { this.photos = photos; if(this.details != null) { displayPage(); } } }); 

RxJava:

 private class Combined { UserDetails details; List<Photo> photos; } Observable.zip(api.getUserDetails(userId), api.getUserPhotos(userId), new Func2<UserDetails, List<Photo>, Combined>() { @Override public Combined call(UserDetails details, List<Photo> photos) { Combined r = new Combined(); r.details = details; r.photos = photos; return r; } }).subscribe(new Action1<Combined>() { @Override public void call(Combined combined) { } }); 

我们到了某个地方! 现在RxJava的代码和callback选项一样大。 RxJava代码更健壮; 想想如果我们需要第三个JSON加载(如最新的video)会发生什么? RxJava只需要很小的调整,而Callback变体需要在多个地方调整(在每个callback中,我们需要检查是否所有的数据都被检索到)。

另一个例子; 我们想创build一个自动完成的字段,使用Retrofit加载数据。 每当EditText有一个TextChangedEvent时,我们都不想做一个webcall。 快速input时,只有最后一个元素应该触发呼叫。 在RxJava上,我们可以使用debounce操作符:

 inputObservable.debounce(1, TimeUnit.SECONDS).subscribe(new Action1<String>() { @Override public void call(String s) { // use Retrofit to create autocompletedata } }); 

我不会创buildcallback变体,但你会明白这是更多的工作。

结论:当数据以stream的forms发送时,RxJava特别好。 Retrofit Observable同时推送stream中的所有元素。 与Callback相比,这本身并不是特别有用。 但是,当在stream上推送多个元素并且不同的时间,并且您需要做与时间相关的东西时,RxJava使得代码更易于维护。

Observable的东西已经在Retrofit中完成了,所以代码可能是这样的:

 api.getUserPhoto(photoId) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Photo>() { @Override public void call(Photo photo) { //save photo? } }); 

在getUserPhoto()的情况下,RxJava的优点并不是很好。 但是,让我们再举一个例子,当您获取用户的所有照片时,只有当图像是PNG时,并且您无法访问JSON才能在服务器端执行过滤。

 api.getUserPhotos(userId) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<List<Photo>, Observable<Photo>>() { @Override public Observable<Photo> call(List<Photo> photos) { return Observable.from(photos); } }) .filter(new Func1<Photo, Boolean>() { @Override public Boolean call(Photo photo) { return photo.isPNG(); } }) .subscribe( new Action1<Photo>() { @Override public void call(Photo photo) { // on main thread; callback for each photo, add them to a list or something. list.add(photo) } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { // on main thread; something went wrong System.out.println("Error! " + throwable); } }, new Action0() { @Override public void call() { // on main thread; all photo's loaded, time to show the list or something. } }); 

现在JSON返回一张Photo的列表。 我们将它们映射到单个项目。 通过这样做,我们可以使用过滤方法来忽略不是PNG的照片。 之后,我们将订阅,并为每个单独的照片,一个errorHandler和一个callback,当所有的行已经完成callback。

TLDR点在这里; callback只会返回一个成功和失败的callback; RxJava Observable允许你做地图,缩小,过滤和更多的东西。

有了rxjava,你可以用更less的代码做更多的事情。

假设您想要在您的应用中实现即时search。 有callback你担心取消订阅以前的请求和订阅新的,处理方向改变自己…我认为这是很多的代码,太冗长。

用rxjava很简单。

 public class PhotoModel{ BehaviorSubject<Observable<Photo>> subject = BehaviorSubject.create(...); public void setUserId(String id){ subject.onNext(Api.getUserPhoto(photoId)); } public Observable<Photo> subscribeToPhoto(){ return Observable.switchOnNext(subject); } } 

如果你想实现即时search,你只需要监听TextChangeListener并调用photoModel.setUserId(EditText.getText());

在片段或活动的onCreate方法中,您订阅了返回photoModel.subscribeToPhoto()的Observable,它将返回一个Observable,它始终发出最新的Observable(request)所命令的项目。

 AndroidObservable.bindFragment(this, photoModel.subscribeToPhoto()) .subscribe(new Action1<Photo>(Photo photo){ //Here you always receive the response of the latest query to the server. }); 

另外,例如,如果PhotoModel是Singleton,则无需担心方向更改,因为无论订阅什么时候,BehaviorSubject都会发出最后一个服务器响应。

通过这些代码,我们实现了即时search和处理方向的变化。 你认为你可以使用较less代码的callback来实现这个吗? 我对此表示怀疑。

看起来你正在重新发明轮子,你正在做的是在改造中已经实施。

举个例子,你可以看看改造后的RestAdapterTest.java ,它们将 Observable 定义为返回types,然后使用它 。

通过其他答案中的样本和结论,我认为简单的一步或两步任务没有太大的区别。 但是,callback简单而直接。 对于简单的任务来说,RxJava更复杂,也更大。 有第三个解决scheme: AbacusUtil 。 让我用所有三种解决scheme实现上述用例:使用Retrolambda的Callback,RxJava,CompletableFuture(AbacusUtil):

从networking获取照片并在设备上保存/显示:

 // By Callback api.getUserPhoto(userId, new Callback<Photo>() { @Override public void onResponse(Call<Photo> call, Response<Photo> response) { save(response.body()); // or update view on UI thread. } @Override public void onFailure(Call<Photo> call, Throwable t) { // show error message on UI or do something else. } }); // By RxJava api.getUserPhoto2(userId) // .observeOn(AndroidSchedulers.mainThread()) .subscribe(photo -> { save(photo); // or update view on UI thread. }, error -> { // show error message on UI or do something else. }); // By Thread pool executor and CompletableFuture. TPExecutor.execute(() -> api.getUserPhoto(userId)) .thenRunOnUI((photo, error) -> { if (error != null) { // show error message on UI or do something else. } else { save(photo); // or update view on UI thread. } }); 

并行加载用户详细信息和照片

 // By Callback // ignored because it's little complicated // By RxJava Observable.zip(api.getUserDetails2(userId), api.getUserPhoto2(userId), (details, photo) -> Pair.of(details, photo)) .subscribe(p -> { // Do your task. }); // By Thread pool executor and CompletableFuture. TPExecutor.execute(() -> api.getUserDetails(userId)) .runOnUIAfterBoth(TPExecutor.execute(() -> api.getUserPhoto(userId)), p -> { // Do your task });