const interval$ = Rx.Observable.interval(1000).take(3); interval$.subscribe({ next: value => console.log('Observer A get value: ' + value); }); setTimeout(() => { interval$.subscribe({ next: value => console.log('Observer B get value: ' + value); }); }, 1000);
以上代码运行后,控制台的输出结果:
Observer A get value: 0
Observer A get value: 1
Observer B get value: 0
Observer A get value: 2
Observer B get value: 1
Observer B get value: 2
通过以上示例,我们可以得出以下结论:
- Observable 对象可以被重复订阅
- Observable 对象每次被订阅后,都会重新执行
上面的示例,我们可以简单地认为两次调用普通的函数,具体参考以下代码:
function interval() { setInterval(() => console.log('..'), 1000); } interval(); setTimeout(() => { interval(); }, 1000);
Observable 对象的默认行为,适用于大部分场景。但有些时候,我们会希望在第二次订阅的时候,不会从头开始接收 Observable 发出的值,而是从第一次订阅当前正在处理的值开始发送,我们把这种处理方式成为组播 (multicast),那我们要怎么实现呢 ?回想一下我们刚才介绍过观察者模式,你脑海中是不是已经想到方案了。没错,我们可以通过自定义 Subject 来实现上述功能。
自定义 Subject
Subject 类定义
class Subject { constructor() { this.observers = []; } addObserver(observer) { this.observers.push(observer); } next(value) { this.observers.forEach(o => o.next(value)); } error(error){ this.observers.forEach(o => o.error(error)); } complete() { this.observers.forEach(o => o.complete()); } }
使用示例
const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.addObserver(observerA); // 添加观察者A interval$.subscribe(subject); // 订阅interval$对象 setTimeout(() => { subject.addObserver(observerB); // 添加观察者B }, 1000);
以上代码运行后,控制台的输出结果:
Observer A get value: 0
Observer A get value: 1
Observer B get value: 1
Observer A get value: 2