rxjava:我可以使用重试()但延迟?

我在我的Android应用程序中使用rxjava来asynchronous处理networking请求。 现在我只想在经过一段时间后再试一次失败的networking请求。

有什么办法可以在Observable上使用retry(),但是只有在某个延迟之后才能重试。

有没有办法让Observable知道目前正在重试(而不是第一次尝试)?

我看了debounce()/ throttleWithTimeout(),但他们似乎在做一些不同的事情。

编辑:

我想我find了一种方法来做到这一点,但是我有兴趣确认这是正确的做法或者其他更好的方法。

我在做的是这样的:在Observable.OnSubscribe的call()方法中,在调用Subscribers onError()方法之前,我只需让Threadhibernate所需的时间。 所以,每1000毫秒重试,我做这样的事情:

@Override public void call(Subscriber<? super List<ProductNode>> subscriber) { try { Log.d(TAG, "trying to load all products with pid: " + pid); subscriber.onNext(productClient.getProductNodesForParentId(pid)); subscriber.onCompleted(); } catch (Exception e) { try { Thread.sleep(1000); } catch (InterruptedException e1) { e.printStackTrace(); } subscriber.onError(e); } } 

由于这个方法在IO线程上运行,所以不会阻塞UI。 我能看到的唯一问题是即使是第一个错误也会延迟报告,所以即使没有重试(),延迟也是存在的。 如果发生错误之后没有应用延迟,而是重试之前 (但不是在第一次尝试之前,显然),我希望它更好。

您可以使用retryWhen()运算符向任何Observable添加重试逻辑。

以下类包含重试逻辑:

RxJava 2.x

 public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable<?> apply(final Observable<? extends Throwable> attempts) { return attempts .flatMap(new Function<Throwable, Observable<?>>() { @Override public Observable<?> apply(final Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

RxJava 1.x

 public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable<?> call(Observable<? extends Throwable> attempts) { return attempts .flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

用法:

 // Add retry logic to existing observable. // Retry max of 3 times with a delay of 2 seconds. observable .retryWhen(new RetryWithDelay(3, 2000)); 

这是一个基于Ben Christensen的代码片段的解决scheme, RetryWhen Example和RetryWhenTestsConditional (我不得不将n.getThrowable()更改为n )。 我使用evant / gradle-retrolambda在Android上使lambda表示法工作,但是您不必使用lambdaexpression式(虽然强烈推荐)。 对于延迟,我实现了指数退避,但是可以插入你想要的退避逻辑。 为了完整性,我添加了subscribeOnobserveOn操作符。 我为AndroidSchedulers.mainThread()使用ReactiveX / RxAndroid 。

 int ATTEMPT_COUNT = 10; public class Tuple<X, Y> { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 

而不是使用MyRequestObservable.retry我使用包装函数retryObservable(MyRequestObservable,retrycount,秒),它返回一个新的Observable处理间接的延迟,所以我可以做

 retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1<BonusIndividualList>(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1<Throwable>(){ @Override public void call(Throwable arg0) { // failed after the 3 retries ! }}); // wrapper code private static <T> Observable<T> retryObservable( final Observable<T> requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe<T>() { @Override public void call(final Subscriber<? super T> subscriber) { requestObservable.subscribe(new Action1<T>() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1<Observable<T>>(){ @Override public void call(Observable<T> observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after retries subscriber.onError(error); } } }); } }); } 

现在使用RxJava版本1.0+,你可以使用zipWith来延迟重试。

添加修改kjones的答案。

改性

 public class RetryWithDelay implements Func1<Observable<? extends Throwable>, Observable<?>> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of retries and seconds to be delayed between retry. * * @param maxRetries Number of retries. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxRetries, int delayDurationInSeconds) { MAX_RETRIES = maxRetries; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } } 

受保罗的答复启发,如果你不担心重retryWhen Abhijit Sarkar retryWhen问题,推迟与rxJava2无条件地重新订阅的最简单的方法是:

 source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS)) 

您可能希望看到更多的样本和重试时的解释时和重复时 。

您可以在RetryWhen操作符中返回的Observable中添加延迟

  /** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toString) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all retries:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); } 

你可以在这里看到更多的例子。 https://github.com/politrons/reactive

retryWhen一个复杂的,甚至可能是越野车的操作员。 官方文档和至less一个答案在这里使用range运算符,如果没有重试将会失败。 请参阅我与ReactiveX成员David Karnok的讨论 。

我改进了kjones的答案,将flatMap更改为concatMap并添加了RetryDelayStrategy类。 flatMapconcatMap中不保留发射顺序,这对于延迟退避很重要。 如名称所示, RetryDelayStrategy让用户从各种生成重试延迟的模式中进行select,包括退避。 代码在我的GitHub上可用,完成以下testing用例:

  1. 一次尝试成功(不重试)
  2. 1次重试失败
  3. 尝试重试3次,但在2日成功,因此不会重试第三次
  4. 第三次重试成功

请参阅setRandomJokes方法。

kjones相同的答案,但更新到最新版本对于RxJava 2.x版本:( 'io.reactivex.rxjava2:rxjava:2.1.3')

 public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> { private final int maxRetries; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() { @Override public Publisher<?> apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); } 

}

用法:

//将重试逻辑添加到现有的observable。 //延迟2秒,重试最多3次。

 observable .retryWhen(new RetryWithDelay(3, 2000)); 

只需要这样做:

  Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1<String, Observable<File>>() { @Override public Observable<File> call(String s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() <= 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1<File>() { @Override public void call(File file) { postAvatar(file); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { } });