RxSwift基本概念与使用

去年公司项目搞重构时学习并应用过一阵RAC,感觉真是6的起飞。近来听说在swift上有一个RxSwift库写的棒棒的,特地学习了下。写下一点学习笔记仅供参考。

本篇文章主要分四个部分:RxSwift简介,RxSwift中的基本概念,RxSwift基本用法,RxSwift中丰富而神奇的操作。

先来一张图镇帖。

1

继续阅读之前,强烈建议读者先去读一遍官方的ReadMe,然后运行一下官方的Playgrounds和Example工程,稍微有个印象对于接下来的理解会很有帮助。

RxSwift简介

RxSwift是ReactiveX在Swift语言平台上实现的一个函数响应式编程框架。兼具函数式与响应式的特性,再加上Swift本身已经集众家之所长,配合使用简直如虎添翼,使得代码优雅简洁。

事实上,我们写的绝大部分代码其实都是为了响应事件:当一个按钮被点击时,需要写一个@IBAction方法来处理;当需要监听键盘位置变化时,需要注册一个notification;当使用URLSession做网络请求时需要提供一个closure来执行结果;想要时刻关注一个值的变化时,使用KVO;处理一个tableview的点击事件时,应用delegate。应用系统提供的多种不同的方式最终会导致我们的代码异常的复杂与混乱。如果有一种方式可以将上述五种合而为一,会不会很大程度提高代码可读性?答案是肯定的,Rx做到了。

RxSwift中的基本概念

所谓响应式编程最基本的就是两个概念:被观察者和观察者。被观察者能够对外发送可被接收的数据,观察者能够设置去接收这些数据,然后对不同数据进行不同处理。这个设置的动作叫做订阅。从设计模式的角度讲,这个过程应用了观察者模式,像系统提供的KVO,notification其实都是这一模式。

Observable & Observer

在RxSwift的世界中,Observable表示被观察者,Observer表示观察者。

首先确定你已经了解了Swift中的序列(sequence),一个Observable的实例可以理解成就是一个序列。
而一个Observable序列比Swift中的序列强大之处在于它能够异步接收数据。这是RxSwift的精髓。其他所有都是基于这一概念拓展而来的。

Observable有一个函数表示被订阅,即subscribe(_:) ,调用了之后它发送任何数据都能被接收到。

什么是订阅呢?你正在看博客,肯定知道神马叫订阅博客,一个意思。

创建一个Observable对象,此时它还只是拥有被观察的能力,并没有发挥出来,直到被调用了订阅函数subscribe(_:),在哪个对象中调用的这个函数,哪个对象即是Observer。

当一个Observable被一个Observer订阅时,可以认为在两者之间建立起了一条管道,Observable可以源源不断的向Observer发送数据,直到管道被销毁。但是要注意,这是一条很窄的管道,窄到一次只能通过一条数据。如果将一条数据理解成一个球,那么通俗的说就是一个球发送完再发送另一个球,而绝不能同时发送两个球。

Event

刚才提到Observable可以对外发送数据,那应用什么样的载体来装这些数据呢?在什么样的时机来发送这些数据呢?答案是事件。那什么是事件呢,我举几个例子,一个文本框内容改变是事件,一个按钮被点击是事件,一个列表被滑动是事件,一个网络请求被触发是事件,一个数据库被修改还是事件。

在RxSwift的世界中,Event表示事件。Observable序列可以发送出去的事件被分成了三种类型,分别是next类型,错误(error)类型和完成(completed)类型。其中next事件和错误事件内部都装有数据,而完成事件并没有。

所以下文中提到的Observable序列发送事件或者发送数据其实都是一个意思,别纠结。

一个Observable序列作为一个事件源理论上当然可以发送任意多个事件。所以规定以发送一个完成事件或者一个错误事件作为结束,在这之前可以发送无限多个next事件,但是只要这两个事件其中之一一发送,以后就不能再继续发送任何事件了。

根据Observable是否会发送代表结束的事件可以分两种类型,我举2个具体的例子感受下,网络请求可以作为一个Observable,设置去订阅它,被触发即调用API得到结果后将数据装配到next事件中发出,然后根据结果决定发送一个错误事件或是一个完成事件,此时这个网络请求完全结束,以后再不能发送事件。想重新调用此API只能创建另一个网络请求;一个按钮可以也作为一个Observable,设置去订阅它,被点击后就发送一个next事件,但是只要这个按钮不被销毁,就可以源源不断发送事件,即使被销毁时,也不会发送错误事件或是完成事件。

看一下官方出的图标特别形象:

–1–2–3–4–5–6–|—-> // “|” = Terminates normally: with an completed

–a–b–c–d–e–f–X—-> // “X” = Terminates with an error

–tap–tap———-tap–> // “|” = Continues indefinitely, such as a sequence of button taps

总结一下,记住四个核心概念,三个名词:Observable被观察者,Observer观察者,Event事件;一个动词:订阅subscribe。

RxSwift基本用法

RxSwift的使用说起来非常简单:第一步创建一个Observable序列;第二步去订阅它,此时可以提前设置好遇到不同的事件消息进行不同相应的处理;第三步记得在适当的时候销毁,回收资源。

三个步骤核心关键词:创建create,订阅subscribe,销毁dispose。

创建 create & 订阅 subscribe

前面提到过,创建一个Observable对象后只有调用订阅函数才能让其将能力发挥出来,所以两者需要配合使用。

针对Observable序列可以发出的三种不同类型事件,订阅时也可以有选择性的只订阅单一类型的事件或者全部类型的事件。

订阅所有类型事件:subscribe(_:)

只订阅next类型事件:subscribe(onNext:)

只订阅错误类型事件:subscribe(onError:)

只订阅完成类型事件:subscribe(onCompleted:)

分别订阅所有类型事件:subscribe(onNext:onError:onCompleted:)

另外,通常调用订阅函数subscribe(_:)之后的返回值都起名叫做***Subscription

快速创建

RxSwift自带一些可以快速创建Observable序列的操作符,应用它们创建后的序列本身已经具有发送事件的能力。一个Observable序列可以发送任意个next事件直到发送了一个完成事件或者错误事件作为结束。

never

创建一个永不发送任何事件且永不自动终结的序列。

举个🌰:

let disposeBag = DisposeBag() // 1
let neverSequence = Observable<String>.never()

let neverSequenceSubscription = neverSequence
    .subscribe { _ in
        print("This will never be printed")
}

neverSequenceSubscription.disposed(by: disposeBag) // 1

结果是什么也不输出。

说明:这两行代码都是和资源回收相关,下面会讲到,暂时先忽略。

empty

创建一个只会发送一个完成事件的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable<Int>.empty()
    .subscribe { event in
        print(event)
    }
    .disposed(by: disposeBag)

结果是:

completed
just

创建一个只会发送一个next事件和一个完成事件的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable.just("🔴")
    .subscribe { event in
        print(event)
    }
    .disposed(by: disposeBag)

结果是:

next(🔴)
completed
of

创建一个会发送固定数量数据的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐶", "🐱", "🐭", "🐹")
    .subscribe(onNext: { element in
        print(element)
    })
    .disposed(by: disposeBag)

结果是:

🐶
🐱
🐭
🐹

注意此例子中只订阅了next事件,所以并没有像上两个例子一样打印completed

from

从一个Swift的系统类Array,Dictionary,Set这样的序列中创建一个Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable.from(["🐶", "🐱", "🐭", "🐹"])
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐶
🐱
🐭
🐹
range

创建一个会发送一个范围内的连续整数的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable.range(start: 1, count: 10)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

结果是:

next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
next(7)
next(8)
next(9)
next(10)
completed
repeatElement

创建一个会重复发送特定数据的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable.repeatElement("🔴")
    .take(3)  // 1
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🔴
🔴
🔴

说明:这里使用了一个take操作符,用来控制发送数据的次数,后面会讲到,暂时先忽略。

generate

有一个初始值一个判断条件和一个改变数据内容的函数,创建一个只要条件满足就会发送数据的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable.generate(
        initialState: 0,
        condition: { $0 < 3 },
        iterate: { $0 + 1 }
    )
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

0
1
2
interval

创建一个间隔固定时间就会发送一个next事件的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable<Int>.interval(1, scheduler: MainScheduler.instance)
   .subscribe(onNext: { print($0) })
   .disposed(by: disposeBag)

结果是:

0
1
2
3
4
5
6
...

说明:MainScheduler.instance这里代表主线程,多线程相关知识下一篇文章中会有介绍,暂时先忽略。

error

创建一个会立即发送一个错误事件的Observable序列。

举个🌰:

let disposeBag = DisposeBag()

Observable<Int>.error(TestError.test)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

结果是:

error(test)

上面出现的所有操作符其实可以分四种,never不发送任何事件,interval一直发送next事件,error仅发送一个错误事件,其他的发送完next事件之后都会发送一个完成事件。

基本创建

create

创建一个自定义的Observable序列。

此时需要手动设置发送事件的类型和时机,并且要注意在发送完next事件之后需要发送一个完成事件或者错误事件表示结束。

先来看一个🌰:

let disposeBag = DisposeBag()
var count = 1
let myJust = { (element: String) -> Observable<String> in
    print("Creating \(count)")
    count += 1
    return Observable.create { observer in // 1
        print("Emitting...")
        observer.on(.next(element))
        observer.on(.completed)  // 2
        return Disposables.create()  // 3
    }
}

myJust("🔴")
    .subscribe { print($0) }
    .disposed(by: disposeBag)  // 3
myJust("🌰")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

输出结果是:

Creating 1
Emitting...
next(🔴)
completed
Creating 2
Emitting...
next(🌰)
completed

解释一下上面的代码:

  1. 首先发现创建Observable时内部也有一个observer,字面意思也是观察者。而前面提到调用subscribe的对象才是观察者,那现在怎么有两个观察者?不要懵逼,确实有两个但是职责不同,外部的observer是为了观察Observable发出不同数据以便进行不同处理,而内部的observer是为了向外发送数据。事实上如果还是觉得奇怪可以改成任意名字,只要不影响发送数据就好;
  2. 最后要发送一个完成事件作为结束标志;
  3. 仍然是和资源回收相关,下面会讲到,暂时仍忽略。

create的闭包中有一个observer对象负责去发送事件,仍然是next事件和错误事件需要有数据,而完成事件不需要有数据。

发送所有类型事件:on(_:)

具体分为:on(.next()), on(.error()), on(.completed)

仅发送next类型事件:onNext(_:)

仅发送错误类型事件:onError(_:)

仅发送完成类型事件:onCompleted()

doOn

doOn操作并不是为了创建Observable序列。而是为了使得Observable序列可以在发送事件之前先来做一些事情。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🍎", "🍐", "🍊", "🍋")
    .do(onNext: { print("Intercepted:", $0) }, onError: { print("Intercepted error:", $0) }, onCompleted: { print("Completed")  })
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

Intercepted: 🍎
🍎
Intercepted: 🍐
🍐
Intercepted: 🍊
🍊
Intercepted: 🍋
🍋
Completed

可以看出在每次发送事件之前都做了一次打印,但是并没有改变要发送数据的值。

如例子中展示,函数为:do(onNext:onError:onCompleted:) ,可以对一个或多个参数赋值,从而选择在发送相应类型事件之前做些事情。

销毁 dispose

上面介绍了如何创建一个Observable序列并且去订阅它,那么问题来了,当这个序列不再发送新事件了,此时如果再继续订阅就没有意义了,如何进行取消或者说如何回收这一部分内存资源呢?

手动 Disposable

回到订阅函数:

public func subscribe(_ on: ) -> Disposable

我们发现订阅函数返回类型是Disposable协议,这个协议中只有一个函数:

func dispose()

于是乎通过调用这个函数可以进行资源回收。

举个🌰:

let subscription = Observable<Int>.interval(0.3, scheduler: MainScheduler.instance)   // 1
    .subscribe (onNext: { print($0) })

delay(2) {    // 2 
    subscription.dispose()
}

结果是:

0
1
2
3
4
5

解释一下上面的代码:

  1. 创建一个间隔0.3s在主线程发送整形变量的Observable;
  2. 延迟2s执行,非系统提供,理解意思即可。

可以将subscription.dispose()注释掉对比一下结果,会发现在调用了 dispose() 函数之后,创建的Observable序列就不再发送新的事件了,证明资源已经被回收了。

实际开发当中这种方式并不常用,因为通常并不能很好的掌控调用dispose() 函数的时机,更推荐下面一种函数。

自动 DisposeBag

上面介绍了很多创建Observable序列的函数,观察之前的例子中通用的代码有如下两行:

let disposeBag = DisposeBag()

.disposed(by: disposeBag)

先说一下什么是DisposeBag ,它是RxSwift创造的另一个重要的概念,可以直译成用于销毁的包。其实就是一个类似垃圾袋的东西。我还是举一个例子,我们在家里吃西瓜时会剩下一堆西瓜皮,但是我们不会吃一块西瓜剩下一块西瓜皮就扔到楼下的垃圾桶,否则你会累死。而是先放到一个垃圾袋里,等到切好的西瓜都被吃完了将所有的西瓜皮放到垃圾袋中一次性扔到楼下的垃圾桶。

DisposeBag的使用其实也是这个思路(作者默认你是在ARC的环境下进行开发,否则别往下看了)。首先在一个类中创建一个DisposeBag的实例,通常起名字叫做disposeBag(打开一个垃圾袋),然后将各种订阅后返回的Disposable类型的对象调用这个disposed(by:) 函数,也就是加入到这个实例中(将一块西瓜皮放到这个垃圾袋中)。程序运行时当这个类的实例被ARC系统自动回收时,会清空内部所有对象并释放相应资源当然也包括这个disposeBag(将垃圾袋扔到楼下垃圾桶)。

而这也解释了为什么在create一个Observable时闭包里最后要添加这么一个行代码:

return Disposables.create()

Subjects

Subject是RxSwift创造的另一个重要的概念,它既是Observable又是Observer,并且可以说是两者之间的一种桥梁。作为一个observer,它可以同时订阅一个或多个Observable,作为一个Observable,它能够传递它订阅的Observable发出的事件,同时自己也能发送新的事件。

PublishSubject

向它的所有订阅者发送(广播)新的事件。

举个🌰:

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()

subject.addObserver("1").disposed(by: disposeBag) // 1
subject.onNext("🐶")  // 2
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag) 
subject.onNext("🅰️")
subject.onNext("🅱️")

结果是:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

首先要注意上面使用了一个addObserver函数,它的定义是这样的:

func addObserver(_ id: String) -> Disposable {
     return subscribe { print("Subscription:", id, "Event:", $0) }
}

其实也就是封装了一下subscribe 函数,这是为了在几个例子中可以共用。

然后解释一下上面的代码:

  1. 因为subject是一个observer,所以可以订阅Observable;
  2. 因为subject是一个Observable,所以可以发送数据。

观察打印结果可以发现:

  1. 订阅之后发出的事件可以接收到,但之前的事件接收不到;
  2. 这很正常,和上面创建Observable然后去订阅得到的结果是一样的。

ReplaySubject

仍然是向它的所有订阅者发送(广播)新的事件,但同时有一个缓存机制,通过设置bufferSize的值可以向新的订阅者重新发送固定数量的历史事件。

举个🌰:

let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)

subject.addObserver("1").disposed(by: disposeBag) 
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag) 
subject.onNext("🅰️")
subject.onNext("🅱️")

结果是:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱) // 
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)

观察打印结果可以发现:

  1. 第二次订阅后立刻接收到了一个历史事件 "🐱"
  2. 可见ReplaySubject在内部会保存一下之前发送过的事件,以便将来被新的订阅者订阅时重复发送。

BehaviorSubject

仍然是向它的所有订阅者发送(广播)新的事件,但同时立刻向新的订阅者发送最近一次发送过的事件(或者将初始化的值作为事件)。

举个🌰:

let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")

subject.addObserver("1").disposed(by: disposeBag) 
subject.onNext("🐶")
subject.onNext("🐱")

subject.addObserver("2").disposed(by: disposeBag) 
subject.onNext("🅰️")
subject.onNext("🅱️")

subject.addObserver("3").disposed(by: disposeBag) 
subject.onNext("🍐")
subject.onNext("🍊")

结果是:

Subscription: 1 Event: next(🔴) //
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱) //
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️) //
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)

观察打印结果可以发现:

  1. 第一次订阅后立刻接收到了subject的初始值 "🔴"
  2. 第二次订阅后立刻接收到了subject之前发出的最后一个值 "🐱"
  3. 第三次订阅后也是立刻接收到了subject之前发出的最后一个值 "🅱️"

可见BehaviorSubject就是在将bufferSize设置为1的ReplaySubject的基础上,附加了一个发送初始值的功能。

Variable

在内部包装着一个BehaviorSubject,类似于将BehaviorSubject放进了一个箱子,所以它拥有BehaviorSubject的全部能力;此外Variable还记录着当前值的状态。Variable永远不会发送错误事件,但是它会自动发送一个完成事件。

举个🌰:

let disposeBag = DisposeBag()
let variable = Variable("🔴")

variable.asObservable().addObserver("1").disposed(by: disposeBag) // 1
variable.value = "🐶" // 2
variable.value = "🐱"

variable.asObservable().addObserver("2").disposed(by: disposeBag)
variable.value = "🅰️"
variable.value = "🅱️"

结果是:

Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 1 Event: completed //
Subscription: 2 Event: completed

然后解释一下上面的代码:

  1. 因为Variable相当于BehaviorSubject外面的一个箱子,所以首先要调用asObservable()函数拆箱;
  2. Variable并没有直接实现发送事件的几个函数,作为代替它对外暴露了一个value属性用于取得当前值或是设置最新值。设置新值的时候就会加到内部的BehaviorSubject序列上,从而间接实现了发送数据功能。

观察打印结果可以发现:

  1. 拥有BehaviorSubject的全部特性;
  2. 会在最后自动发送一个完成事件。

种类丰富而神奇的操作

RxSwift已经定义好了非常多的操作符,熟练使用它们可以大幅度提高开发效率。这里是用形象的图表来展示描述它们,可以看看帮助理解,虽然并不全。

因为实在太多,下面仅介绍一些主要和常用的。

组合操作

将多个源Observale组合成一个Observale。

startWith

在源Observale开始发出事件之前先发出特定的事件序列。

举个🌰:

Observable.of("🐶", "🐱", "🐭", "🐹")
    .startWith("1️⃣")
    .startWith("2️⃣")
    .startWith("3️⃣", "🅰️", "🅱️")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果为:

3️⃣
🅰️
🅱️
2️⃣
1️⃣
🐶
🐱
🐭
🐹    

可以看出 startWith拥有LIFOlast-in-first-out的特性,越在后面添加的事件越先被发送出去。

merge

将多个源Observale序列发出的事件组合装进一个新的Observale序列,每一个源Observale序列发出事件时,新合成的序列都会发出。

可以理解成多条管道汇并入一条,简单的说就是并。

举个🌰:

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")

subject1.onNext("🅱️")

subject2.onNext("①")

subject2.onNext("②")

subject1.onNext("🆎")

subject2.onNext("③")

结果为:

🅰️
🅱️
①
②
🆎
③

zip

将最多不超过8个源Observale序列组合成一个新的Observale序列,新序列将每次每个源Observale序列分别发出的事件组合后再发出。

注意每个源Observale都要发出一个事件后合成后的Observale序列才会发出一个事件。举个生活中的例子,做肉夹馍时,肉和馍都属于原材料,也就是源Observale,一份肉严格对应一个馍,夹在一起之后才算是一个成品,也就是才能合成一个事件。一种材料用完之后另一种无论剩下多少都不能做成新的肉夹馍,也就是不能合成新的事件。

举个🌰:

let disposeBag = DisposeBag()

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

stringSubject.onNext("🅰️")
stringSubject.onNext("🅱️")

intSubject.onNext(1)

intSubject.onNext(2)

stringSubject.onNext("🆎")
intSubject.onNext(3)

结果是:

🅰️ 1
🅱️ 2
🆎 3    

combineLatest

将最多不超过8个源Observale序列组合成一个新的Observale序列,当所有的源Observale都至少发出了一个事件或者之后任何一个Observale发出了一个新的事件时,合成后的Observale序列根据每一个源Observale序列最后发出的事件发出一个事件。

注意每个源Observale发出一个事件后合成后的Observale序列都会发出一个事件。举个例子,计算几个可手动输入的数字的总和时,每个数字重新被输入后,总和都要重新被计算才正确。

举个🌰:

let disposeBag = DisposeBag()

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
        "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

stringSubject.onNext("🅰️")

stringSubject.onNext("🅱️")

intSubject.onNext(1)

intSubject.onNext(2)

stringSubject.onNext("🆎")

结果是:

🅱️ 1
🅱️ 2
🆎 2

switchLatest

将源Observale序列装进一个Variable中,使用者直接去订阅Variable转换成的Observale,当Variable的value改变时,只接收最新的Observale发出的事件,之前被放进Variable中的Observale发出的事件将不再被收到。

可以这里理解,每一个被放进Variable中的Observale都是皇上,订阅者都是大臣,大臣只听皇上的话
;当value改变时,新Observale变成了皇上,大臣开始只听他的,原来的Observale或者是变成太上皇或者是死了或者是出家,但不管怎样,大臣都不再听他的了。

举个🌰:

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")

let variable = Variable(subject1)

variable.asObservable()
    .switchLatest()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🏈")
subject1.onNext("🏀")

variable.value = subject2

subject1.onNext("⚾️")

subject2.onNext("🍐")

结果是:

⚽️
🏈
🏀
🍎
🍐

变换操作

对Observable序列发送的每一个next事件中的数值进行变换。

map

对Observable序列发出的每一个next事件中的数值应用一个闭包转换成一个新的数值,这样接收到的就是转换过后的数值了。

举个🌰:

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .map { $0 * $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

1
4
9

观察打印结果可以发现:

源序列发送三个值,经过map之后仍然是三个值,也就是说map只是改变了每一个值的内容,并不会改变数量。

flatMap

在介绍flatMap之前,首先看一下下面的例子:

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .map { Observable.just($0) }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果是:

RxSwift.Just<Swift.Int>
RxSwift.Just<Swift.Int>
RxSwift.Just<Swift.Int>

观察打印结果可以发现:

虽然发送数据的数量仍然没有变化,但是每一个数值却变成了一个序列。专业一点的说就是出现了升维。什么是升维?比如说过生日时朋友送你礼物一般都会装进一个包装盒中,但是这次将装礼物的包装盒外面又套了一层或者n层包装盒。拆礼物时我们可以耐着性子一层一层拆,总会拿到,但针对Observable我们不能订阅多次吧,因为关键问题是我们并不知道到底包装了多少层。这就尴尬了,此时应该怎样取到最内层的数据呢?

这就是flatMap能做的事,针对升维做降维,理解了升维就不难理解降维,直白的说就是取得序列中的序列的数值。

看一下官方解释:对Observable序列发出的每一个Next事件中的数值应用一个闭包转换成一个Observable序列,然后将所有这些Observable序列应用上面介绍的merge操作,合并进一个新的Observable序列。

所以flatMap操作其实是map + merge两个操作结合而成的复合操作。

针对上面的例子将map换成flatMap:

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .flatMap { Observable.just($0) }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

1
4
9

观察打印结果可以发现:

结果和map例子中是一样的,证明确实能取得多层序列中的值。

另外说明一点,源序列发送三个值,经过flatMap之后不一定是三个值,也就是很可能会改变数量。

再举个🌰:

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .flatMap { Observable.of($0, $0 * $0) }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

1
1
2
4
3
9 

很神奇很酷有木有?!

flatMapLatest

理解了flatMap再理解flatMapLatest就非常简单了,加上Latest则只会用内部最近的Observable序列发送事件。

flatMapLatest操作其实就是map + switchLatest两个操作结合而成的复合操作。

举个🌰:

let disposeBag = DisposeBag()

struct Player {
    var score: Variable<Int>
}

let 👦🏻 = Player(score: Variable(80))
let 👧🏼 = Player(score: Variable(90))

let player = Variable(👦🏻)

player.asObservable()
    .flatMapLatest { $0.score.asObservable() } 
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

👦🏻.score.value = 85

player.value = 👧🏼

👦🏻.score.value = 95     
👧🏼.score.value = 100

结果是:

80
85
90
100

说一个flatMapLatest可以实际应用的场景,在搜索页面中当用户输入内容时一般都会据此进行网络请求获得联想数据,那如果上一个网络请求得到的数据还没有回来时用户输入的内容变了,此时就需要根据新内容发送新请求获得新数据。此时上一个请求获得的数据就没有意义了,无需再展示给用户。具体例子就不写了,第三篇Demo工程中会展示。

scan

有一个初始值,应用一个累加的闭包到Observable序列发送的每一条数据上,每一次的计算结果都作为下一个闭包的初始值,最后返回和原序列数量相等的事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .scan(2) { aggregateValue, newValue in
        return aggregateValue + newValue
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

12
112
1112        

过滤和条件操作

让一个源Observable序列有选择的发送事件。

filter

让源Observable序列只发送满足条件的事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of(
    "🐱", "🐰", "🐶",
    "🐸", "🐱", "🐰",
    "🐹", "🐸", "🐱")
    .filter {
        $0 == "🐱"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐱
🐱
🐱

distinctUntilChanged

阻止源Observable序列连续发送重复事件,也就是事件变化了才发送。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐱
🐷
🐱
🐵
🐱

elementAt

让源Observable序列只发送特定索引的事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .elementAt(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐸    

single

仅发送源Observable序列发出的第一个事件(或者是符合条件的第一个事件),如果Observable序列不是正好发送一个事件会报错。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single { $0 == "🐸" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Observable.of("🐱", "🐰", "🐶", "🐱", "🐰", "🐶")
    .single { $0 == "🐰" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .single { $0 == "🔵" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

结果是:

🐱
Received unhandled error: Sequence contains more than one element.

next(🐸)
completed

next(🐰)
error(Sequence contains more than one element.)

error(Sequence doesn't contain any elements.)

take

让源Observable序列只发送从头开始的特定数量的事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐱
🐰
🐶    

takeLast

让源Observable序列只发送在最后的特定数量的事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .takeLast(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐸
🐷
🐵

takeWhile

让源Observable序列只发送满足特定条件的事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 5, 6)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

1
2
3

takeUntil

让源Observable序列正常发送事件直到另一个Observable序列发出了第一个事件。

举个🌰:

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

结果是:

next(🐱)
next(🐰)
next(🐶)
completed

skip

阻止源Observable序列从头开始发送特定数量的事件,也就是跳过前n个事件,从n+1个事件开始发。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐶
🐸
🐷
🐵    

skipWhile

阻止源Observable序列发送满足特定条件的事件,也就是条件满足就跳过。

举个🌰:

let disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 5, 6)
    .skipWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

4
5
6

skipWhileWithIndex

阻止源Observable序列发送满足特定条件的事件,和上面操作的区别是判断条件的闭包中也传入了当前事件所属的索引号,所以不但可以用实际数据做判断条件也可以用索引号做判断条件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .skipWhileWithIndex { element, index in
        index < 3
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🐸
🐷
🐵

skipUntil

阻止源Observable序列发送事件直到另一个Observable序列发出了第一个事件。

举个🌰:

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .skipUntil(referenceSequence)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

结果是:

🐸
🐷
🐵

数学和整体操作

对Observable发出的整个事件序列进行操作。

toArray

将一个Observable序列转变成一个数组,将这个数组作为一个新的单一事件发出,然后发出完成事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.range(start: 1, count: 10)
    .toArray()
    .subscribe { print($0) }
    .disposed(by: disposeBag)    )    

结果是:

next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
completed

reduce

以一个值开始,应用一个累加的闭包到Observable序列发出的每一条数据上,最后返回总的计算结果作为一个单一事件。

举个🌰:

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .reduce(1, accumulator: +)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

1111    

concat

拼接多个Observable序列成一个,等待上一个序列发出了完成事件后下一个序列才开始发送事件。

类似接力比赛,只有上一棒的小伙伴A跑完他的位置将棒成功放到下一棒小伙伴B的手中B才拿着棒开始跑。

举个🌰:

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let variable = Variable(subject1)

variable.asObservable()
    .concat()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

variable.value = subject2

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

结果是:

next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)

连接操作

可连接的Observable序列内部封装了普通的Observable序列,区别在于普通的Observable序列在被订阅后立刻发送事件,而可连接的Observable序列只有被调用了connect()函数后才开始发送。这样可以等所有订阅者都订阅了之后再发送事件。

在学习可连接的Observable序列之前,先回顾一下普通的Observable序列。

举个🌰:

let interval = Observable<Int>.interval(2, scheduler: MainScheduler.instance)

_ = interval
    .subscribe(onNext: { print("Subscription: 1, Event: \($0)") })

delay(2) {
    print("delay two seconds")
    _ = interval
        .subscribe(onNext: { print("Subscription: 2, Event: \($0)") })
}    }

结果是:

Subscription: 1, Event: 0
delay two seconds
Subscription: 1, Event: 1
Subscription: 2, Event: 0
Subscription: 1, Event: 2
Subscription: 2, Event: 1
Subscription: 1, Event: 3
Subscription: 2, Event: 2
Subscription: 1, Event: 4
Subscription: 2, Event: 3
Subscription: 1, Event: 5
Subscription: 2, Event: 4
...

观察打印结果可以发现:

  1. 普通的Observable序列每当被订阅时立刻发送事件;
  2. 普通的Observable序列每当被新的订阅者订阅时都会重新从头开始发送事件。

publish

将普通的源Observable序列转换成一个可连接的Observable序列。

举个🌰:

print("here is begin")

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()  // 1

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

delay(2) {
    print("delay two seconds")
    _ = intSequence.connect()  // 2
}

delay(4) {
    print("delay four seconds")
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

delay(6) {
    print("delay six seconds")
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

结果是:

here is begin
delay two seconds
Subscription 1:, Event: 0
delay four seconds
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
delay six seconds
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
...

解释一下上面的代码:

  1. 调用publish()函数将普通的Observable序列转换成一个可连接的Observable序列;
  2. 调用connect()函数使得可连接的Observable序列开始发送事件。

观察打印结果可以发现:

  1. 可连接的Observable序列被订阅时不会立刻发送事件;
  2. 可连接的Observable序列只有被调用connect()函数后才会从头开始发送事件;
  3. 可连接的Observable序列每当被新的订阅者订阅时不会重新从头开始发送事件,而是继续保持当前发送事件的节奏。

replay

将普通的源Observable序列转换成一个可连接的Observable序列,并且有一个缓存机制可以使得新的订阅者能收到之前已经发送过的特定数量的历史事件。

举个🌰:

print("here is begin")

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .replay(5)  // 1

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

delay(2) {
    print("delay two seconds")
    _ = intSequence.connect()  // 2
}

delay(4) {
    print("delay four seconds")
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

delay(8) {
    print("delay eight seconds")
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

结果是:

here is begin
delay two seconds
Subscription 1:, Event: 0
delay four seconds
Subscription 2:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
delay eight seconds
Subscription 3:, Event: 0
Subscription 3:, Event: 1
Subscription 3:, Event: 2
Subscription 3:, Event: 3
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...

解释一下上面的代码:

  1. 调用replay()函数将普通的Observable序列转换成一个可连接的Observable序列,并且设置缓存历史事件数量为5;
  2. 调用connect()函数使得可连接的Observable序列开始发送事件。

观察打印结果可以发现:

  1. replay拥有上面publish一切的能力;
  2. 通过设置replay数量为5使得可连接的Observable序列每当被新的订阅者订阅时首先会最多发送5个历史事件;

multicast

将普通的源Observable序列转换成一个可连接的Observable序列,并且通过特定的subject对象发送事件。

举个🌰:

print("here is begin")

let subject = PublishSubject<Int>()

_ = subject
    .subscribe(onNext: { print("Subject: \($0)") }) // 1

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .multicast(subject)  // 2

_ = intSequence
    .subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })

delay(2) {
    print("delay two seconds")
    _ = intSequence.connect()  // 3
}

delay(4) {
    print("delay four seconds")
    _ = intSequence
        .subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
}

delay(6) {
    print("delay six seconds")
    _ = intSequence
        .subscribe(onNext: { print("\tSubscription 3:, Event: \($0)") })
}

结果是:

here is begin
delay two seconds
Subject: 0
    Subscription 1:, Event: 0
delay four seconds
Subject: 1
    Subscription 1:, Event: 1
    Subscription 2:, Event: 1
Subject: 2
    Subscription 1:, Event: 2
    Subscription 2:, Event: 2
delay six seconds
Subject: 3
    Subscription 1:, Event: 3
    Subscription 2:, Event: 3
    Subscription 3:, Event: 3
Subject: 4
    Subscription 1:, Event: 4
    Subscription 2:, Event: 4
    Subscription 3:, Event: 4
...

解释一下上面的代码:

  1. 首先创建一个PublishSubject类型的对象subject并且去订阅它;
  2. 调用multicast()函数将普通的Observable序列转换成一个可连接的Observable序列,并且将刚才创建的subject对象作为参数传入;
  3. 调用connect()函数使得可连接的Observable序列开始发送事件。

观察打印结果可以发现:

  1. multicast拥有上面publish一切的能力;
  2. 通过将subject对象传入multicast()函数,使得每次subject对象都会首先接收到事件。

综上,可连接的Observable序列只有被调用了connect()方法之后才会开始发送事件,并且维持发送事件的节奏不变,不论同时有多少个订阅者是新订阅者还是老订阅者,他们收到的事件都是一样的。可见它的内部是有一个Subject的。

错误处理操作

遇到错误事件时进行一些处理的操作。

catchErrorJustReturn

遇到错误事件时改成返回一个next类型事件,然后发送一个完成事件。

举个🌰:

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
    .catchErrorJustReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

结果是:

next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed

catchError

遇到错误事件时返回另一个Observable序列。

举个🌰:

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

recoverySequence.onNext("😊")

结果是:

next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)

retry

遇到错误事件时重新订阅此Observable序列。

举个🌰:

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in

    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count == 1 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

-------

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭

-------

🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
Received unhandled error: -> test

Debug操作

帮助debug的操作。

debug

打印所有被订阅,事件和被销毁。

举个🌰:

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .debug()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

结果是:

2017-04-17 21:41:29.960: playground20.swift:42 (__lldb_expr_20) -> subscribed
2017-04-17 21:41:29.963: playground20.swift:42 (__lldb_expr_20) -> Event next(🍎)
🍎
2017-04-17 21:41:29.964: playground20.swift:42 (__lldb_expr_20) -> Event next(🍐)
🍐
2017-04-17 21:41:29.964: playground20.swift:42 (__lldb_expr_20) -> Event next(🍊)
🍊
Error encountered
2017-04-17 21:41:29.967: playground20.swift:42 (__lldb_expr_20) -> Event next(🍎)
🍎
2017-04-17 21:41:29.967: playground20.swift:42 (__lldb_expr_20) -> Event next(🍐)
🍐
2017-04-17 21:41:29.967: playground20.swift:42 (__lldb_expr_20) -> Event next(🍊)
🍊
Error encountered
2017-04-17 21:41:29.969: playground20.swift:42 (__lldb_expr_20) -> Event next(🍎)
🍎
2017-04-17 21:41:29.969: playground20.swift:42 (__lldb_expr_20) -> Event next(🍐)
🍐
2017-04-17 21:41:29.970: playground20.swift:42 (__lldb_expr_20) -> Event next(🍊)
🍊
Error encountered
2017-04-17 21:41:29.971: playground20.swift:42 (__lldb_expr_20) -> Event error(test)
Received unhandled error: -> test
2017-04-17 21:41:29.997: playground20.swift:42 (__lldb_expr_20) -> isDisposed

RxSwift.Resources.total

展示了Rx当前所有资源使用数,用于开发期间检查是否存在内存泄漏。

举个🌰:

{
    print(RxSwift.Resources.total)

    let disposeBag = DisposeBag()

    print(RxSwift.Resources.total)

    let variable = Variable("🍎")

    let subscription1 = variable.asObservable().subscribe(onNext: { print($0) })

    print(RxSwift.Resources.total)

    let subscription2 = variable.asObservable().subscribe(onNext: { print($0) })

    print(RxSwift.Resources.total)

    subscription1.dispose()

    print(RxSwift.Resources.total)

    subscription2.dispose()

    print(RxSwift.Resources.total)
}

print(RxSwift.Resources.total)

结果是:

0
2
🍎
8
🍎
10
9
8
0

再看一眼最开始出现过的图片。

1