Observer B get value: 2
Observer A complete!
Observer B complete!
通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。
RxJS Subject
首先我们通过 RxJS Subject 来重写一下上面的示例:
const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Rx.Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); // 添加观察者A interval$.subscribe(subject); // 订阅interval$对象 setTimeout(() => { subject.subscribe(observerB); // 添加观察者B }, 1000);
RxJS Subject 源码片段
/** * Suject继承于Observable */ export class Subject extends Observable { constructor() { super(); this.observers = []; // 观察者列表 this.closed = false; this.isStopped = false; this.hasError = false; this.thrownError = null; } next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者 copy[i].next(value); } } } error(err) { if (this.closed) { throw new ObjectUnsubscribedError(); } this.hasError = true; this.thrownError = err; this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者error方法 copy[i].error(err); } this.observers.length = 0; } complete() { if (this.closed) { throw new ObjectUnsubscribedError(); } this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循环调用观察者complete方法 copy[i].complete(); } this.observers.length = 0; // 清空内部观察者列表 } }
通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:
- Subject 既是 Observable 对象,又是 Observer 对象
- 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)
Angular 2 RxJS Subject 应用
在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下: