代码较多,这里先展示完整代码;下一段说明之后会给出简化版本
/// Base class for reactive values: seeds the backing field and layers
/// error/map/update/trigger helpers on top of [RxObjectMixin].
abstract class _RxImpl<T> extends RxNotifier<T> with RxObjectMixin<T> {
  /// Writes [initial] straight into the backing field; the `value` setter is
  /// bypassed, so no event is emitted here.
  _RxImpl(T initial) {
    _value = initial;
  }

  /// Forwards [error] (and the optional [stackTrace]) to the underlying subject.
  void addError(Object error, [StackTrace? stackTrace]) {
    subject.addError(error, stackTrace);
  }

  /// Returns [stream] transformed by [mapper].
  Stream<R> map<R>(R mapper(T? data)) => stream.map(mapper);

  /// Calls [fn] with the current stored value, then unconditionally emits the
  /// stored value on the subject (no equality check is performed).
  void update(void fn(T? val)) {
    fn(_value);
    subject.add(_value);
  }

  /// Assigns [v] through the `value` setter and, unless this was the first
  /// rebuild, additionally adds [v] to the subject.
  void trigger(T v) {
    // Snapshot the flag first: the setter below resets it to false.
    var firstRebuild = this.firstRebuild;
    value = v;
    // The setter skips emission when v equals the stored value, so this add
    // still produces an event in that case.
    // NOTE(review): when v differs from the stored value the setter has
    // already emitted, so this looks like a second emission — confirm intent.
    if (!firstRebuild) {
      subject.add(v);
    }
  }
}

/// Mixin application: [RxInterface] combined with [NotifyManager].
class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;

/// Owns a [GetStream] subject plus the subscriptions made on other streams.
mixin NotifyManager<T> {
  /// Stream on which this object's events are published.
  GetStream<T> subject = GetStream<T>();

  /// Subscriptions grouped by the stream they were created on.
  final _subscriptions = <GetStream, List<StreamSubscription>>{};

  /// True once at least one stream has been registered in [_subscriptions].
  bool get canUpdate => _subscriptions.isNotEmpty;

  /// Subscribes to [rxGetx] (only once per stream) and relays every event it
  /// emits into [subject].
  void addListener(GetStream<T> rxGetx) {
    if (!_subscriptions.containsKey(rxGetx)) {
      final subs = rxGetx.listen((data) {
        // Relay only while our own subject is still open.
        if (!subject.isClosed) subject.add(data);
      });
      final listSubscriptions = _subscriptions[rxGetx] ??= <StreamSubscription>[];
      listSubscriptions.add(subs);
    }
  }

  /// Listens on [subject]; [cancelOnError] defaults to `false` when omitted.
  StreamSubscription<T> listen(
    void Function(T) onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) => subject.listen(
    onData,
    onError: onError,
    onDone: onDone,
    cancelOnError: cancelOnError ?? false,
  );

  /// Cancels every stored subscription, clears the map and closes [subject].
  void close() {
    // Note: the callback parameter `_subscriptions` shadows the field of the
    // same name; inside the closure it is the per-stream list.
    _subscriptions.forEach((getStream, _subscriptions) {
      for (final subscription in _subscriptions) {
        subscription.cancel();
      }
    });
    _subscriptions.clear();
    subject.close();
  }
}

/// Value holder whose setter publishes changes and whose getter registers the
/// subject with the current [RxInterface.proxy], if there is one.
mixin RxObjectMixin<T> on NotifyManager<T> {
  /// Backing field for `value`.
  late T _value;

  /// Re-emits the current value on [subject].
  void refresh() {
    subject.add(value);
  }

  /// Callable shorthand: assigns [v] when it is non-null, then returns the
  /// current value. A null argument therefore never overwrites the value.
  T call([T? v]) {
    if (v != null) {
      value = v;
    }
    return value;
  }

  /// While true, the setter emits even if the new value equals the old one.
  bool firstRebuild = true;

  /// String form of the current value.
  String get string => value.toString();

  @override
  String toString() => value.toString();

  /// Returns the raw value.
  dynamic toJson() => value;

  /// Equal to a plain `T` with the same value, or to another
  /// [RxObjectMixin] of the same type holding an equal value.
  @override
  bool operator ==(dynamic o) {
    if (o is T) return value == o;
    if (o is RxObjectMixin<T>) return value == o.value;
    return false;
  }

  @override
  int get hashCode => _value.hashCode;

  /// Stores [val] and emits it on [subject].
  ///
  /// Does nothing when the subject is closed, or when [val] equals the stored
  /// value and this is not the first rebuild.
  set value(T val) {
    if (subject.isClosed) return;
    if (_value == val && !firstRebuild) return;
    firstRebuild = false;
    _value = val;
    subject.add(_value);
  }

  /// Returns the stored value; when [RxInterface.proxy] is non-null, first
  /// registers [subject] with it via `addListener`.
  T get value {
    if (RxInterface.proxy != null) {
      RxInterface.proxy!.addListener(subject);
    }
    return _value;
  }

  /// The subject's stream.
  Stream<T?> get stream => subject.stream;

  /// Listens to [stream] and assigns each event to `value`; the subscription
  /// is stored under this object's own [subject] key.
  void bindStream(Stream<T> stream) {
    final listSubscriptions = _subscriptions[subject] ??= <StreamSubscription>[];
    listSubscriptions.add(stream.listen((va) => value = va));
  }
}
简化 _RxImpl,上面内容太多了,我这地方简化下,把需要关注的内容展示出来:此处有几个需要重点关注的点
RxInt是一个内置callback的数据类型(GetStream)
RxInt的value变量改变的时候(set value),会触发subject.add(_value),内部逻辑是自动刷新操作
获取RxInt的value变量的时候(get value),会有一个添加监听的操作,这个非常重要!
/// Simplified excerpt of the reactive base class, keeping only `update`.
abstract class _RxImpl<T> extends RxNotifier<T> with RxObjectMixin<T> {
  /// Calls [fn] with the current stored value, then unconditionally emits the
  /// stored value on the subject.
  void update(void fn(T? val)) {
    fn(_value);
    subject.add(_value);
  }
}

/// Mixin application: [RxInterface] combined with [NotifyManager].
class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;

/// Owns a [GetStream] subject plus the subscriptions made on other streams.
mixin NotifyManager<T> {
  /// Stream on which this object's events are published.
  GetStream<T> subject = GetStream<T>();

  /// Subscriptions grouped by the stream they were created on.
  final _subscriptions = <GetStream, List<StreamSubscription>>{};

  /// True once at least one stream has been registered in [_subscriptions].
  bool get canUpdate => _subscriptions.isNotEmpty;

  /// Subscribes to [rxGetx] (only once per stream) and relays every event it
  /// emits into [subject].
  void addListener(GetStream<T> rxGetx) {
    if (!_subscriptions.containsKey(rxGetx)) {
      final subs = rxGetx.listen((data) {
        // Relay only while our own subject is still open.
        if (!subject.isClosed) subject.add(data);
      });
      final listSubscriptions = _subscriptions[rxGetx] ??= <StreamSubscription>[];
      listSubscriptions.add(subs);
    }
  }
}

/// Value holder whose setter publishes changes and whose getter registers the
/// subject with the current [RxInterface.proxy], if there is one.
mixin RxObjectMixin<T> on NotifyManager<T> {
  /// Backing field for `value`.
  late T _value;

  /// Re-emits the current value on [subject].
  void refresh() {
    subject.add(value);
  }

  /// Stores [val] and emits it on [subject].
  ///
  /// Does nothing when the subject is closed, or when [val] equals the stored
  /// value and `firstRebuild` is false.
  // `firstRebuild` is not declared in this trimmed excerpt.
  set value(T val) {
    if (subject.isClosed) return;
    if (_value == val && !firstRebuild) return;
    firstRebuild = false;
    _value = val;
    subject.add(_value);
  }

  /// Returns the stored value; when [RxInterface.proxy] is non-null, first
  /// registers [subject] with it via `addListener`.
  T get value {
    if (RxInterface.proxy != null) {
      RxInterface.proxy!.addListener(subject);
    }
    return _value;
  }
}
为啥GetStream的add会有刷新操作:删了很多代码,保留了重点代码
调用add方法时候,会调用 _notifyData 方法
_notifyData 方法中,会遍历 _onData 列表,对其中未暂停(!isPaused)的订阅项执行其 _data 回调
我猜,_data 对应的回调里,十有八九在某个地方调用了 setState()
/// Trimmed excerpt of the lightweight stream: keeps a list of subscriptions
/// and invokes their data callbacks synchronously on [add].
// Several members used below (`_isBusy`, `isClosed`, and the fields behind
// the constructor parameters) are not declared in this excerpt.
class GetStream<T> {
  GetStream({this.onListen, this.onPause, this.onResume, this.onCancel});

  /// Registered subscriptions that receive data events.
  List<LightSubscription<T>>? _onData = <LightSubscription<T>>[];

  /// Adds [subs] to [_onData]: immediately when not busy, otherwise after
  /// awaiting `Future.delayed(Duration.zero)`.
  // NOTE(review): presumably the delay avoids mutating the list while
  // _notifyData is iterating over it — confirm against the full source.
  FutureOr<void> addSubscription(LightSubscription<T> subs) async {
    if (!_isBusy!) {
      return _onData!.add(subs);
    } else {
      await Future.delayed(Duration.zero);
      return _onData!.add(subs);
    }
  }

  /// Calls the `_data` callback of every subscription that is not paused,
  /// with `_isBusy` set to true for the duration of the loop.
  void _notifyData(T data) {
    _isBusy = true;
    for (final item in _onData!) {
      if (!item.isPaused) {
        item._data?.call(data);
      }
    }
    _isBusy = false;
  }

  /// Last event passed to [add]; null until the first call.
  T? _value;

  /// The most recently added event.
  T? get value => _value;

  /// Records [event] as the latest value and notifies the subscriptions.
  /// Asserts that the stream is not closed.
  void add(T event) {
    assert(!isClosed, 'You cannot add event to closed Stream');
    _value = event;
    _notifyData(event);
  }
}

/// Signature of a data callback.
typedef OnData<T> = void Function(T data);

/// Subscription holding the data callback that `GetStream._notifyData` calls.
class LightSubscription<T> extends StreamSubscription<T> {
  /// Callback invoked with each event; may be null.
  OnData<T>? _data;
}
图示,先来看下,Rx类具有的功能
get value 添加监听
set value 执行已添加的监听
Obx刷新机制
Obx最大的特殊之处,应该就是使用它的时候,不需要加泛型且能自动刷新,这是怎么做到的呢?
Obx:代码并不多,但是皆有妙用
Obx继承ObxWidget,ObxWidget实际上也是一个StatefulWidget