[Abp vNext 源码分析] - 12. 后台作业与后台工作者 (4)

这个后台工作者会周期性(取决于 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默认为 5 秒种)地从 IBackgroundJobStore 捞出一堆后台任务,并且在后台执行。至于每次执行多少个后台任务,这也取决于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默认值是 1000 个。

注意:

这里的 Options 类是 AbpBackgroundJobWorkerOptions,别和 AbpBackgroundWorkerOptions 混淆了。

所以在 AbpBackgroundJobsModule 模块里面,只做了一件事情,就是将负责后台作业的后台工作者,添加到后台工作者管理器种,并开始周期性地执行。

public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value; if (options.IsJobExecutionEnabled) { // 获得后台工作者管理器,并将负责后台作业的工作者添加进去。 context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>() ); } } 2.3.2 后台作业的定义

在上一节里面看到,只要是实现 IBackgroundJob<TArgs> 类型的都视为一个后台作业。这个后台作业接口,只定义了一个行为,那就是执行(Execute(TArgs))。这里的 TArgs 泛型作为执行后台作业时,需要传递的参数类型。

// 因为是传入的参数,所以泛型参数是逆变的。 public interface IBackgroundJob<in TArgs> { void Execute(TArgs args); }

检查源码,发现 ABP vNext 的邮箱模块定义了一个邮件发送任务 BackgroundEmailSendingJob,它的实现大概如下。

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency { // ... public override void Execute(BackgroundEmailSendingJobArgs args) { AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml)); } } 2.3.3 后台作业管理器

后台作业都是通过一个后台作业管理器(IBackgroundJobManager)进行管理的,这个接口定义了一个入队方法(EnqueueAsync()),注意,我们的后台作业在入队后,不是马上执行的。

说一下这个入队处理逻辑:

首先我们会通过参数的类型,获取到任务的名称。(假设任务上面没有标注 BackgroundJobNameAttribute 特性,那么任务的名称就是参数类型的 FullName 。)

构造一个 BackgroundJobInfo 对象。

通过 IBackgroundJobStore 持久化任务信息。

public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 获取任务名称。 var jobName = BackgroundJobNameAttribute.GetName<TArgs>(); var jobId = await EnqueueAsync(jobName, args, priority, delay); return jobId.ToString(); } protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { var jobInfo = new BackgroundJobInfo { Id = GuidGenerator.Create(), JobName = jobName, // 通过序列化器,序列化参数值,方便存储。这里内部其实使用的是 JSON.NET 进行序列化。 JobArgs = Serializer.Serialize(args), Priority = priority, CreationTime = Clock.Now, NextTryTime = Clock.Now }; // 如果任务有执行延迟,则任务的初始执行时间要加上这个延迟。 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 持久化任务信息,方便后面执行后台作业的工作者能够取到。 await Store.InsertAsync(jobInfo); return jobInfo.Id; }

BackgroundJobNameAttribute 相关的方法:

public static string GetName<TJobArgs>() { return GetName(typeof(TJobArgs)); } public static string GetName([NotNull] Type jobArgsType) { Check.NotNull(jobArgsType, nameof(jobArgsType)); // 判断参数类型上面是否标注了特性,并且特性实现了 IBackgroundJobNameProvider 接口。 return jobArgsType .GetCustomAttributes(true) .OfType<IBackgroundJobNameProvider>() .FirstOrDefault() ?.Name // 拿不到名字,则使用类型的 FullName。 ?? jobArgsType.FullName; } 2.3.4 后台作业的存储

后台作业的存储默认是放在内存的,这点可以从 InMemoryBackgroundJobStore 类型实现看出来。在它的内部使用了一个并行字典,通过作业的 Guid 与作业进行关联绑定。

除了内存实现,在 Volo.Abp.BackgroundJobs.Domain 模块还有一个 BackgroundJobStore 实现,基本套路与 SettingStore 一样,都是存储到数据库里面。

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency { protected IBackgroundJobRepository BackgroundJobRepository { get; } // ... public BackgroundJobInfo Find(Guid jobId) { return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>( BackgroundJobRepository.Find(jobId) ); } // ... public void Insert(BackgroundJobInfo jobInfo) { BackgroundJobRepository.Insert( ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo) ); } // ... } 2.3.5 后台作业的执行

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsfpgd.html