RxJava中repeatWhen 和 retryWhen 操作符的解释

发表于 4年以前  | 总阅读数:5443 次

RxJava中repeatWhen 和 retryWhen 操作符的解释

repeatWhen and retryWhen are fairly baffling at first glance. For starters, they are serious contenders for "most confusing marble diagrams":

repeatWhenretryWhen 操作符在第一眼看上去的时候,就会让人觉得相当困惑。 对于初学者来说,这两个操作符的marble图算是最难看懂的几个之一了。


They're useful operators: they allow you to conditionally resubscribe to Observables that have terminated. I recently studied how they worked and I want to try to explain them (since it took me a while to grasp).


Repeat vs. Retry

First of all, what is the difference between their simpler counterparts: repeat() and retry? This part is simple: the difference is which terminal event causes a resubscription.

首先,我们先来看看它们所对应的简单版本的操作符:repeat()retry的区别?这部分很简单,主要的区别在于触发重新订阅(resubscription)的终止事件(terminal event)的类型不同。

repeat() resubscribes when it receives onCompleted().

repeat() 在接收到onCompleted()时重新订阅.

retry() resubscribes when it receives onError().

retry() 在接收到 onError()时重新订阅。

However, these simple versions can leave something to be desired. What if you want to delay resubscription by a couple seconds? Or examine the error to determine if you should resubscribe? That's where repeatWhen and retryWhen step in; they let you provide custom logic for the retries.

但是,如果我们的需求比这更加复杂的话,比如说,如果你想将重新订阅的时间往后延迟个几秒怎么办? 或者想通过判断错误(error)的类型来决定是否需要重新订阅。这时候我们就需要用到repeatWhenretryWhen了,它们可以允许我们为重试(retry)添加自定义的逻辑。

Notification Handler

You provide the retry logic through a function known as the notificationHandler. Here's the method signature for retryWhen:

你需要通过一个function也就是notificationHandler来提供重试的逻辑,下面是retryWhen的方法签名(method signature)

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)

That's a mouthful! I found this signature hard to parse since it's a mess of generics.


Simplified, it consists of three parts:


  1. The Func1 is a factory for providing your retry logic.
  2. The input is an Observable<Throwable>.
  3. The output is an Observable<?>.
  1. Func1 是一个工厂(factory),用来提供你的重试逻辑。
  2. 输入是一个 Observable<Throwable>
  3. 输出是一个 Observable<?>

Let's look at the last part first. The emissions of the Observable<?> returned determines whether or not resubscription happens. If it emits onCompleted or onError then it doesn't resubscribe. But if it emits onNext then it does (regardless of what's actually in the onNext). That's why it's using the wildcard for its generic type: it's just the notification (next, error or completed) that matters.

我们先来看一下最后一部分。返回的Observable<?>所发出的事件(emissions)决定了要不要重新进行订阅(resubscription)。如果它发出(emit)的是onCompleted 或者 onError ,那么将不会重新订阅。如果它发出的是onNext,那么将重新订阅(无论所发出的onNext里的东西是什么)。这也是为什么它使用了通配符作为它的泛型类型:起作用的是这个事件(notification)本身。

The input is an Observable<Throwable> that emits anytime the source calls onError(Throwable). In other words, it triggers each time you need to decide whether to retry or not.


The factory Func1 is called on subscription to setup the retry logic. That way, when onError is called, you've already defined how to handle it.

工厂 Func1在订阅时(subscription)被调用,用以初始化重试逻辑。这样一来,每当 onError 被调用的时候,你就已经定义好了如何去处理这个事件了。

Here's an example wherein we resubscribe to the source if the Throwable is an IOException, but otherwise do not:

下面是一个例子,在这个例子中,当Throwable 是一个IOException的时候,我们将会去重新订阅源(source),其他情况下则不。

source.retryWhen(errors -> errors.flatMap(error -> {  
// For IOExceptions, we  retry
if (error instanceof IOException) {
return Observable.just(null);

// For anything else, don't retry
return Observable.error(error);

Each error is flatmapped so that we can either return onNext(null) (to trigger a resubscription) or onError(error) (to avoid resubscription).

每一个错误(error)都被flatmap了,所以我们可以选择在其中返回 onNext(null)(来触发重新订阅)或者onError(error) (不再重新订阅)。


Here's some key points about repeatWhen and retryWhen which you should keep in mind.

下面是一些关于repeatWhenretryWhen 的关键点,我们要牢牢记住它们

  • repeatWhen is identical to retryWhen, only it responds to onCompleted instead of onError. The input is Observable<Void>, since onCompleted has no type.

  • repeatWhenretryWhen几乎是一样的,唯一的区别在于,repeatWhen响应的是 onCompleted ,而不是 onError 而因为 onCompleted 是没有类型的,所以它的输入是一个 Observable<Void>

  • The notificationHandler (i.e. Func1) is only called once per subscription. This makes sense as you are given an Observable<Throwable>, which can emit any number of errors.

  • notificationHandler (也就是 Func1)在每一个subscription中只会被调用一次 这也说的通,因为你拿到的是一个 Observable<Throwable>, 它可以发出任意数目的错误(error)。

  • The output Observable has to use the input Observable as its source. You must react to the Observable and emit based on it; you can't just return a generic stream.

  • 输出的 Observable 必须要利用到输入的Observable 你必须响应(react)输入的Observable ,并在它的基础上发出(emit)东西,你不能仅仅返回一个泛型的流(generic stream)

In other words, you can't do something like retryWhen(errors -> Observable.just(null)). Not only will it not work, it completely breaks your sequence. You need to, at the very least, return the input, like retryWhen(errors -> errors) (which, by the way, duplicates the logic of the simpler retry()).

换句话说,你不能像retryWhen(errors -> Observable.just(null))这样做。不只是因为它不会按照你想象的那样去运行,而且它也完全打破了你的序列(sequence)。你至少也得像这样retryWhen(errors -> errors),直接把输入返回去(顺便提一下,这跟retry()的逻辑是一样的)

  • The Observable input only triggers on terminal events (onCompleted for repeatWhen / onError for retryWhen). It doesn't receive any of the onNext notifications from the source, so you can't examine the emitted data to determine if you should resubscribe. If you want to do that, you have to add an operator like takeUntil() to cut off the stream.

  • 输入的 Observable 只有在遇到终止事件(terminal event)时才会被触发 ( 对于 repeatWhenonCompleted / 对于 retryWhenonError)。 它不会接收任何来自源的onNext通知(notification), 所以你不能通过校验发出的数据去决定是否重新订阅。如果你想这样做的话,你要加入像takeUntil()之类的操作符来截断这个流(stream)。


Now that you (vaguely) understand retryWhen and repeatWhen, what sort of logic can you stick in the notificationHandler?


Poll for data periodically using repeatWhen + delay:

利用repeatWhen + delay实现服务器轮询:

source.repeatWhen(completed -> completed.delay(5, TimeUnit.SECONDS))

The source isn't resubscribed until the notificationHandler emits onNext(). Since delay waits some time before emitting onNext(), this has the effect of delaying resubscription so you can avoid constantly polling data.

只有在notificationHandler 发出 onNext()事件之后,源(source)才会被重新订阅。因为 delay 会让 onNext()先等待一段时间再发出, 这也就实现了对重新订阅的延时, 从而避免了一直不停地去查询服务器的数据。

Alternatively, delay resubscription with flatMap + timer:

另一个方案,利用 flatMap + timer来实现延迟重新订阅(resubscription):

source.retryWhen(errors -> errors.flatMap(error -> Observable.timer(5, TimeUnit.SECONDS)))

This alternative is useful when combined with other logic, such as...


Resubscribe a limited number of times with zip + range:

利用 zip + range来限制重试的次数:

source.retryWhen(errors -> errors.zipWith(Observable.range(1, 3), (n, i) -> i))

The end result is that each error is paired with one of the outputs of range, like so:

这样做最后的结果就是每一个错误(error)都会与 range的一个输出成对结合起来, 像这样:

zip(error1, 1) -> onNext(1)  <-- Resubscribe  
zip(error2, 2) -> onNext(2)  <-- Resubscribe  
zip(error3, 3) -> onNext(3)  <-- Resubscribe  
onCompleted()                <-- No resubscription  

Since range(1, 3) runs out of numbers on the fourth error, it calls onCompleted(), which causes the entire zip to complete. This prevents further retries.

在第四个错误的时候,因为range(1, 3)里所有数字都用掉了,所以它会调用onCompleted(),让整个zip结束掉(complete)。从而避免了再去重试。

Combine the above for limited retries with variable delays:


source.retryWhen(errors ->  
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))

flatMap + timer is preferable over delay in this case because it lets us modify the delay by the number of retries. The above retries three times and delays each retry by 5 ^ retryCount, giving you exponential backoff with just a handful of operators!

在这种情况下, 相较于delay,我更加推荐使用flatMap + timer,因为后者可以让我们根据重试的次数调整延时时间。上面的代码会进行三次重试,并且每一次重试都会有5 ^ retryCount的延时,寥寥几个操作符就实现了指数退避(exponential backoff)的功能。




发布于:5月以前  |  808次阅读  |  详细内容 »



发布于:5月以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”


发布于:5月以前  |  756次阅读  |  详细内容 »


今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:5月以前  |  648次阅读  |  详细内容 »



发布于:5月以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:5月以前  |  449次阅读  |  详细内容 »



发布于:6月以前  |  446次阅读  |  详细内容 »



发布于:5月以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:5月以前  |  444次阅读  |  详细内容 »


据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:5月以前  |  442次阅读  |  详细内容 »


特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:5月以前  |  441次阅读  |  详细内容 »



发布于:5月以前  |  437次阅读  |  详细内容 »


近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:5月以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:5月以前  |  428次阅读  |  详细内容 »



发布于:5月以前  |  423次阅读  |  详细内容 »



发布于:6月以前  |  423次阅读  |  详细内容 »



发布于:6月以前  |  420次阅读  |  详细内容 »



发布于:6月以前  |  411次阅读  |  详细内容 »



发布于:5月以前  |  406次阅读  |  详细内容 »


罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:5月以前  |  398次阅读  |  详细内容 »
简化Android的UI开发 4年以前  |  520575次阅读
Android 深色模式适配原理分析 3年以前  |  28425次阅读
Android阴影实现的几种方案 1年以前  |  10529次阅读
Android 样式系统 | 主题背景覆盖 3年以前  |  9424次阅读