RxSwift进阶,细节与UI绑定

上一篇主要是一些基础概念,这篇来涉及些稍微深奥难懂的。

仍然先来一张图镇帖。

1

多线程

Schedulers是RxSwift中对ThreadGCDOperationQueue等iOS开发中线程相关进行的抽象和封装。可以认为一个Scheduler代表一个线程。

涉及的类主要有:

  • MainScheduler
  • CurrentThreadScheduler
  • SerialDispatchQueueScheduler
  • ConcurrentDispatchQueueScheduler
  • OperationQueueScheduler

什么意思其实从命名上就可以看出来,不多解释了。

涉及的操作符有两个:

并不是很好解释,还是直接看代码吧。

首先定义一个辅助函数:

func myJust(_ element: String) -> Observable<String> {
    print("Creating an Observable in thread : \(Thread.current)")
    return Observable.create { observer in
        print("Emitting one Event in thread : \(Thread.current)")
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

作用和RxSwift自带的Just操作符一样,只是额外多了两行打印代码。

🌰1:

print("Subscribing an Observable in thread : \(Thread.current)")
myJust("🍎")
    .do(onNext: { print("Intercepted \($0) in thread : \(Thread.current)") }
        , onError: nil, onCompleted: nil
    )
    .filter({
        print("Filtering \($0) in thread : \(Thread.current)")
        return $0 != "🍐"
    })
    .subscribe(onNext: {
        print("Receiving \($0) in thread : \(Thread.current)")
    })
    .disposed(by: disposeBag)

结果1:

Subscribing an Observable in thread : <NSThread: 0x608000078f40>{number = 1, name = main}
Creating an Observable in thread : <NSThread: 0x608000078f40>{number = 1, name = main}
Emitting one Event in thread : <NSThread: 0x608000078f40>{number = 1, name = main}
Intercepted 🍎 in thread : <NSThread: 0x608000078f40>{number = 1, name = main}
Filtering 🍎 in thread : <NSThread: 0x608000078f40>{number = 1, name = main}
Receiving 🍎 in thread : <NSThread: 0x608000078f40>{number = 1, name = main}

观察结果可以发现:

  1. 代码执行顺序为:订阅-创建-发送-拦截-过滤-接收,所有代码的执行都发生在同一个线程中 ;

解释一下上面代码:

  1. 订阅作为初始步骤,所在的上下文环境是主线程,导致接下来的所有步骤都是发生在主线程:也就是说默认情况下在哪个线程订阅一个Observable,之后所有代码都会在同一个线程中执行。

🌰2:

let serialScheduler1 = SerialDispatchQueueScheduler(internalSerialQueueName: "com.kaisa.gcdqueue1")  
let serialScheduler2 = SerialDispatchQueueScheduler(internalSerialQueueName: "com.kaisa.gcdqueue2")  // 1

print("Subscribing an Observable in thread : \(Thread.current)")
myJust("🍎")
    .do(onNext: { print("Intercepted \($0) in thread : \(Thread.current)") }
        , onError: nil, onCompleted: nil
    )
    .observeOn(serialScheduler1)  // 2
    .filter({
        print("Filtering \($0) in thread : \(Thread.current)")
        return $0 != "🍐"
    })
    .observeOn(serialScheduler2)  // 2
    .map({ fruit in
        print("Mapping \(fruit) in thread : \(Thread.current)")
        return "🍊"
    })
    .observeOn(MainScheduler.instance)  // 3
    .subscribe(onNext: { (fruit: String) in
        print("Receiving \(fruit) in thread : \(Thread.current)")
    })
    .disposed(by: disposeBag)    

结果2:

Subscribing an Observable in thread : <NSThread: 0x60000007e440>{number = 1, name = main}
Creating an Observable in thread : <NSThread: 0x60000007e440>{number = 1, name = main}
Emitting one Event in thread : <NSThread: 0x60000007e440>{number = 1, name = main}
Intercepted 🍎 in thread : <NSThread: 0x60000007e440>{number = 1, name = main}
Filtering 🍎 in thread : <NSThread: 0x600000470c40>{number = 3, name = (null)}
Mapping 🍎 in thread : <NSThread: 0x608000671200>{number = 4, name = (null)}
Receiving 🍊 in thread : <NSThread: 0x60000007ee40>{number = 1, name = main}

观察结果可以发现:

  1. 代码执行顺序并没有改变,但是却发生在三个不同的线程中 ;

解释一下上面代码:

  1. 创建两个和线程对应的Scheduler对象(此例子中应用的是GCD的串行队列);
  2. observeOn()操作符传入Scheduler对象可以改变事件接受时代码执行所在的线程:也就是说这个操作符会改变Observable对外发送事件时使用的线程,导致之后接收到这个事件时也会在相同线程中 ;
  3. 通常都是在主线程中订阅一个Observable,然后回调也和UI相关,所以如果中途偏离了主线程最后都要转回来 ;
  4. 在一段代码串中可以同时使用多次observeOn()操作符。

🌰3:

let serialScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "com.kaisa.gcdqueue")  // 1

print("Subscribing an Observable in thread : \(Thread.current)")
myJust("🍎")
    .do(onNext: { print("Intercepted \($0) in thread : \(Thread.current)") }
        , onError: nil, onCompleted: nil
    )
    .map({ fruit in
        print("Mapping \(fruit) in thread : \(Thread.current)")
        return "🍋"
    })
    .subscribeOn(serialScheduler)  // 2
    .observeOn(MainScheduler.instance)  // 3
    .subscribe(onNext: { (fruit: String) in
        print("Receiving \(fruit) in thread : \(Thread.current)")
    })
    .disposed(by: disposeBag)

结果3:

Subscribing an Observable in thread : <NSThread: 0x60000006fec0>{number = 1, name = main}
Creating an Observable in thread : <NSThread: 0x60000006fec0>{number = 1, name = main}
Emitting one Event in thread : <NSThread: 0x60000046e2c0>{number = 3, name = (null)}
Intercepted 🍎 in thread : <NSThread: 0x60000046e2c0>{number = 3, name = (null)}
Mapping 🍎 in thread : <NSThread: 0x60000046e2c0>{number = 3, name = (null)}
Receiving 🍋 in thread : <NSThread: 0x60000006fec0>{number = 1, name = main}

观察结果可以发现:

  1. 代码执行顺序依然没有改变,但是却发生在两个不同的线程中 ;

解释一下上面代码:

  1. 创建一个和线程对应的Scheduler对象 ;
  2. subscribeOn()操作符传入Scheduler对象可以改变创建Observable之后所有代码执行所在的线程:也就是说这个操作符会从源头上改变代码执行所在的环境,比observeOn()起作用的时机要早 ;
  3. 应用observeOn()操作符传入MainScheduler.instance使得最后的回调依然发生在主线程 ;
  4. subscribeOn()操作符并不常用。

🌰4:

let serialScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "com.kaisa.gcdqueue")

let observable = Observable.just("🍎").delay(1.0, scheduler: serialScheduler)  // 1

print("Subscribing an Observable in thread : \(Thread.current)")
observable
    .do(onNext: { print("Intercepted \($0) in thread : \(Thread.current)") }
        , onError: nil, onCompleted: nil
    )
    .map({ fruit in
        print("Mapping \(fruit) in thread : \(Thread.current)")
        return "🍐"
    })
    .observeOn(MainScheduler.instance) 
    .subscribe(onNext: { (fruit: String) in
        print("Receiving \(fruit) in thread : \(Thread.current)")
    })
    .disposed(by: disposeBag)

结果4:

Subscribing an Observable in thread : <NSThread: 0x600000076c80>{number = 1, name = main}
Intercepted 🍎 in thread : <NSThread: 0x60800047c180>{number = 3, name = (null)}
Mapping 🍎 in thread : <NSThread: 0x60800047c180>{number = 3, name = (null)}
Receiving 🍐 in thread : <NSThread: 0x600000076c80>{number = 1, name = main}

观察结果可以发现:

  1. 应用RxSwift提供的便捷创建Observable的方法时,创建&发送两个时机就不能直接控制了 ;

解释一下上面代码:

  1. 创建一个Observable时通过调用delay操作符可以传入一个Scheduler对象,作用类似于上面的subscribeOn(),所以此时无须调用subscribeOn()了。

PS:还记得上一篇文章中提到的interval操作符么?使用方式类似这样:

Observable<Int>
    .interval(0.3, scheduler: MainScheduler.instance)
    .subscribe (onNext: { print($0) })
    .disposed(by: disposeBag)

实现固定时间间隔发送一个事件(在这里是一个整数)的功能,值得注意的是interval操作符也要传入一个Scheduler对象,意思和上面例子中一样,相信你已经理解了。

Cold & Hot

在swift语言上另一个非常强大的FRP框架RAC中和RxSwift中Observable相对应的概念是signal,翻译过来就是信号。RAC中明确将信号分为两种,即:冷信号和热信号,它们是两种不同的类型。那么在RxSwift中有没有Cold Observable 和 Hot Observable之分呢?其实也是有的。

ReactiveX.io中有如下一段说明文字:

“Hot” and “Cold” Observables

When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.

In some implementations of ReactiveX, there is also something called a “Connectable” Observable. Such an Observable does not begin emitting items until its Connect method is called, whether or not any observers have subscribed to it.

关键区别点在于Observable开始发送事件的时机。如果它被创建后无论有没有被订阅都立即发送那么它就是一个热的Observable,如果只有被订阅后才开始发送那么它就是一个冷的Observable。而这其实会引申出另一个更重要的问题:不同订阅者在不同时间点订阅同一个源Observable,它们接收到的是否是这个源Observable发送的全部事件。新的订阅者订阅了一个热的Observable后只会接收到之后发出的事件,也就是说不同订阅者之间共享同一个源Observable;而订阅了一个冷的Observable后会完整接收到发出的全部事件,也就是说每一个订阅者对应着一个单独的源Observable。

举个不是十分恰当的例子:它们之间有点像火车和出租车的关系,一列火车出发的时间点以及经过哪些站都是提前就定好了的(忽略晚点及其他特殊情况),它不会因有没有乘客有多少乘客而改变,所有乘客共享这一列火车,一个乘客若上一站没有赶上则只能去下一站上车了;而出租车呢,任何一定时间内对于一个乘客而言都是一一对应的(暂不考虑多人共打一辆车的情况),A想从海淀去朝阳需要打一辆车,B也想从海淀去朝阳则需要打另外一辆车,它们彼此之间相互独立,也就是不共享。

核心关键就是一个词:共享。

还记得上一篇介绍可连接的Observable时涉及到的连接操作符么,尤其是replay,忘了的话翻回去复习一遍,对接下来的理解很有帮助。

下面直接看代码。

首先定义一个辅助函数:

func myInterval(_ interval: TimeInterval) -> Observable<Int> {
    return Observable.create { observer in
        print("Subscribed")
        let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
        timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval)

        let cancel = Disposables.create {
            print("Disposed")
            timer.cancel()
        }

        var next = 0
        timer.setEventHandler {
            if cancel.isDisposed {
                return
            }
            observer.on(.next(next))
            next += 1
        }
        timer.resume()

        return cancel
    }
}

如果看不懂也没关系,不用关心太多细节,只要理解它是仿照RxSwift提供的interval操作符,实现间隔一定时间发送一个整形变量的功能就可以了。

🌰1:

let observable = myInterval(1)

print("Started ----")

let subscription1 = observable
    .subscribe(onNext: { n in
        print("First \(n)")
    })

Thread.sleep(forTimeInterval: 2)

let subscription2 = observable
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 3)

subscription1.dispose()

Thread.sleep(forTimeInterval: 4)

subscription2.dispose()

print("Ended ----")

结果1:

Started ----
Subscribed
First 0
First 1
Subscribed
First 2
Second 0
First 3
Second 1
First 4
Second 2
Disposed
Second 3
Second 4
Second 5
Second 6
Disposed
Ended ----

观察结果可以发现:

  1. observable在被订阅之后每间隔1s都会发送一个整形数 ;
  2. 订阅者1 subscription1 在订阅的5s时间内共接收到5条数据,订阅者2 subscription2 在订阅的7s时间内共接收到7条数据,一如预期 ;
  3. SubscribedDisposed分别都被打印了两次,订阅者1和订阅者2接收到的整形数都是从0开始的 ;

解释一下上面代码:

  1. SubscribedDisposed分别都被打印了两次,证明源observable内部的创建流程也走了两次 ;
  2. 订阅者2在2s后才订阅也是从0开始接收到数据,证明每一个订阅者都对应一个源observable

这就是一种典型的Cold Observable的使用方式。那么到底这种方式有什么弊端或者说不满足什么情况呢?

一切取决于observable内部发送事件的机制,像上面例子中只是间隔固定时间发送一个整形数,对外界并不会产生任何影响,所以并没有问题。但是考虑这么一种情况:源observable被订阅后会触发一个网络请求,然后根据请求结果决定对外发送的数据,两个订阅者分别是两个UI控件,它们等待源observable发出数据进行刷新。那么用上面这种方式会导致网络请求发送两次,从而有可能造成UI刷新不一致,浪费流量,数据统计错误等一系列问题。有些文章中会说这是一种“副作用”,叫什么名字无所谓,领会精神就好。

那么问题来了,怎么样解决这种问题呢?答案很简单,有一个新的操作符是:shareReplay()

shareReplay

来看下面的🌰2:

let observable = myInterval(1).shareReplay(1)  // 1

print("Started ----")

let subscription1 = observable
    .subscribe(onNext: { n in
        print("First \(n)")
    })

Thread.sleep(forTimeInterval: 2)

let subscription2 = observable
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 3)

subscription1.dispose()

Thread.sleep(forTimeInterval: 4)

subscription2.dispose()

print("Ended ----")

结果2:

Started ----
Subscribed
First 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Second 5
Second 6
Second 7
Second 8
Disposed
Ended ----

观察结果可以发现:

  1. observable在被订阅之后仍然每间隔1s发送一个整形数 ;
  2. 订阅者1 subscription1 在订阅的5s时间内共接收到5条数据,而且是从0开始的,和之前一样 ;
  3. 订阅者2 subscription2 在订阅的7s时间内共接收到8条数据,比之前多一条,而且是从1开始的,和之前不一样 ;
  4. SubscribedDisposed分别都只被打印了一次 ;

解释一下上面代码:

  1. 和上面例子唯一的区别是应用了shareReplay()操作符并且传入了一个1作为参数 ;
  2. SubscribedDisposed分别都只被打印了一次,证明源observable内部的创建流程只走了一次 ;
  3. 订阅者2在2s后开始订阅然后立刻接收到的第一个数据是1是因为经过了shareReplay(1)之后的observable内部会保存当前发送的最后一条数据,当有新的订阅者订阅之后首先将最后一条再发送给它,也就是说经过shareReplay(1)之后一个无状态的observable会变为有状态的observable
  4. 订阅者2在收到1之后又和订阅者1同时陆续收到了整数2,3,4,证明和两个订阅者之间确实共享着同一个源Observable

建议读者再向shareReplay()中分别传入0和2对比观察一下输出结果。

可见,通过使用shareReplay()使得多个订阅者之间共享同一个源Observable就完美的解决了上面例子中使用Cold Observable所带来的问题,此时就变成了Hot Observable。

shareReplayLatestWhileConnected

shareReplay 还有一个兄弟操作符叫做 shareReplayLatestWhileConnected,两者之间只有一个非常小的区别。

🌰3:

let observable = myInterval(1).shareReplay(1)

print("Started ----")

let subscription1 = observable
    .subscribe(onNext: { n in
        print("First \(n)")
    })

Thread.sleep(forTimeInterval: 2)

let subscription2 = observable
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 3)

subscription1.dispose()

Thread.sleep(forTimeInterval: 4)

subscription2.dispose()

print("Observer become 0")

Thread.sleep(forTimeInterval: 1)

let subscription3 = observable
    .subscribe(onNext: { n in
        print("Third \(n)")
    })

Thread.sleep(forTimeInterval: 2)

subscription3.dispose()

print("Ended ----")

结果3:

Started ----
Subscribed
First 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Second 5
Second 6
Second 7
Second 8
Disposed
Observer become 0
Third 8
Subscribed
Third 0
Third 1
Disposed
Ended ----

注意和上面例子的区别,observable会经历一个从被订阅到没有被订阅再到重新被订阅的过程,而重新被订阅时仍然首先会发送一个此前曾经发送过的最后一个值,之后再重新开始。

🌰4:

let observable = myInterval(1).shareReplayLatestWhileConnected()

print("Started ----")

let subscription1 = observable
    .subscribe(onNext: { n in
        print("First \(n)")
    })

Thread.sleep(forTimeInterval: 2)

let subscription2 = observable
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 3)

subscription1.dispose()

Thread.sleep(forTimeInterval: 4)

subscription2.dispose()

print("Observer become 0")

Thread.sleep(forTimeInterval: 1)

let subscription3 = observable
    .subscribe(onNext: { n in
        print("Third \(n)")
    })

Thread.sleep(forTimeInterval: 2)

subscription3.dispose()

print("Ended ----")

结果4:

Started ----
Subscribed
First 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Second 5
Second 6
Second 7
Second 8
Disposed
Observer become 0
Subscribed
Third 0
Third 1
Disposed
Ended ----  

对比shareReplayLatestWhileConnectedshareReplay(1)可以发现,从被订阅到没有被订阅再到重新被订阅时不会发送此前曾经发送过的最后一个值,而是直接重新开始。

弄懂了这两个之后再翻回去看一下上一篇介绍的Subject相关的几个操作符,相信你一定会豁然开朗。

最后看一下RxSwift官方介绍加深一下理解:Hot and Cold Observables

自定义operator

上一篇文章中介绍了很多个操作符,这篇文章前面部分又重点介绍了3个操作符,另外还有很多并未提及的操作符,所以从某种意义上可以说,RxSwift之所以强大就在于它本身定义了各种各样的操作符。

这里是所有操作符的列表,有事没事可以查一查,看看具体都有哪些。

但即使是已经有了这么多,在实际开发中可能也会遇到不满足条件的情况,怎么办呢?很简单,我们可以自定义操作符。

鉴于本身已经存在的操作符的实现方式已经做了相当程度的优化,所以貌一看不一定能看得懂。但万变不离其宗,创建操作符在本质上仍然是创建一个Observable序列,所以其实方法你已经知道了。而且前面也已经或多或少的涉及过,还记得上文中出现的 myJust()myInterval() 么?翻回去再看一遍,它们就是自定义的操作符。

来看一下官方提供的未经过优化的map操作符版本:

extension ObservableType {
    func myMap<R>(transform: @escaping (E) -> R) -> Observable<R> {
        return Observable.create { observer in
            let subscription = self.subscribe { e in
                switch e {
                case .next(let value):
                    let result = transform(value)
                    observer.on(.next(result))
                case .error(let error):
                    observer.on(.error(error))
                case .completed:
                    observer.on(.completed)
                }
            }

            return subscription
        }
    }
}

使用方式和使用系统提供的一模一样:

let subscription = myInterval(0.1)
    .myMap { event in
        return "This is simply \(event)"
    }
    .subscribe(onNext: { next in
        print(next)
    })

一如预期,输出结果为:

Subscribed
This is simply 0
This is simply 1
This is simply 2
This is simply 3
This is simply 4
This is simply 5
This is simply 6
...

相当简单有木有?!


目前为止介绍的全是RxSwift针对swift这门语言实现的函数式响应式编程范式,至于你是用swift写客户端还是写服务端其实无所谓。接下来要介绍的一些内容就只适用于iOS系统开发苹果手机App了。

大一统

上一篇中提到过,RxSwift可以将iOS系统默认提供的几种事件响应方式进行统一,这样能很大程度上提高代码可读性,使得代码更加易于维护与拓展。下面就来看看具体是怎么做的。

KVO

先来看一下系统提供的使用KVO的三个函数,注意是三个!!!

func addObserver(_ observer: NSObject, forKeyPath keyPath: String, options: NSKeyValueObservingOptions = [], context: UnsafeMutableRawPointer?)
func removeObserver(_ observer: NSObject, forKeyPath keyPath: String)
func observeValue(forKeyPath keyPath: String?, of object: Any?, change: [NSKeyValueChangeKey : Any]?, context: UnsafeMutableRawPointer?)

具体有多难用就不提了,谁用谁知道。

来看一下RxSwift提供的支持KVO的两种方式,每种方式都只有一个函数。

func observe<E>(_ type: E.Type, _ keyPath: String, options: NSKeyValueObservingOptions = [.new, .initial], retainSelf: Bool = true) -> Observable<E?> { }

func observeWeakly<E>(_ type: E.Type, _ keyPath: String, options: NSKeyValueObservingOptions = [.new, .initial]) -> Observable<E?> { }

区别主要在于内存管理方式不一致。来看一下怎么使用:

view
   .rx.observe(CGRect.self, "frame")
    .subscribe(onNext: { frame in
      ...
    })
    .disposed(by: disposeBag)

或者

view
    .rx.observeWeakly(CGRect.self, "frame")
    .subscribe(onNext: { frame in
      ...
    }) 
    .disposed(by: disposeBag)       

真是没有对比就没有伤害,既不需要注册又不需要移除,比系统的方便简单很多有木有?!

rx.observe

rx.observe性能更加高效是因为它仅仅是对系统KVO的简单封装,相对来说适合应用的场景比较少:

  • 它可以用于监测由self或者父类开始的路径 (retainSelf = false)
  • 它可以用于监测由子类开始的路径 (retainSelf = true)
  • 整个路径当中的所有属性必须都是用strong修饰的,否则就会面临着crash的风险

举个例子:

self.rx.observe(CGRect.self, "view.frame", retainSelf: false)

rx.observeWeakly

rx.observeWeakly会比rx.observe性能稍微差一些是因为它为了防止弱引用出现不得不去处理对象的释放(dealloc)。

它除了可以用于任何适用于rx.observe的场景之外,还适合应用的场景有:

  • 因为它不会保留被监测的目标,它可以用于监测任意的对象,即使它的所有权关系未知
  • 它可以用于监测用weak修饰的属性

再举个例子:

someSuspiciousViewController.rx.observeWeakly(Bool.self, "behavingOk")

总结一下,除非你十分确定用rx.observe很合适,否则都用rx.observeWeakly没毛病。

Notification

通知和KVO特别像,首先还是看一下系统提供的函数:

func addObserver(_ observer: Any, selector aSelector: Selector, name aName: NSNotification.Name?, object anObject: Any?)
func addObserver(forName name: NSNotification.Name?, object obj: Any?, queue: OperationQueue?, using block: @escaping (Notification) -> Swift.Void) -> NSObjectProtocol
removeObserver(_ observer: Any)
post(name aName: NSNotification.Name, object anObject: Any?, userInfo aUserInfo: [AnyHashable : Any]? = nil)

前两个函数是并列关系,使用时二选一。其实相比于KVO,并没有觉得通知多么难用。

来看一下RxSwift提供的用于代替系统通知的函数。

public func notification(_ name: Notification.Name?, object: AnyObject? = nil) -> Observable<Notification> { }

使用方式为:

NotificationCenter.default
   .rx.notification(Notification.Name(rawValue: "testNotification"), object: nil)
   .subscribe(onNext: { noti in
          ...
   })
   .disposed(by: disposeBag)

或者

NotificationCenter.default
     .rx.notification(Notification.Name.UITextViewTextDidBeginEditing, object: myTextView)
     .subscribe(onNext: { n in
       ...
     }) 
     .disposed(by: disposeBag) 

可见,只是使得接收通知时的处理方式变的更加优雅,发送和销毁的方式不变。

Target-Action

这种方式变得尤其简单。

@IBOutlet weak var button: UIButton!

button.rx.tap
   .subscribe(onNext: {
       print("button is clicked")
   })
   .addDisposableTo(disposeBag)

真是没什么可解释的,So Easy,照葫芦画瓢就可以。

Delegate

不废话,直接看代码。

@IBOutlet weak var scrollView: UIScrollView!

scrollView.rx.didScroll
   .subscribe(onNext: { [weak self] in
       print("current offsetY is \(self?.scrollView.contentOffset.y)")
   })
   .addDisposableTo(disposeBag)

可见,RxSwift将UIScrollView的代理方法scrollViewDidScroll:封装成了一个Observable的属性,使用的时候不再需要跨方法,直接订阅就好了。代码更加紧凑有木有?

至于如何拓展自己定义的代理方法也可以应用这种方式略有点复杂,暂时忽略,有兴趣的可以自己去研究一下。

绑定

一句话描述重要性:这是RxSwift甚至响应式编程中最重要的一个概念,没有之一。

其实全称应该是数据绑定,百度百科上是这么定义的:绑定是将一个用户界面元素(控件)的属性绑定到一个类型(对象)实例上的某个属性的方法。

如果你并没有完全理解,那么我来举一个通俗易懂的例子。假设现在要做一个类似微信朋友圈的功能,可以发帖可以点赞,在每条帖子下面要展示当前获得的点赞数。很简单的需求对吧,抛开发帖点赞不提只考虑展示点赞,假设模型类的实例对象叫做article,点赞数量的属性字段叫做starCount,只需要取到这个字段然后赋值给一个label就可以。那么当当前用户也点赞了后,starCount要+1,label要改变,怎么做呢?按照以往最简单的方式无外乎是对articlestarCount做一个KVO,当改变时重新对label进行赋值。那如果应用绑定的方式呢,将这个labelarticlestarCount进行绑定,starCount可以随意变,无需手动写KVO,无需手动重新赋值,label都会自动改变,666有木有?!

然后注意这只是单向绑定,双向绑定太复杂,暂时不做讨论。

先来看一个例子:

@IBOutlet weak var number1: UITextField!
@IBOutlet weak var number2: UITextField!
@IBOutlet weak var label: UILabel!

let disposeBag = DisposeBag()

number1.rx.text
    .bind(to: label.rx.text)
    .addDisposableTo(disposeBag)

number1.rx.text
    .bind(to: number2.rx.text)
    .addDisposableTo(disposeBag)

首先定义了两个UITextField和一个UILabel分别叫做number1number2label,然后应用bind命令将number1label进行绑定,将number1number2也进行绑定。实际运行时会发现无论在number1中输入任何内容number2label都会变成同样的内容。这就是绑定的厉害。那么问题来了,为什么可以这样绑定呢?

原来,通过查看源码会发现 number1.rx.text 是一个 ControlProperty<String?> 类型的属性,而 label.rx.text 是一个 UIBindingObserver<Base, String?> 类型的属性,那么这两个究竟是什么呢?

在介绍这两个之前,首先要插个队,还记得上面用过的 button.rx.tap吗,其实它是一个 ControlEvent<Void> 类型的属性,现在也来探究一下它究竟是什么。

在实际开发中最常用的三个基础控件:UIButtonUITextFieldUILabel

相对应的想要理解如何实现的绑定首先要清楚三个概念:ControlEventControlPropertyUIBindingObserver

首先要明确的一点是这三个概念都只适用于UI层,所以不存在多线程的问题。

ControlEvent

首先看定义:

struct ControlEvent<PropertyType> : ControlEventType

protocol ControlEventType : ObservableType

于是我们发现首先ControlEvent是一个结构体,然后归根结底实现了ObservableType协议,也就是说它是一个被观察者,即Observable

然后它有以下这些性质:

- it never fails
- it won't send any initial value on subscription
- it will `Complete` sequence on control being deallocated
- it never errors out
- it delivers events on `MainScheduler.instance`

都是什么意思呢,拿刚才的按钮举例,一个按钮被点击是一个ControlEvent的实例,那么:

  1. 不考虑主线程卡死,按钮一直可以被点击,当然不会失败;
  2. 按钮只有被按下时才会发送事件,刚被进行绑定后不会发送事件;
  3. 按钮被销毁时会发送一个完成类型事件;
  4. 和第一条差不多,不会失败也就不会发送错误类型事件;
  5. 发送事件都是在主线程;

一句话总结就是,这是一个只能在主线程发送事件的Observable

顺便提一句,上面用过的 scrollView.rx.didScroll其实也是一个ControlEvent<Void> 类型的属性。所以现在你终于知道它们为什么可以被订阅了吧!

ControlProperty

仍然首先看定义:

struct ControlProperty<PropertyType> : ControlPropertyType

protocol ControlPropertyType : ObservableType, ObserverType

于是我们发现首先ControlProperty也是一个结构体,然后归根结底同时实现了ObservableTypeObserverType两个协议,也就是说它即是一个被观察者,即Observable,又是一个观察者,即Observer

然后它有以下这些性质:

- it never fails
- `shareReplay(1)` behavior
    - it's stateful, upon subscription (calling subscribe) last element is immediately replayed if it was produced
- it will `Complete` sequence on control being deallocated
- it never errors out
- it delivers events on `MainScheduler.instance`

拿刚才的输入框举例子,一个输入框的text值是一个ControlProperty的实例,那么:

  1. 不考虑主线程卡死,输入框一直可以被改变,当然不会失败;
  2. 一个输入框当然可以同时被多个观察者订阅,此时这些观察者是共享这一个输入框的;
  3. 剩下三条都和上面一样。

一句话总结就是,这是一个只能在主线程发送事件的Observable和一个只能在主线程订阅事件的Observer的联合体。

UIBindingObserver

依然首先看定义:

class UIBindingObserver<UIElementType, Value> : ObserverType where UIElementType: AnyObject

于是我们发现首先UIBindingObserver是一个类,然后归根结底实现了ObserverType协议,也就是说它是一个观察者,即Observer

然后它有以下这些规则或者说限制:

* can't bind errors 
 * ensures binding is performed on main thread

拿刚才的标签举例子,一个标签的text值是一个UIBindingObserver的实例,那么:

  1. 因为标签的text属性对应的只能是有效的字符串,那么如果绑定一个会发送错误事件的Observable,接收到错误事件时无法处理了;
  2. 既然是标签就属于UIKit库,也就被限制住只能在主线程操作,既然只能在主线程接收值,那么也就限制了只能在主线程绑定值。

一句话总结就是,这是一个只能在主线程进行绑定的Observer

所谓源码面前没有秘密,初步了解了这三个基本概念之后再回过头去看上面的例子就会豁然开朗。绑定其实也不过就是一个Observable去调用绑定函数,函数中传入另一个Observer而已。number1.rx.text 作为一个Observable当然可以去调用bind函数,而number2.rx.textlabel.rx.text作为一个Observer当然也可以作为参数传到bind函数中。

顺便提一句,UIBindingObserver还可以自拓展以更好的满足实际开发中的需求。

怎么做呢,参考已经实现好了的呀,比如刚才的label.rx.text是这样实现的:

extension Reactive where Base: UILabel {

    /// Bindable sink for `text` property.
    public var text: UIBindingObserver<Base, String?> {
        return UIBindingObserver(UIElement: self.base) { label, text in
            label.text = text
        }
    }

}

于是乎仿照一个:

extension ValidationResult: CustomStringConvertible {
    public var description: String {
        switch self {
        case .empty:
            return ""
        case let .ok(message):
                return message
           case let .failed(message):
                return message
            }
        }
}

extension ValidationResult {
    var textColor: UIColor {
            switch self {
            case .empty:
            return UIColor.yellow
        case .ok:
            return UIColor.green
        case .failed:
            return UIColor.red
            }
    }
}

extension Reactive where Base: UILabel {
    var validationResult: UIBindingObserver<Base, ValidationResult> {
        return UIBindingObserver(UIElement: base) { label, result in
            label.text = result.description
            label.textColor = result.textColor
        }
    }
}

通过使用这个拓展,可以做到同时修改标签的内容及颜色。

Driver

到目前为止,对RxSwift也介绍了不少内容了,但是最核心的其实就是四个概念,两个名词:被观察者Observable)和观察者Observer);两个动词:订阅subscribe)和绑定bind)。接下来终于要轮到第五个也是最后一个了,它就是 – 驱动drive)。

事实上,它是专为UI层绑定方便而创造出的一个概念,也就是说当在UI层做绑定时它并不是必须的,但是应用它会使得事情更加简单。

UI层的订阅和绑定

既然说到UI层,特意说明一下:在UI层做绑定所使用的Observable其实是必须要满足于以下三个条件的。

主线程

所有系统提供的UI控件全部在UIKit框架中,而这个框架只能在主线程中被使用,否则不知道会发生什么。当然这一点你肯定是知道的,那么相对应的在主线程中绑定使用的Observable也就只能在主线程中发送数据,所以通常也就会调用一下操作符:observeOn(),然后传入MainScheduler.instance

错误处理

上一小节介绍的三个概念有很多相同之处,其中一个就是can't error。因为无论是一个UITextField还是一个UILabel都无法接收处理错误,所以通常也就会调用一下操作符:catchErrorJustReturn()然后传入一个当错误发生时要传递的值。

共享给多个订阅者

通常在UI层都会多个Observer共享同一个Observable,比如当多个UI控件的数据来源于同一个网络请求的结果时,你肯定不希望此请求发生多次,所以通常也就会调用一下操作符:
shareReplay(),一般都会传入1

实例

下面来看一个实例,首先定义几个变量供下面使用:

@IBOutlet weak var searchText: UITextField!
@IBOutlet weak var resultCount: UILabel!
@IBOutlet weak var resultsTableView: UITableView!
let disposeBag = DisposeBag()

最初始版本:

let results = searchText.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .distinctUntilChanged()
    .flatMapLatest { query in
        API.getSearchResults(query) 
    }

results
    .map { "\($0.count)" }
    .bind(to: resultCount.rx.text)
    .addDisposableTo(disposeBag)

results
    .bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .addDisposableTo(disposeBag)

很简单,根据输入内容触发一个搜索请求,将结果绑定到两个UI控件上。

顺便说下:throttle操作符可以控制当输入框searchText中的内容超过0.3s无变化后再往下继续,避免用户还在快速输入时就触发下面的网络请求;

那么问题来了,上面代码有什么问题?

答案一目了然,上面说的三个条件都没满足:非主线程对UI控件赋值不安全;请求错误之后无法识别;网络请求发生两次。

怎么解决呢?

改进后版本:

let results = searchText.rx.text
    .throttle(0.3, MainScheduler.instance) 
    .distinctUntilChanged()
    .flatMapLatest { query in
          API.getSearchResults(query)
            .observeOn(MainScheduler.instance) // 1
          .catchErrorJustReturn([]) // 2
    }
    .shareReplay(1) // 3  

results
    .map { "\($0.count)" }
    .bind(to: resultCount.rx.text) 
    .disposed(by: disposeBag)

results
    .bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in  
        cell.textLabel?.text = "\(result)"
    }
    .disposed(by: disposeBag)

完全解决了上面代码中存在的问题,三个条件都满足:

  1. 应用observeOn()操作符使得之后会在主线程发送数据;
  2. 应用catchErrorJustReturn()操作符使得遇到错误时会返回一个空数组;
  3. 应用shareReplay()操作符共享网络请求得到的数据;

那么问题又来了,上面代码有什么问题(或者缺陷)?

答案是稍微有点麻烦。因为可以想像这三个操作符会大量重复使用。

那么有没有更好的方式呢?

答案当然是肯定的,也就是应用本小节的主角-drive

优秀的版本:

let results = searchText.rx.text.asDriver() 
    .throttle(0.3, scheduler: MainScheduler.instance)
    .distinctUntilChanged()
    .flatMapLatest { query in
         API.getSearchResults(query)
            .asDriver(onErrorJustReturn: []) 
    }

results
    .map { "\($0.count)" }
    .drive(resultCount.rx.text)               
    .disposed(by: disposeBag)                                                            

results
    .drive(resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in  
        cell.textLabel?.text = "\(result)"
    }
    .disposed(by: disposeBag)

稍微解释一下几处不同的地方:

  1. 应用asDriver()操作符将ControlProperty转换成一个Driver,应用asDriver(onErrorJustReturn: [])使得请求结果通过一个Driver发送出去;
  2. 在原来应用bind操作符的地方全换成drive操作符,参数不变。

通过这样小小的调整,新的results就自动满足了上面说的三个条件,完美解决了最初的问题:

  • 不发送错误事件
  • 在主线程发送数据
  • 共享事件流(注意其实是shareReplayLatestWhileConnected()

可以想象,asDriver(onErrorJustReturn: [])内部实现大概是这样:

let safeObservable = xs
      .observeOn(MainScheduler.instance)       // observe events on main scheduler
      .catchErrorJustReturn(onErrorJustReturn) // can't error out
      .shareReplayLatestWhileConnected()       // side effects sharing
return Driver(raw: safeObservable)           // wrap it up

总结一下:凡是在UI层做绑定的地方将bind全部换成drive操作符没毛病。

最后再看一眼最开始出现过的图片。

1