RxJava并行获取可观测数据

在RxJava中实现并行asynchronous调用需要一些帮助。 我select了一个简单的用例,其中FIRST调用获取(而不是search)要显示的产品列表(Tile)。 随后的呼叫出去并取(A)REVIEWS和(B)产品图像

经过几次尝试,我到了这个地方。

1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm); 2 List<Tile> allTiles = new ArrayList<Tile>(); 3 ClientResponse response = new ClientResponse(); 4 searchTile.parallel(oTile -> { 5 return oTile.flatMap(t -> { 6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId()); 7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId()); 8 return Observable.zip(reviews, imageUrl, (r, u) -> { 9 t.setReviews(r); 10 t.setImageUrl(u); 11 return t; 12 }); 13 }); 14 }).subscribe(e -> { 15 allTiles.add((Tile) e); 16 }); 

第1行:出去取出要显示的产品(平铺)

第4行:我们把Observable的列表放到它的评论和imageUrls中

谎言6,7:获取Observable评论和Observableurl

第8行:最后,2个观测值被压缩以返回更新的Observable

第15行:最后第15行将所有要显示的单个产品进行整理,并将其返回给调用层

虽然Observable已经被分割,并且在我们的testing中运行了4个不同的线程, 取回评论和图像似乎是一个接一个。 我怀疑第8行的zip步骤基本上是导致2个observables(评论和url)的连续调用。

在这里输入图像说明

这个小组是否有任何build议来并行获取reiews和图像的URL。 从本质上讲,上面附带的瀑布图应该看起来更垂直堆叠。 对评论和图片的要求应该是平行的

谢谢拉曼

并行运算符在几乎所有用例中都被certificate是一个问题,并没有达到最期望的效果,所以在1.0.0.rc.4版本中被删除了: https : //github.com/ReactiveX/RxJava/拉/ 1716

在这里可以看到一个很好的例子,说明如何做这种行为并获得并行执行。

在你的示例代码中,不清楚searchServiceClient是同步的还是asynchronous的。 它会影响如何解决问题,就好像它已经是asynchronous的,不需要额外的调度。 如果需要同步的额外调度。

首先这里是一些简单的例子,展示了同步和asynchronous行为:

 import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecution { public static void main(String[] args) { System.out.println("------------ mergingAsync"); mergingAsync(); System.out.println("------------ mergingSync"); mergingSync(); System.out.println("------------ mergingSyncMadeAsync"); mergingSyncMadeAsync(); System.out.println("------------ flatMapExampleSync"); flatMapExampleSync(); System.out.println("------------ flatMapExampleAsync"); flatMapExampleAsync(); System.out.println("------------"); } private static void mergingAsync() { Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); } private static void mergingSync() { // here you'll see the delay as each is executed synchronously Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); } private static void mergingSyncMadeAsync() { // if you have something synchronous and want to make it async, you can schedule it like this // so here we see both executed concurrently Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); } private static void flatMapExampleAsync() { Observable.range(0, 5).flatMap(i -> { return getDataAsync(i); }).toBlocking().forEach(System.out::println); } private static void flatMapExampleSync() { Observable.range(0, 5).flatMap(i -> { return getDataSync(i); }).toBlocking().forEach(System.out::println); } // artificial representations of IO work static Observable<Integer> getDataAsync(int i) { return getDataSync(i).subscribeOn(Schedulers.io()); } static Observable<Integer> getDataSync(int i) { return Observable.create((Subscriber<? super Integer> s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } s.onNext(i); s.onCompleted(); }); } } 

以下是试图提供更接近您的代码的示例:

 import java.util.List; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable<Tile> searchTile = getSearchResults("search term") .doOnSubscribe(() -> logTime("Search started ", startTime)) .doOnCompleted(() -> logTime("Search completed ", startTime)); Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); Observable<String> imageUrl = getProductImage(t.getProductId()) .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); }); List<TileResponse> allTiles = populatedTiles.toList() .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) .toBlocking().single(); } private static Observable<Tile> getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable<Reviews> getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable<String> getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static <T> Observable<T> mockClient(T... ts) { return Observable.create((Subscriber<? super T> s) -> { // simulate latency try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); }).subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } } 

这输出:

 Search started => 65ms Search completed => 1094ms getProductImage[1] completed => 2095ms getSellerReviews[2] completed => 2095ms getProductImage[3] completed => 2095ms zip[1] completed => 2096ms zip[2] completed => 2096ms getProductImage[2] completed => 2096ms getSellerReviews[1] completed => 2096ms zip[3] completed => 2096ms All Tiles Completed => 2097ms getSellerReviews[3] completed => 2097ms 

我已经做了每个IO调用模拟需要1000毫秒,这是显而易见的延迟时间和并行发生。 它打印出的进度是经过了毫秒。

这里的诀窍是flatMap合并了asynchronous调用,所以只要合并的Observables是asynchronous的,它们将全部被并发执行。

如果像getProductImage(t.getProductId())这样的调用是同步的,可以像下面这样进行asynchronous调用:getProductImage(t.getProductId())。subscribeOn(Schedulers.io)。

以上是没有所有日志和样板types的例子的重要部分:

  Observable<Tile> searchTile = getSearchResults("search term");; Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()); Observable<String> imageUrl = getProductImage(t.getProductId()); return Observable.zip(reviews, imageUrl, (r, u) -> { return new TileResponse(t, r, u); }); }); List<TileResponse> allTiles = populatedTiles.toList() .toBlocking().single(); 

我希望这有帮助。

仍然@ JDK 7的人,他们的IDE不能自动检测到JDK 8源代码,以及@benjchristensen尝试上面的杰出的回应(和解释)可以使用这个无耻折射的JDK 7代码。 荣誉@benjchristensen一个惊人的解释和例子!

 import java.util.List; import rx.Observable; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Func2; import rx.schedulers.Schedulers; public class ParallelExecutionExample { public static void main(String[] args) { final long startTime = System.currentTimeMillis(); Observable<Tile> searchTile = getSearchResults("search term") .doOnSubscribe(new Action0() { @Override public void call() { logTime("Search started ", startTime); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("Search completed ", startTime); } }); Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>() { @Override public Observable<TileResponse> call(final Tile t) { Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getSellerReviews[" + t.id + "] completed ", startTime); } }); Observable<String> imageUrl = getProductImage(t.getProductId()) .doOnCompleted(new Action0() { @Override public void call() { logTime("getProductImage[" + t.id + "] completed ", startTime); } }); return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>() { @Override public TileResponse call(Reviews r, String u) { return new TileResponse(t, r, u); } }) .doOnCompleted(new Action0() { @Override public void call() { logTime("zip[" + t.id + "] completed ", startTime); } }); } }); List<TileResponse> allTiles = populatedTiles .toList() .doOnCompleted(new Action0() { @Override public void call() { logTime("All Tiles Completed ", startTime); } }) .toBlocking() .single(); } private static Observable<Tile> getSearchResults(String string) { return mockClient(new Tile(1), new Tile(2), new Tile(3)); } private static Observable<Reviews> getSellerReviews(int id) { return mockClient(new Reviews()); } private static Observable<String> getProductImage(int id) { return mockClient("image_" + id); } private static void logTime(String message, long startTime) { System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); } private static <T> Observable<T> mockClient(final T... ts) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(Subscriber<? super T> s) { try { Thread.sleep(1000); } catch (Exception e) { } for (T t : ts) { s.onNext(t); } s.onCompleted(); } }) .subscribeOn(Schedulers.io()); // note the use of subscribeOn to make an otherwise synchronous Observable async } public static class TileResponse { public TileResponse(Tile t, Reviews r, String u) { // store the values } } public static class Tile { private final int id; public Tile(int i) { this.id = i; } public int getSellerId() { return id; } public int getProductId() { return id; } } public static class Reviews { } }