RxJS v6 学习指南 (12)

可以利用 selector 处理 “三角关系”的数据流,如有一个 tick$ 数据流,对其进行 delay(500) 操作后的下游 delayTick$, 一个由它们合并得到的 mergeTick$,这时就形成了三角关系。delayTick$ 和 mergeTick$ 都订阅了 tick$。

const tick$ = interval(1000).pipe( take(1), tap(x => console.log('source: ' + x)) ) const delayTick$ = tick$.pipe( delay(500) ) const mergeTick$ = merge(tick$, delayTick$).subscribe(x => console.log('observer: ' + x)) // source: 0 // observer: 0 // source: 0 // observer: 0

从上面的结果我们可以验证,tick$ 被订阅了两次。

我们可以使用 selector 函数来使其只订阅一次,将上面的过程移到 selector 函数内即可。

const source$ = interval(1000).pipe( take(1), tap(x => console.log('source: ' + x)) ) const result$ = source$.pipe( multicast(new Subject(), shared => { const tick$ = shared const delayTick$ = tick$.pipe(delay(500)) const mergeTick$ = merge(tick$, delayTick$) return mergeTick$ }) ) result$.subscribe(x => console.log('observer: ' + x))

这时只会输出一次 'source: 0'。

4)publish

publish 是 multicast 的一种简写方式,效果等同于如下:

function publish (selector) { if (selector) { return multicast(() => new Subject(), selector) } else { return multicast(new Subject()) } }

有上一节说到的 selector 函数时,等价于:

multicast(() => new Subject(), selector)

没有时,等价于:

multicast(new Subject())

5)share

share 是 multicast 和 refCount 的简写,share() 等同于在 pipe 中先调用了 multicast(() => new Subject()),再调用了 refCount()。

const source = interval(1000).pipe( take(3), share() ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source.subscribe(observerA) setTimeout(() => { source.subscribe(observerB) }, 5000) // Observer A: 0 // Observer A: 1 // Observer A: 2 // Observer A completed // Observer B: 0 // Observer B: 1 // Observer B: 2 // Observer B completed

由于 share 是调用了 subject 工厂函数,而不是一个 subject 对象,因此 observerB 订阅时可以重新获取数据。

6)publishLast、publishBehavior、publishReplay

同前面的 publish,只不过使用的不是普通 Subject,而是对应的 AsyncSubject、BehaviorSubject、ReplaySubject。

Scheduler

Scheduler(调度器)用于控制数据流中数据的推送节奏。

import { range, asapScheduler } from 'rxjs' const source$ = range(1, 3, asapScheduler) console.log('before subscribe') source$.subscribe(x => console.log(x)) console.log('subscribed')

上面的代码,如果去掉 asapScheduler 参数,因为 range 是同步的,会先输出 1, 2, 3,再输出 'subscribed',但是加了以后就变成 先输出 'subscribed',改变了原来数据产生的方式。asap 是 as soon as possible 的缩写,同步任务完成后就会马上执行。

Scheduler 拥有一个虚拟时钟,如 interval 创建的数据流每隔一段时间要发出数据,由 Scheduler 提供时间来判断是否到了发送数据的时间。

Scheduler 实例

undefined/null:不指定 Scheduler,代表同步执行的 Scheduler

asap:尽快执行的 Scheduler

async:利用 setInterval 实现的 Scheduler

queue:利用队列实现的 Scheduler,用于迭代一个的大的集合的场景。

animationFrame:用于动画的 Scheduler

asap 会尽量使用 micro task,而 async 会使用 macro task。

相关操作符

一些创建数据流的方法可以提供 Scheduler 参数,合并类操作符如 merge 也可以,在创建数据流后我们也可以使用操作符,使得产生的下游 Observable 推送数据的节奏由指定的 Scheduler 来控制。这个操作符就是 observeOn。

const tick$ = interval(10) // Intervals are scheduled with async scheduler by default... tick$.pipe( observeOn(animationFrameScheduler) // but we will observe on animationFrame scheduler to ensure smooth animation. ) .subscribe(val => { someDiv.style.height = val + 'px' })

本来每 10 ms 就会发送一个数据,修改 Scheduler 为 animationFrame 后只有浏览器重绘才会发送数据更新样式。

我们还可以通过操作符 subscribeOn 控制订阅的时机。

const source$ = new Observable(observer => { console.log('on subscribe') observer.next(1) observer.next(2) observer.next(3) return () => { console.log('on unsubscribe') } }) const tweaked$ = source$.pipe(subscribeOn(asapScheduler)) console.log('before subscribe') tweaked$.subscribe(x => console.log(x)) console.log('subscribed') // before subscribe // subscribed // on subscribe // 1 // 2 // 3

通过 subscribeOn(asapScheduler),我们把订阅时间推迟到尽快执行。

TestScheduler

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdzpj.html