写一个Rx“RetryAfter”扩展方法

在IntroToRx这本书中,作者build议写一个“智能”的I / O重试,在一段时间后重试I / O请求,比如networking请求。

这是确切的段落:

添加到自己的库中的一种有用的扩展方法可能是“后退和重试”方法。 与我合作过的团队在执行I / O(尤其是networking请求)时发现了这样的function。 这个概念就是尝试,并在失败后等待一段时间,然后再试一次。 此方法的版本可能会考虑要重试的exception的types以及重试的最大次数。 您甚至可能希望延长等待时间,以便在随后的每次重试时减less攻击。

不幸的是,我不知道如何写这个方法。 🙁

延迟观测值是实现退避重试的关键。 一个延迟观察者将不会执行它的工厂,直到有人订阅它。 它将为每个订阅调用工厂,使其成为我们的重试场景的理想select。

假设我们有一个触发networking请求的方法。

public IObservable<WebResponse> SomeApiMethod() { ... } 

为了这个小片段的目的,我们定义deferred作为source

 var source = Observable.Defer(() => SomeApiMethod()); 

每当有人订阅来源,它将调用SomeApiMethod并启动一个新的Web请求。 每当失败时重试它的天真的方式将使用内置的重试运算符。

 source.Retry(4) 

这对API来说不是很好,但这不是你要求的。 我们需要在每次尝试之间推迟发起请求。 这样做的一个方法是延迟订阅 。

 Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4) 

这是不理想的,因为即使是第一个请求,它也会增加延迟,让我们来解决这个问题。

 int attempt = 0; Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1))) }) .Retry(4) .Select(response => ...) 

只是暂停一秒不是一个很好的重试方法,所以让我们改变这个常数是一个函数接收重试计数并返回一个适当的延迟。 指数后退很容易实现。

 Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 

我们现在差不多完成了,只需要添加一个方法来指定我们应该重试的exception。 让我们添加一个给定exception的函数,返回是否重试有意义,我们将其称为retryOnError。

现在我们需要写一些可怕的代码,但是忍耐着我。

 Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null)) .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e) ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e) : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e))); }) .Retry(retryCount) .SelectMany(t => t.Item1 ? Observable.Return(t.Item2) : Observable.Throw<T>(t.Item3)) 

所有这些尖括号在那里编组一个exception,我们不应该重.Retry() 。 我们已经使内部可观察的是IObservable<Tuple<bool, WebResponse, Exception>> ,其中第一个bool指示我们是否有响应或exception。 如果retryOnError指示我们应该重试一个特定的exception,那么内部的observable将会抛出,并且会被重试拾取。 SelectMany只是展开我们的元组,并使得到的可观察性再次成为IObservable<WebRequest>

看到我的要点与完整的来源和最终版本的testing 。 有了这个操作符,我们可以很简洁地编写我们的重试代码

 Observable.Defer(() => SomApiMethod()) .RetryWithBackoffStrategy( retryCount: 4, retryOnError: e => e is ApiRetryWebException ) 

也许我过分简化了情况,但是如果我们看一下Retry的实现,那么它就是一个Observable.Catch,它是一个无穷无尽的可观察对象:

 private static IEnumerable<T> RepeatInfinite<T>(T value) { while (true) yield return value; } public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source) { return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source)); } 

所以如果我们采取这种方法,我们可以在第一个产量之后加上一个延迟。

 private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime) { // Don't delay the first time yield return source; while (true) yield return source.DelaySubscription(dueTime); } public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime) { return RepeateInfinite(source, dueTime).Catch(); } 

捕获具有重试计数的特定exception的重载可以更简洁:

 public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception { return source.Catch<TSource, TException>(exception => { if (count <= 0) { return Observable.Throw<TSource>(exception); } return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count); }); } 

请注意,这里的重载是使用recursion。 在第一次出现时,如果count类似于Int32.MaxValue,那么似乎StackOverflowException是可能的。 但是,DelaySubscription使用一个调度程序来运行订阅动作,所以堆栈溢出将不可能(即使用“trampolining”)。 我想这不是通过查看代码真正明显。 我们可以通过显式地将DelaySubscription超载的调度器设置为Scheduler.Immediate,并传入TimeSpan.Zero和Int32.MaxValue来强制堆栈溢出。 我们可以通过一个非直接调度器来更明确地expression我们的意图,例如:

 return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count); 

更新:增加重载,以采取特定的计划。

 public static IObservable<TSource> RetryAfterDelay<TSource, TException>( this IObservable<TSource> source, TimeSpan retryDelay, int retryCount, IScheduler scheduler) where TException : Exception { return source.Catch<TSource, TException>( ex => { if (retryCount <= 0) { return Observable.Throw<TSource>(ex); } return source.DelaySubscription(retryDelay, scheduler) .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler); }); } 

这是我正在使用的一个:

 public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay) { Contract.Requires(src != null); Contract.Ensures(Contract.Result<IObservable<T>>() != null); if (delay == TimeSpan.Zero) return src.Retry(); return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry()); } 

根据马库斯的回答,我写了以下内容:

 public static class ObservableExtensions { private static IObservable<T> BackOffAndRetry<T>( this IObservable<T> source, Func<int, TimeSpan> strategy, Func<int, Exception, bool> retryOnError, int attempt) { return Observable .Defer(() => { var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt); var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay); return s .Catch<T, Exception>(e => { if (retryOnError(attempt, e)) { return source.BackOffAndRetry(strategy, retryOnError, attempt + 1); } return Observable.Throw<T>(e); }); }); } public static IObservable<T> BackOffAndRetry<T>( this IObservable<T> source, Func<int, TimeSpan> strategy, Func<int, Exception, bool> retryOnError) { return source.BackOffAndRetry(strategy, retryOnError, 0); } } 

我更喜欢它,因为

  • 它不会修改attempts但使用recursion。
  • 它不使用retries但将retryOnError的尝试retries传递给retryOnError

在研究Rxx如何做时,我想到了另一个稍微不同的实现。 所以这主要是Rxx的方法的缩减版本。

签名与Markus的版本略有不同。 您指定一种要重试的exceptiontypes,而延迟策略将采用exception和重试计数,因此每个连续重试等待的延迟时间可能会更长。

我不能保证它是错误certificate,或最好的方法,但它似乎工作。

 public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null) where TException : Exception { return Observable.Create<TSource>(observer => { scheduler = scheduler ?? Scheduler.CurrentThread; var disposable = new SerialDisposable(); int retryCount = 0; var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero, self => { var subscription = source.Subscribe( observer.OnNext, ex => { var typedException = ex as TException; if (typedException != null) { var retryDelay = delayFactory(typedException, ++retryCount); self(retryDelay); } else { observer.OnError(ex); } }, observer.OnCompleted); disposable.Disposable = subscription; }); return new CompositeDisposable(scheduleDisposable, disposable); }); } 

这是我提出的一个。

不希望将单个重试的项目连接成一个序列,而是在每次重试时将源序列作为一个整体发出 – 因此操作员返回一个IObservable<IObservable<TSource>> 。 如果不需要,可以简单地将它Switch()回一个序列。

(背景:在我的用例中,源代码是一个热门的热门程序,我GroupByUntil一个项目出现,closures了这个组,如果这个项目在两次重试之间丢失,那么这个组永远不会closures,导致内存泄漏。的序列允许仅在内部序列上进行分组(或exception处理或…)。)

 /// <summary> /// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between. /// </summary> public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null) { if (scheduler == null) scheduler = Scheduler.Default; return Observable.Create<IObservable<TSource>>(observer => { return scheduler.Schedule(self => { observer.OnNext(Observable.Create<TSource>(innerObserver => { return source.Subscribe( innerObserver.OnNext, ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); }, () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); }); })); }); }); }