Observer B get value: 2
Observer A complete!
Observer B complete!
通过自定义 Subject,我们实现了前面提到的功能。接下来我们进入正题 - RxJS Subject。
RxJS Subject
首先我们通过 RxJS Subject 来重写一下上面的示例:
const interval$ = Rx.Observable.interval(1000).take(3);
let subject = new Rx.Subject();
let observerA = {
next: value => console.log('Observer A get value: ' + value),
error: error => console.log('Observer A error: ' + error),
complete: () => console.log('Observer A complete!')
};
var observerB = {
next: value => console.log('Observer B get value: ' + value),
error: error => console.log('Observer B error: ' + error),
complete: () => console.log('Observer B complete!')
};
subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
subject.subscribe(observerB); // 添加观察者B
}, 1000);
RxJS Subject 源码片段
/**
* Suject继承于Observable
*/
export class Subject extends Observable {
constructor() {
super();
this.observers = []; // 观察者列表
this.closed = false;
this.isStopped = false;
this.hasError = false;
this.thrownError = null;
}
next(value) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) { // 循环调用观察者next方法,通知观察者
copy[i].next(value);
}
}
}
error(err) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
this.hasError = true;
this.thrownError = err;
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) { // 循环调用观察者error方法
copy[i].error(err);
}
this.observers.length = 0;
}
complete() {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) { // 循环调用观察者complete方法
copy[i].complete();
}
this.observers.length = 0; // 清空内部观察者列表
}
}
通过 RxJS Subject 示例和源码片段,对于 Subject 我们可以得出以下结论:
- Subject 既是 Observable 对象,又是 Observer 对象
- 当有新消息时,Subject 会对内部的 observers 列表进行组播 (multicast)
Angular 2 RxJS Subject 应用
在 Angular 2 中,我们可以利用 RxJS Subject 来实现组件通信,具体示例如下:
