RxJS v6 学习指南 (11)

BehaviorSubject 需要在实例化时给定一个初始值,如果没有默认是 undefined,每次订阅时都会发出最新的状态,即使已经错过数据的发送时间。

const observerA = { next: x => console.log('Observer A: ' + x) } const observerB = { next: x => console.log('Observer B: ' + x) } const subject = new BehaviorSubject(0) subject.subscribe(observerA) // Observer A: 0 subject.next(1) // Observer A: 1 subject.next(2) // Observer A: 2 subject.next(3) // Observer A: 3 setTimeout(() => { subject.subscribe(observerB) // Observer B: 3 }, 500)

observerB 已经错过流数据的发送时间,但是订阅时也能获取到最新数据 3。

BehaviorSubject 有点类似于状态,一开始可以提供初始状态,之后订阅都可以获取最新的状态。

2)ReplaySubject

ReplaySubject 表示重放,在新的观察者订阅时重新发送原来的数据,可以通过参数指定重放最后几个数据。

const observerA = { next: x => console.log('Observer A: ' + x) } const observerB = { next: x => console.log('Observer B: ' + x) } const subject = new ReplaySubject(2) // 重放最后两个 subject.subscribe(observerA) subject.next(1) // Observer A: 1 subject.next(2) // Observer A: 2 subject.next(3) // Observer A: 3 subject.complete() setTimeout(() => { subject.subscribe(observerB) // Observer B: 2 // Observer B: 3 }, 500)

这里我们可以看到,即使 subject 完结后再去订阅依然可以重放最后两个数据。

ReplaySubject(1) 和前面的 BehaviorSubject 是不一样的,首先后者可以提供默认数据,而前者不行,其次前者在 subject 终结后再去订阅依然可以得到最近发出的数据而后者不行。

3)AsyncSubject

AsyncSubject 有点类似 operator last,会在 subject 完结后送出最后一个值。

const subject = new AsyncSubject() subject.subscribe(observerA) subject.next(1) subject.next(2) subject.next(3) subject.complete() // Observer A: 3 setTimeout(() => { subject.subscribe(observerB) // Observer B: 3 }, 500)

observerA 即使早就订阅了,但是并不会响应前面的 next,完结后才接收到最后一个值 3。

多播操作符

前面我们写的 Subject 需要去订阅源数据流和被观察者订阅,写起来比较繁琐,我们可以借助操作符来实现。

1)multicast

使用方式如下,接收一个 subject 或者 subject factory。这个操作符返回了一个 connectable 的 Observable。等到执行 connect() 才会用真的 subject 订阅 source,并开始发送数据,如果没有 connect,Observable 是不会执行的。

const source = interval(1000).pipe( map(x => Math.floor(Math.random() * 10)), take(3), multicast(new Subject) ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source.subscribe(observerA) // subject.subscribe(observerA) source.connect() // source.subscribe(subject) setTimeout(() => { source.subscribe(observerB) // subject.subscribe(observerB) }, 1000)

2)refCount

上面使用了 multicast,但是还是有些麻烦,还需要去手动 connect。这时我们可以再搭配 refCount 操作符创建只要有订阅就会自动 connect 的 Observable。只需要去掉 connect 方法调用,在 multicast 后面再加一个 refCount 操作符。

multicast(new Subject), refCount()

refCount 其实就是自动计数的意思,当 Observer 数量大于 1 时,subject 订阅上游数据流,减少为 0 时退订上游数据流。

3)multicast selector 参数

multicast 第一个参数除了是一个 subject,还可以是一个 subject factory,即返回 subject 的函数。这时使用了不同的中间人,每个观察者订阅时都重新生产数据,适用于退订了上游之后再次订阅的场景。

multicast 还可以接收可选的第二个参数,称为 selector 参数。它可以使用上游数据流任意多次,而不会重复订阅上游的数据。当使用了这个参数时,multicast 不会返回 connectable Observable,而是这个参数(回调函数)返回的 Observable。selecetor 回调函数有一个参数,通常叫做 shared,即 multicast 第一个参数所代表的 subject 对象。

const selector = shared => { return shared.pipe(concat(of('done'))) } const source = interval(1000).pipe( take(3), multicast(new Subject, selector) ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source.subscribe(observerA) setTimeout(() => { source.subscribe(observerB) }, 5000) // Observer A: 0 // Observer A: 1 // Observer A: 2 // Observer A: done // Observer A completed // Observer B: done // Observer B: completed

observerB 订阅时会调用 selector 函数,subject 即shared 已经完结,但是 concat 依然会在这个 Observable 后面加上 'done'。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdzpj.html