RxJS v6 学习指南 (10)

先思考一下下面的例子结果是什么?

const source$ = interval(1000).pipe( take(3) ) source$.subscribe(x => console.log('Observer 1: ' + x)) setTimeout(() => { source$.subscribe(x => console.log('Observer 2: ' + x)) }, 1000)

你可能会以为 Observer 2 一秒后才订阅,错过了数据 0,因此只会输出 1 和 2,但实际上会先输出 0。为什么如此呢?这就涉及到对已错过数据的两种处理策略。

错过的就让它过去,只要订阅之后生产的数据就好

不能错过,订阅之前生产的数据也要

第一种策略类似于直播,第二种和点播相似。使用第一种策略的 Observable 叫做 Cold Observable,因为每次都要重新生产数据,是 “冷”的,需要重新发动。第二种,因为一直在生产数据,只要使用后面的数据就可以了,所以叫 Hot Observable。

RxJS 中如 interval、range 这些方法产生的 Observable 都是 Cold Observable,产生 Hot Observable 的是由 Promise、Event 这些转化而来的 Observable,它们的数据源都在外部,和 Observer 无关。

前面我们提到 Observable 都是 lazy evaluation 的,数据管道内的逻辑只有订阅后才会执行,但是 Cold Observable 相对更 lazy 一些。Cold Observable 如果没有订阅者连数据都不会产生,对于 Hot Observable,数据仍会产生,但是不会进入管道处理。

Hot Observable 是多播,对于 Cold Observable,每次订阅都重新生产了一份数据流,所以不是多播。下面的例子更加明显,两个订阅者有很大的概率会接收到不同的数据。

const source$ = interval(1000).pipe( map(x => Math.floor(Math.random() * 10)), take(3) ) source$.subscribe(x => console.log('Observer 1: ' + x)) setTimeout(() => { source$.subscribe(x => console.log('Observer 2: ' + x)) }, 1000)

如果想要实现多播,就要使用 RxJS 中 Subject。

Subject

为了防止每次订阅都重新生产一份数据流,我们可以使用中间人,让这个中间人去订阅源数据流,观察者都去订阅这个中间人。这个中间人能去订阅数据流,所以是个 Observer,又能被观察者订阅,所以也是 Observable。我们可以自己实现一个这样的中间人:

const subject = { observers: [], subscribe: function (observer) { this.observers.push(observer) }, next: function (value) { this.observers.forEach(o => o.next(value)) }, error: function (error) { this.observers.forEach(o => o.error(error)) }, complete: function () { this.observers.forEach(o => o.complete()) } }

这个 subject 拥有 Observer 的 next、error、complete 方法,每次被观察者订阅时都会在内部保存这个观察者。当接收到源数据流的数据时,会把数据发送给每一个观察者。

const source$ = interval(1000).pipe( map(x => Math.floor(Math.random() * 10)), take(3) ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source$.subscribe(subject) subject.subscribe(observerA) setTimeout(() => { subject.subscribe(observerB) }, 1000)

这时我们发现两个观察者接收到的是同一份数据,ObserverB 由于延迟一秒订阅,所以少接收到一个数据。将我们自己实现的 subject 换成 RxJS 中的 Subject,效果相同:

import { Subject } from 'rxjs' const subject = new Subject()

从上面可以看到,Subject 和 Observable 有一个很大的不同:它内部保存有一个观察者列表。

前面的 subject 是在源数据流发出值时调用 next 方法,向订阅的观察者发送这个值,我们也可以手动调用 subject 的next 方法送出值:

const observerA = { next: x => console.log('Observer A: ' + x) } const observerB = { next: x => console.log('Observer B: ' + x) } const subject = new Subject() subject.subscribe(observerA) setTimeout(() => { subject.subscribe(observerB) }, 500) subject.next(1) setTimeout(() => { subject.next(2) }, 1000)

总结一下,Subject 既是 Observable 又是 Observer,它会对内部的 observers 清单进行组播(multicast)。

Subject 的错误处理

在 RxJS 5 中,如果 Subject 的某个下游数据流产生了错误异常,而又没有被 Observer 处理,那这个 Subject 的其他 Observer 都会失败。但是在 RxJS 6 中不会如此。

在 v6 的这个例子 中,ObserverA 没有对错误进行处理,但是并不影响 ObserverB,而在 v5 这个demo中因为 ObserverA 没有对错误进行处理,使得 ObserverB 终止了。很明显 v6 的这种处理更符合直觉。

BehaviorSubject、ReplaySubject、AsyncSubject

1)BehaviorSubject

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdzpj.html