[iOS]Rx系列-RxSwift入门知识整理

Author Avatar
与狼同行 11月 29, 2016

一直特别想系统学习一下Rx系列,体会它的思想,于是先看看文档,之后也会在github上挂一个实际操练的小项目

Observable

Observable 也可以叫做队列,Observable是观察者模式中被观察的对象,相当于一个事件序列(Sequence),会向订阅者发送新产生的事件信息。
每一个 Observable 队列只是一个队列而已。它跟Swift的 Sequence 类型的队列有什么不同呢? 其关键在于RxSwift的Observable可以异步的接受元素。这也是RxSwift的实现核心,然后基于此我们开始衍生讨论:

  1. Observable(ObservableType)等同于 Sequence
  2. ObservableType.subscribe 等同于 Sequence.makeIterator 迭代方法
  3. Observer (callback) 需要被传给 ObservableType.subscribe 方法来接收队列元素而不是在返回的迭代器上调用 next() 。
    如果我们不使用RxSwift的话,我们的代码将会充斥着状态机,现在我们需要把那些状态给抽象出来。
    列表和队列也许是开发者最早接触的数据结构,来看一下下面的数字:

    –1–2–3–4–5–6–| // 正常结束
    再来看一下字符串队列:
    –a–b–a–a–a—d—X // 异常结束
    有些队列是有限的,有些则是无限的,比如说触发按钮的队列:
    —tap-tap——-tap—>
    这种表达方式叫做大理石图表。应该Rx系列中才有特殊名词,可以去官网查看具体大理石图。

我们来看一下订阅事件信息的种类分为三种:

  1. .Next(value)表示新的事件数据。
  2. .Completed表示事件序列的完结。
  3. .Error同样表示完结,但是代表异常导致的完结。

队列在Rx被描绘为一种push接口。(即是上一节说的push方式,可以称为回调)

1
2
3
4
5
6
7
8
9
10
11
12
13
enum Event<Element> {
case next(Element) // next element of a sequence
case error(Swift.Error) // sequence failed with error
case completed // sequence terminated successfully
}
class Observable<Element> {
func subscribe(_ observer: Observer<Element>) -> Disposable
}
protocol ObserverType {
func on(_ event: Event<Element>)
}

如果观察者队列接受到 2 和 3 ,那么队列将不会接受其他订阅事件。同时内部用来计算队列元素的资源就一起释放掉了–这种理解为自然结束
如果你想关闭队列资源的产生并且立即把队列元素都给释放了,你需要在返回的迭代器方法后调用 dispose 方法。–理解为手动关闭
如果一个队列会在有限时间内结束,即使不调用 dispose 或者使用 addDisposableTo(disposeBag) 也不会引发永久的内存泄漏。然而直到这些资源被使用到完成触发 completed ,队列会一直产生事件元素或者是产生错误。
如果一个队列因此某些原因没有结束,资源将会永久的被分配除非 dispose 被你手动的去调用。
使用 dispose bag 或者 takeUntil操作符 是一种很强大的方式来确保资源被清除,我们建议使用它们即使队列会在有限的时间内结束。

如果你好奇为什么Swift.Error不是泛型的,之后会进行解释。

Disposing

还有一种额外的方式来结束观察队列。当我们正在操作队列时,假如我们要释放所有我们用来计算即将到来元素的资源,我们可以在一个订阅时使用dispose方法。
这里有一个例子:

1
2
3
4
5
6
7
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
.subscribe { event in
print(event)
}
Thread.sleep(forTimeInterval: 2.0)
subscription.dispose()

注意一下你其实不应该手动调用dispose方法,这只是一个演示的例子。手动调用dispose通常错误的代码习惯。更好的方式应该是将订阅回收掉,我们可以使用 DisposeBag, takeUntil或者其他的技巧
如果代码再调用dispose后还会打印东西吗?答案是不好说:

  1. 如果当前是在串行线程队列,在Rx中叫做调度者,比如说主线程调度者,那么答案是不会。
  2. 如果在并行线程队列中,那么答案是仍然会。
    因为结果会是一个产生元素的线程和一个回收订阅的线程会并行执行。
    以防那种情况出现我们需要这样写:
1
2
3
4
5
6
7
8
9
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
.observeOn(MainScheduler.instance)
.subscribe { event in
print(event)
}
// ....
subscription.dispose() // called from main thread

这样就防止了在 dispose 调用后,什么都不会打印了。

Dispose Bag

Dispose Bag被用于返回类似自动引用计数的行为对于RX来说。先把分配的资源统一丢到袋子里 (有点像是 autoreleasepool ) ,然后当 disposeBag 销毁的时候就一起销毁这些资源。在代码里可以看到,只要有 subscribe 的基本在最后都会兜上一个 .addDisposableTo(disposeBag) 用来处理资源自动销毁的问题。
当DisposeBag被析构时,它会对每一个增加进来的可回收对象调用 dispose。
它没有dispose 方法,因此它不允许去显式地去调用dispose方法。如果有必要马上清理资源,我们只需要创建一个新的bag。

1
self.disposeBag = DisposeBag()

它会清理旧的引用然后引起资源回收。
如果需要显式的手动清理资源,那么需要使用 CompositeDisposable 。

Take until

另外一个方法去自动清理订阅是使用takeUntil操作符。

1
2
3
4
5
sequence
.takeUntil(self.rx.deallocated)
.subscribe {
print($0)
}

隐式 Observable 防卫

无论哪一个线程生成元素,如果生成元素并传给观察者队列,通过调用 observer.on(.next(nextElement)) 那么直到这个 observe.on这个操作完成,队列将不会传入下一个元素,即线程安全。
如果 .next 元素没有完成操作,那么是不会调用 .completed 或者 .error

1
2
3
4
5
6
someObservable
.subscribe { (e: Event<Element>) in
print("Event processing started")
// processing
print("Event processing ended")
}

它将总是打印:

1
2
3
4
5
6
Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended

而不会打印出:

1
2
3
4
Event processing started
Event processing started
Event processing ended
Event processing ended

创建你自己的观察队列

如果你仅进创建一个方法返回Observable队列,其实队列是不是生成的,也不会任何附带效果。Observable仅仅是定义队列如何生成和哪些参数用于生成元素。所以队列生成只有当 subscribe迭代方法调用时 才会开始执行。

1
2
3
4
5
6
7
8
9
10
func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")
// no requests are performed, no work is being done, no URL requests were fired
let cancel = searchForMe
// sequence generation starts now, URL requests are fired
.subscribe(onNext: { results in
print(results)
})

有很多方式来创造自己的队列,也许最方便的函数是使用 create 函数。
现在我们写一个函数,它会生成队列并且返回一个订阅元素,这个函数叫做 just 。

1
2
3
4
5
6
7
8
9
10
11
12
func myJust<E>(element: E) -> Observable<E> {
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust(0)
.subscribe(onNext: { n in
print(n)
})

它会打印 0
create函数只是用来帮助更轻松去用闭包来实现 subscribe 方法。比如 subscribe 方法它只有一个参数,observer,然后返回一个可回收对象。
这种方式产生的队列是同步的,他将会在 subscribe 调用并返回可回收的订阅前 生成元素并且终止。
当生成同步队列时,通常的可回收对象会是一个单例– NopDisposable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func myFrom<E>(sequence: [E]) -> Observable<E> {
return Observable.create { observer in
for element in sequence {
observer.on(.next(element))
}
observer.on(.completed)
return Disposables.create()
}
}
let stringCounter = myFrom(["first", "second"])
print("Started ----")
// first time
stringCounter
.subscribe(onNext: { n in
print(n)
})
print("----")
// again
stringCounter
.subscribe(onNext: { n in
print(n)
})
print("Ended ----")

它会打印

1
2
3
4
5
6
7
Started ----
first
second
----
first
second
Ended ----

共享订阅和 shareReplay 方法

如果你想让多个观察者共享同一个订阅里的事件呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
let counter = myInterval(0.1)
.shareReplay(1)
print("Started ----")
let subscription1 = counter
.subscribe(onNext: { n in
print("First \(n)")
})
let subscription2 = counter
.subscribe(onNext: { n in
print("Second \(n)")
})
Thread.sleep(forTimeInterval: 0.5)
subscription1.dispose()
Thread.sleep(forTimeInterval: 0.5)
subscription2.dispose()
print("Ended ----")

这样会打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

单元

单元是RxSwift中非常有用的概念,现在我们来系统的了解一下。
Swift是一个强类型语言,它可以很好的提升代码安全和稳定性,配合Rx就更简单了。
单元在 RxCocoa 被用到。
为什么要称为单元?在文档中原因被描述为其设计思想等同对物理单元概念
如果说 50m 是 数字+单位 ,那么我们的观察队列也是一样,只是队列还不够,队里只是一个值,它需要单位来表示意义。
数字有操作符 +, -, *, /.
而观察列表有操作符 map, filter, flatMap …
比如11 m / 0.5 s = …这个操作

  1. 首先把单位转化为纯数字然后做除法 11 / 0.5 = 22
  2. 然后计算单位 m / s
  3. 最后把结果组合起来 22 m / s
    而在Rx单位中,比如下面这句

    1
    2
    let d: Driver<Int> = Driver.just(11)
    driver.map { $0 / 0.5 } = ...
  4. 首先把Driver转化为一个观察队列(即物理单位映射的纯值),然后用map作用

    1
    let mapped = driver.asObservable().map { $0 / 0.5 }
  5. 将单位和组合起来就是结果

    1
    let result = Driver(mapped)

    (m, kg, s, A, K, cd, mol)这些物理单位都是这个正交的。
    而在RxCocoa许多观察者队列的属性也是正交的。

    • Can’t error out
  • Observe on main scheduler
  • Subscribe on main scheduler
  • Sharing side effects

物理学中派生单位有时会有一些特殊的名称,比如

1
2
3
N (Newton) = kg * m / s / s
C (Coulomb) = A * s
T (Tesla) = kg / A / s / s

而在Rx中派生单位也会有一些特殊的名称,比如

1
2
Driver = (can't error out) * (observe on main scheduler) * (sharing side effects)
ControlProperty = (sharing side effects) * (subscribe on main scheduler)

在物理学上对不同单位的转换是通过*和/来对数字进行变化。
而Rx中不同单位的转换则是依靠观察队列的方法来达到的。
比如:

1
2
3
4
Can't error out = catchError
Observe on main scheduler = observeOn(MainScheduler.instance)
Subscribe on main scheduler = subscribeOn(MainScheduler.instance)
Sharing side effects = share* (one of the `share` operators)

Driver单位

特点:

  1. 不抛出异常
  2. 在主线程中观察
  3. 共享效应(shareReplayLatestWhileConnected)

它是一个精密设计的单位,它的目的适用于提供简单的方式来对UI层使用响应式编程。
那么为什么它要被叫做司机呢?
它的设计目的是通过将序列模型化来驱动程序。
比如: 1. 通过CoreData来驱动UI层

  1. 通过其他UI元素的值绑定来驱动UI层
    就像操作系统的驱动一样,当一个队列出问题时,应用程序将会停止用户继续输入。
    而且有一点也很重要,UI元素和应用逻辑应该在主线程中被执行,因为它们通常不是现场安全的。
    另外用Driver打造的观察者队列是具有share语义的。
    比如说:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
    fetchAutoCompleteItems(query)
    }
    results
    .map { "\($0.count)" }
    .bindTo(resultCount.rx.text)
    .addDisposableTo(disposeBag)
    results
    .bindTo(resultsTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
    cell.textLabel?.text = "\(result)"
    }
    .addDisposableTo(disposeBag)

这段代码的目的:

  1. 调节用户输入频率
  2. 与服务端联系然后拽取下来一段最新的用户数据
  3. 返回结果绑定在两个UI元素上,label和tableview将会显示一系列的结果
    但是这段代码会有什么问题呢?
    如果fetchAutoCompleteItems队列发生错误,比如链接失败,它将会抛出异常,然后解绑UI元素,也不会在接受其他请求。
    如果fetchAutoCompleteItems在后台线程中返回结果,结果绑定到UI元素,将会发生致命的崩溃。
    如果结果绑定2个UI元素,就意味着同一个用户请求将会产生两个HTTp请求,这显然不合适
    更合理的版本应该是这样:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
    fetchAutoCompleteItems(query)
    .observeOn(MainScheduler.instance) // results are returned on MainScheduler
    .catchErrorJustReturn([]) // in the worst case, errors are handled
    }
    .shareReplay(1) // HTTP requests are shared and results replayed
    // to all UI elements
    results
    .map { "\($0.count)" }
    .bindTo(resultCount.rx.text)
    .addDisposableTo(disposeBag)
    results
    .bindTo(resultTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
    cell.textLabel?.text = "\(result)"
    }
    .addDisposableTo(disposeBag)

要确保这些需求可以在大型系统被正确处理是很有挑战性的,现在用简单的方式来使用编译器和单元来保证这些需求被满足。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let results = query.rx.text.asDriver() // This converts a normal sequence into a `Driver` sequence.
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.asDriver(onErrorJustReturn: []) // Builder just needs info about what to return in case of error.
}
results
.map { "\($0.count)" }
.drive(resultCount.rx.text) // If there is a `drive` method available instead of `bindTo`,
.addDisposableTo(disposeBag) // that means that the compiler has proven that all properties
// are satisfied.
results
.drive(resultTableView.rx.items(cellIdentifier: "Cell")) { (_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.addDisposableTo(disposeBag)

代码看上去很清爽,背后发生了什么呢?
第一个asDriver方法将 ControlProperty单元转化为了 Driver单元。
Driver拥有 ControlProperty 的所有属性。
最后如果使用Driver,那么可以将 bind 用 driver来替代。

热观察和冷观察

观察队列什么时候开始输送队列观察结果呢?这取决于被观察者。如果是热观察队列一旦被创建,它就可以传输对象,任何后来订阅它的观察者也可以从队列中间开始观察。而冷观察队列它会一直等待直到观察者订阅它,它才开始传输项目,所以这样的观察可以保证从偷到尾的。
热观察:

  1. 是观察队列
  2. 无论有没有观察者订阅,它都会产生资源
  3. 变量/属性/常量,轻触位置,鼠标位置,UI控制属性,当前时间
  4. 经常包含 ~ N元素
  5. 队列元素被生产无论有没有观察者订阅
  6. 队列计算资源会经常被其他所有观察者分享
  7. 是有状态的
    冷观察:
  8. 是观察队列
  9. 不使用资源直到被观察者订阅
  10. 异步操作,HTTP链接,TCP链接,流
  11. 只包含一个元素
  12. 只会在有订阅时队列元素被生产
  13. 队列计算资源会被每一个观察者单独使用
  14. 是有状态的

Rx背后的数学问题

在观察者模式和生产者模式间之间拥有一定联系。它可以帮助我们从异步回调的世界过渡到同步队列变换的世界。
简单来说,枚举器和观察者模式都是队列,枚举器要定义队列的原因很简单,但观察者却要稍微复杂一些。
有一个简单的例子,你不需要懂数学知识。假设你在观察你鼠标的位置,过了一会到时间了,这些鼠标的位置会一个序列,很显然,这就是一个观察序列。
有两种简单的方式队列:

  1. Push – 观察者()
  2. Pull – Iterator / Enumerator / Generator

Variables

Variables 相当于被观察者的状态。不包含值就无法初始化。
变量V 会包裹住 Subject。特别的是它是一个 BehaviorSubject,但不像 BehaviorSubject,它只暴露value接口,因此 变量不会终结和失败。
它可以在订阅中广播它的当前值。
变量析构之后,它将会完成 .asObservable(). 的被观察队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
let variable = Variable(0)
print("Before first subscription ---")
_ = variable.asObservable()
.subscribe(onNext: { n in
print("First \(n)")
}, onCompleted: {
print("Completed 1")
})
print("Before send 1")
variable.value = 1
print("Before second subscription ---")
_ = variable.asObservable()
.subscribe(onNext: { n in
print("Second \(n)")
}, onCompleted: {
print("Completed 2")
})
variable.value = 2
print("End ---")

将会打印:

1
2
3
4
5
6
7
8
9
10
11
Before first subscription ---
First 0
Before send 1
First 1
Before second subscription ---
Second 1
First 2
Second 2
End ---
Completed 1
Completed 2

KVO

KVO是OC中的机制,这种机制不是类型安全的。但是却能解决很多问题。
在这个库中有两种方式来支持KVO

1
2
3
4
5
6
7
8
9
10
11
// KVO
extension Reactive where Base: NSObject {
public func observe<E>(type: E.Type, _ keyPath: String, options: NSKeyValueObservingOptions, retainSelf: Bool = true) -> Observable<E?> {}
}
#if !DISABLE_SWIZZLING
// KVO
extension Reactive where Base: NSObject {
public func observeWeakly<E>(type: E.Type, _ keyPath: String, options: NSKeyValueObservingOptions) -> Observable<E?> {}
}
#endif

下面是代码示例:

1
2
3
4
5
view
.rx.observe(CGRect.self, "frame")
.subscribe(onNext: { frame in
...
})

或者

1
2
3
4
5
view
.rx.observeWeakly(CGRect.self, "frame")
.subscribe(onNext: { frame in
...
})

rx.observe

rx.observe 是对KVO机制的包装,但却有场景使用的限制

  1. 它可以被用于观察从自身或者自身父节点开始的路径(retainSelf = false)
  2. 它可以用于观察子节点的路径(retainSelf = true)
  3. 路径必须依靠强属性来维持,而且在析构后如果不注销KVO观察者就会产生系统崩溃。
    例如:
    self.rx.observe(CGRect.self, “view.frame”, retainSelf: false)

rx.observeWeakly

相比之下它就要略慢一些了,如果有弱引用,必须处理对象析构。
rx.observe使用的场景它都可以使用,而且还有额外场景

  1. 因为它不持有观察对象,它可以被用于观察任何未知关系的对象。
  2. 它可以被用于观察弱属性
    比如:
    someSuspiciousViewController.rx.observeWeakly(Bool.self, “behavingOk”)

Subject

Subject可以看做是一种代理和桥梁。它既是订阅者又是订阅源,这意味着它既可以订阅其他Observable对象,同时又可以对它的订阅者们发送事件。

PublishSubject

PublishSubject会发送订阅者从订阅之后的事件序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
example("PublishSubject") {
let subject = PublishSubject<String>()
writeSequenceToConsole("1", sequence: subject)
subject.on(.Next("a"))
subject.on(.Next("b"))
writeSequenceToConsole("2", sequence: subject)
subject.on(.Next("c"))
subject.on(.Next("d"))
}
--- PublishSubject example ---
Subscription: 1, event: Next(a)
Subscription: 1, event: Next(b)
Subscription: 1, event: Next(c)
Subscription: 2, event: Next(c)
Subscription: 1, event: Next(d)
Subscription: 2, event: Next(d)

ReplaySubject

ReplaySubject在新的订阅对象订阅的时候会补发所有已经发送过的数据列,bufferSize是缓冲区的大小,决定了补发队列的最大值。如果bufferSize是1,那么新的订阅者出现的时候就会补发上一个事件,如果是2,则补两个,以此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
example("ReplaySubject") {
let subject = ReplaySubject<String>.create(bufferSize: 1)
writeSequenceToConsole("1", sequence: subject)
subject.on(.Next("a"))
subject.on(.Next("b"))
writeSequenceToConsole("2", sequence: subject)
subject.on(.Next("c"))
subject.on(.Next("d"))
}
--- ReplaySubject example ---
Subscription: 1, event: Next(a)
Subscription: 1, event: Next(b)
Subscription: 2, event: Next(b) // 补了一个 b
Subscription: 1, event: Next(c)
Subscription: 2, event: Next(c)
Subscription: 1, event: Next(d)
Subscription: 2, event: Next(d)

BehaviorSubject

BehaviorSubject在新的订阅对象订阅的时候会发送最近发送的事件,如果没有则发送一个默认值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
example("BehaviorSubject") {
let subject = BehaviorSubject(value: "z")
writeSequenceToConsole("1", sequence: subject)
subject.on(.Next("a"))
subject.on(.Next("b"))
writeSequenceToConsole("2", sequence: subject)
subject.on(.Next("c"))
subject.on(.Completed)
}
--- BehaviorSubject example ---
Subscription: 1, event: Next(z)
Subscription: 1, event: Next(a)
Subscription: 1, event: Next(b)
Subscription: 2, event: Next(b)
Subscription: 1, event: Next(c)
Subscription: 2, event: Next(c)
Subscription: 1, event: Completed
Subscription: 2, event: Completed

Operator

之前提到过,相当与物理体系的 + - * /

map

map就是对每个元素都用函数做一次转换,挨个映射一遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
example("map") {
let originalSequence = sequenceOf(1,2,3)
originalSequence
.map { $0 * 2 }
.subscribe { print($0) }
}
--- map example ---
Next(2)
Next(4)
Next(6)
Completed

flatMap

在 Rx 中,flatMap可以把一个序列转换成一组序列,然后再把这一组序列『拍扁』成一个序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
example("flatMap") {
let sequenceInt = sequenceOf(1, 2, 3)
let sequenceString = sequenceOf("A", "B", "--")
sequenceInt
.flatMap { int in
sequenceString
}
.subscribe {
print($0)
}
}
--- flatMap example ---
Next(A)
Next(B)
Next(--)
Next(A)
Next(B)
Next(--)
Next(A)
Next(B)
Next(--)
Completed

scan

scan 有点像 reduce ,它会把每次的运算结果累积起来,作为下一次运算的输入值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
example("scan") {
let sequenceToSum = sequenceOf(0, 1, 2, 3, 4, 5)
sequenceToSum
.scan(0) { acum, elem in
acum + elem
}
.subscribe {
print($0)
}
}
--- scan example ---
Next(0)
Next(1)
Next(3)
Next(6)
Next(10)
Next(15)
Completed

filter

filter除了上面的各种转换,我们还可以对序列进行过滤。filter只会让符合条件的元素通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
example("filter") {
let subscription = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.filter {
$0 % 2 == 0
}
.subscribe {
print($0)
}
}
--- filter example ---
Next(0)
Next(2)
Next(4)
Next(6)
Next(8)
Completed

distinctUntilChanged

distinctUntilChanged 会废弃掉重复的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
example("distinctUntilChanged") {
let subscription = sequenceOf(1, 2, 3, 1, 1, 4)
.distinctUntilChanged()
.subscribe {
print($0)
}
}
--- distinctUntilChanged example ---
Next(1)
Next(2)
Next(3)
Next(1)
Next(4)
Completed

take

take只获取序列中的前 n 个事件,在满足数量之后会自动.Completed。

1
2
3
4
5
6
7
8
9
10
11
12
13
example("take") {
let subscription = sequenceOf(1, 2, 3, 4, 5, 6)
.take(3)
.subscribe {
print($0)
}
}
--- take example ---
Next(1)
Next(2)
Next(3)
Completed

Combining

这部分是关于序列的运算,可以将多个序列源进行组合拼装成一个新的事件序列。

startWith

startWith 会在队列开始之前插入一个事件元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
example("startWith") {
let subscription = sequenceOf(4, 5, 6)
.startWith(3)
.subscribe {
print($0)
}
}
--- startWith example ---
Next(3)
Next(4)
Next(5)
Next(6)
Completed

combineLatest

如果存在两条事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
example("combineLatest 1") {
let intOb1 = PublishSubject<String>()
let intOb2 = PublishSubject<Int>()
combineLatest(intOb1, intOb2) {
"\($0) \($1)"
}
.subscribe {
print($0)
}
intOb1.on(.Next("A"))
intOb2.on(.Next(1))
intOb1.on(.Next("B"))
intOb2.on(.Next(2))
}
--- combineLatest 1 example ---
Next(A 1)
Next(B 1)
Next(B 2)

zip

zip 人如其名,就是压缩两条队列用的,不过它会等到两个队列的元素一一对应地凑齐了之后再合并。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
example("zip 1") {
let intOb1 = PublishSubject<String>()
let intOb2 = PublishSubject<Int>()
zip(intOb1, intOb2) {
"\($0) \($1)"
}
.subscribe {
print($0)
}
intOb1.on(.Next("A"))
intOb2.on(.Next(1))
intOb1.on(.Next("B"))
intOb1.on(.Next("C"))
intOb2.on(.Next(2))
}
--- zip 1 example ---
Next(A 1)
Next(B 2)

merge

merge就是 merge 啦,把两个队列按照顺序组合在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
example("merge 1") {
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
sequenceOf(subject1, subject2)
.merge()
.subscribeNext { int in
print(int)
}
subject1.on(.Next(1))
subject1.on(.Next(2))
subject2.on(.Next(3))
subject1.on(.Next(4))
subject2.on(.Next(5))
}
--- merge 1 example ---
1
2
3
4
5

switch

当你的事件序列是一个事件序列的序列 (Observable>) 的时候,(可以理解成二维序列?),可以使用switch将序列的序列平铺成一维,并且在出现新的序列的时候,自动切换到最新的那个序列上。和merge相似的是,它也是起到了将多个序列『拍平』成一条序列的作用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
example("switchLatest") {
let var1 = Variable(0)
let var2 = Variable(200)
// var3 is like an Observable<Observable<Int>>
let var3 = Variable(var1)
let d = var3
.switchLatest()
.subscribe {
print($0)
}
var1.value = 1
var1.value = 2
var1.value = 3
var1.value = 4
var3.value = var2
var2.value = 201
var1.value = 5
var3.value = var1
var2.value = 202
var1.value = 6
}
--- switchLatest example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(4)
Next(200)
Next(201)
Next(5)
Next(6)

catchError

catchError可以捕获异常事件,并且在后面无缝接上另一段事件序列,丝毫没有异常的痕迹。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
example("catchError 1") {
let sequenceThatFails = PublishSubject<Int>()
let recoverySequence = sequenceOf(100, 200)
sequenceThatFails
.catchError { error in
return recoverySequence
}
.subscribe {
print($0)
}
sequenceThatFails.on(.Next(1))
sequenceThatFails.on(.Next(2))
sequenceThatFails.on(.Error(NSError(domain: "Test", code: 0, userInfo: nil)))
}
--- catchError 1 example ---
Next(1)
Next(2)
Next(100)
Next(200)
Completed

retry

retry顾名思义,就是在出现异常的时候会再去从头订阅事件序列,妄图通过『从头再来』解决异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
example("retry") {
var count = 1 // bad practice, only for example purposes
let funnyLookingSequence: Observable<Int> = create { observer in
let error = NSError(domain: "Test", code: 0, userInfo: nil)
observer.on(.Next(0))
observer.on(.Next(1))
if count < 2 {
observer.on(.Error(error))
count++
}
observer.on(.Next(2))
observer.on(.Completed)
return NopDisposable.instance
}
funnyLookingSequence
.retry()
.subscribe {
print($0)
}
}
--- retry example ---
Next(0)
Next(1)
Next(0)
Next(1)
Next(2)
Completed

Utility

subscribe

subscribeNext

doOn

doOn 可以监听事件,并且在事件发生之前调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
example("doOn") {
let sequenceOfInts = PublishSubject<Int>()
sequenceOfInts
.doOn {
print("Intercepted event \($0)")
}
.subscribe {
print($0)
}
sequenceOfInts.on(.Next(1))
sequenceOfInts.on(.Completed)
}
--- doOn example ---
Intercepted event Next(1)
Next(1)
Intercepted event Completed
Completed

Conditional

takeUntil

takeUntil其实就是take,它会在终于等到那个事件之后触发.Completed事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
example("takeUntil") {
let originalSequence = PublishSubject<Int>()
let whenThisSendsNextWorldStops = PublishSubject<Int>()
originalSequence
.takeUntil(whenThisSendsNextWorldStops)
.subscribe {
print($0)
}
originalSequence.on(.Next(1))
originalSequence.on(.Next(2))
whenThisSendsNextWorldStops.on(.Next(1))
originalSequence.on(.Next(3))
}
--- takeUntil example ---
Next(1)
Next(2)
Completed

takeWhile

takeWhile则是可以通过状态语句判断是否继续take。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
example("takeWhile") {
let sequence = PublishSubject<Int>()
sequence
.takeWhile { int in
int < 2
}
.subscribe {
print($0)
}
sequence.on(.Next(1))
sequence.on(.Next(2))
sequence.on(.Next(3))
}
--- takeWhile example ---
Next(1)
Completed

Aggregate

我们可以对事件序列做一些集合运算。

concat

concat可以把多个事件序列合并起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
example("concat") {
let var1 = BehaviorSubject(value: 0)
let var2 = BehaviorSubject(value: 200)
// var3 is like an Observable<Observable<Int>>
let var3 = BehaviorSubject(value: var1)
let d = var3
.concat()
.subscribe {
print($0)
}
var1.on(.Next(1))
var1.on(.Next(2))
var3.on(.Next(var2))
var2.on(.Next(201))
var1.on(.Next(3))
var1.on(.Completed)
var2.on(.Next(202))
}
--- concat example ---
Next(0)
Next(1)
Next(2)
Next(3)
Next(201)
Next(202)

reduce

这里的reduce和CollectionType中的reduce是一个意思,都是指通过对一系列数据的运算最后生成一个结果。

1
2
3
4
5
6
7
8
9
10
11
example("reduce") {
sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
.reduce(0, +)
.subscribe {
print($0)
}
}
--- reduce example ---
Next(45)
Completed