[Abp vNext 源码分析] - 12. 后台作业与后台工作者

基于的 ABP vNext 版本:1.0.0

创作日期:2019 年 10 月 24 日晚

更新日期:暂无

ABP vNext 提供了后台工作者后台作业的支持,基本实现与原来的 ABP 框架类似,并且 ABP vNext 还提供了对 HangFire 和 RabbitMQ 的后台作业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不需要做其他复杂的配置。

后台作业在系统开发的过程当中,是比较常用的功能。因为总是有一些长耗时的任务,而这些任务我们不是立即响应的,例如 Excel 文档导入、批量发送短信通知等。

后台工作者 的话,ABP vNext 的实现就是在 CLR 的 Timer 之上封装了一层,周期性地执行用户逻辑。ABP vNext 默认提供的 后台任务管理器,就是在后台工作者基础之上进行的封装。

涉及到后台任务、后台工作者的模块一共有 6 个,它们分别是:

Volo.Abp.Threading :提供了一些常用的线程组件,其中 AbpTimer 就是在里面实现的。

Volo.Abp.BackgroundWorkers :后台工作者的定义和实现。

Volo.Abp.BackgroundJobs.Abstractions :后台任务的一些共有定义。

Volo.Abp.BackgroundJobs :默认的后台任务管理器实现。

Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 库实现的后台任务管理器。

Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 实现的后台任务管理器。

二、源码分析 2.1 线程组件 2.1.1 健壮的计时器

CLR 为我们提供了多种计时器,我们一般使用的是 System.Threading.Timer ,它是基于 CLR 线程池的一个周期计时器,会根据我们配置的 Period (周期) 定时执行。在 CLR 线程池中,所有的 Timer 只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 Timer 对象到期时,这个线程就会将 Timer 的回调方法通过 ThreadPool.QueueUserWorkItem() 扔到线程池去执行。

不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会造成上一个任务还没执行完成又开始执行新的任务。

解决这个方法其实很简单,即启动之后,将周期设置为 Timeout.Infinite ,这样只会执行一次。当回调方法执行完成之后,就设置 dueTime 参数说明下次执行要等待多久,并且周期还是 Timeout.Infinite。

ABP vNext 已经为我们提供了健壮的计时器,该类型的定义是 AbpTimer ,在内部用到了 volatile 关键字和 Monitor 实现 条件变量模式 解决多线程环境下的问题。

public class AbpTimer : ITransientDependency { // 回调事件。 public event EventHandler Elapsed; // 执行周期。 public int Period { get; set; } // 定时器启动之后就开始运行,默认为 Fasle。 public bool RunOnStart { get; set; } // 日志记录器。 public ILogger<AbpTimer> Logger { get; set; } private readonly Timer _taskTimer; // 定时器是否在执行任务,默认为 false。 private volatile bool _performingTasks; // 定时器的运行状态,默认为 false。 private volatile bool _isRunning; public AbpTimer() { Logger = NullLogger<AbpTimer>.Instance; // 回调函数是 TimerCallBack,执行周期为永不执行。 _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite); } public void Start(CancellationToken cancellationToken = default) { // 如果传递的周期小于等于 0 ,则抛出异常。 if (Period <= 0) { throw new AbpException("Period should be set before starting the timer!"); } // 使用互斥锁,保证线程安全。 lock (_taskTimer) { // 如果启动之后就需要马上执行,则设置为 0,马上执行任务,否则会等待 Period 毫秒之后再执行(1 个周期)。 _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); // 定时器成功运行了。 _isRunning = true; } // 释放 _taskTimer 的互斥锁。 } public void Stop(CancellationToken cancellationToken = default) { // 使用互斥锁。 lock (_taskTimer) { // 将内部定时器设置为永不执行的状态。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 检测当前是否还有正在执行的任务,如果有则等待任务执行完成。 while (_performingTasks) { // 临时释放锁,阻塞当前线程。但是其他线程可以获取 _timer 的互斥锁。 Monitor.Wait(_taskTimer); } // 需要表示停止状态,所以标记状态为 false。 _isRunning = false; } } private void TimerCallBack(object state) { lock (_taskTimer) { // 如果有任务正在运行,或者内部定时器已经停止了,则不做任何事情。 if (!_isRunning || _performingTasks) { return; } // 临时停止内部定时器。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 表明马上需要执行任务了。 _performingTasks = true; } try { // 调用绑定的事件。 Elapsed.InvokeSafely(this, new EventArgs()); } catch { // 注意,这里将会吞噬异常。 } finally { lock (_taskTimer) { // 任务执行完成,更改状态。 _performingTasks = false; // 如果定时器还在运行,没有被停止,则启动下一个 Period 周期。 if (_isRunning) { _taskTimer.Change(Period, Timeout.Infinite); } // 解除因为释放锁而阻塞的线程。 // 如果已经调用了 Stop,则会唤醒那个因为 Wait 阻塞的线程,就会使 _isRunning 置为 false。 Monitor.Pulse(_taskTimer); } } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfpgd.html