我们之前说到,从 IBackgroundJobStore 拿到了需要执行的后台任务信息集合,接下来我们就要开始执行后台任务了。
foreach (var jobInfo in waitingJobs) { jobInfo.TryCount++; jobInfo.LastTryTime = clock.Now; try { // 根据任务名称获取任务的配置参数。 var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); // 根据配置里面存储的任务类型,将参数值进行反序列化。 var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); // 构造一个新的执行上下文,让执行器执行任务。 var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs); try { jobExecuter.Execute(context); // 如果任务执行成功则删除该任务。 store.Delete(jobInfo.Id); } catch (BackgroundJobExecutionException) { // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。 var nextTryTime = CalculateNextTryTime(jobInfo, clock); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。 jobInfo.IsAbandoned = true; } TryUpdate(store, jobInfo); } } catch (Exception ex) { // 执行过程中,产生了未知异常,设置为废弃任务,并打印日志。 Logger.LogException(ex); jobInfo.IsAbandoned = true; TryUpdate(store, jobInfo); } }执行后台任务的时候基本分为 5 步,它们分别是:
获得任务关联的配置参数,默认不用提供,因为在之前模块初始化的时候就已经配置了(你也可以显式指定)。
通过之前存储的配置参数,将参数值反序列化出来,构造具体实例。
构造一个执行上下文。
后台任务执行器执行具体的后台任务。
成功则删除任务,失败则更新任务下次的执行状态。
至于执行器里面的真正执行操作,你都拿到了参数值和任务类型了。就可以通过类型用 IoC 获取后台任务对象的实例,然后通过反射匹配方法签名,在实例上调用这个方法传入参数即可。
public virtual void Execute(JobExecutionContext context) { // 构造具体的后台作业实例对象。 var job = context.ServiceProvider.GetService(context.JobType); if (job == null) { throw new AbpException("The job type is not registered to DI: " + context.JobType); } // 获得需要执行的方法签名。 var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)); if (jobExecuteMethod == null) { throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType); } try { // 直接通过 MethodInfo 的 Invoke 方法调用,传入具体的实例对象和参数值即可。 jobExecuteMethod.Invoke(job, new[] { context.JobArgs }); } catch (Exception ex) { Logger.LogException(ex); // 如果是执行方法内的异常,则包装进行处理,然后抛出。 throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex) { JobType = context.JobType.AssemblyQualifiedName, JobArgs = context.JobArgs }; } } 2.3.5 集成 HangfireABP vNext 对于 Hangfire 的集成代码分布在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模块内部,前者是在模块配置里面,调用 Hangfire 库的相关方法,注入组件到 IoC 容器当中。后者则是对后台作业进行了适配处理,替换了默认的 IBackgroundJobManager 实现。
在 AbpHangfireModule 模块内部,通过工厂创建出来一个 BackgroudJobServer 实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。
public class AbpHangfireModule : AbpModule { private BackgroundJobServer _backgroundJobServer; public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddHangfire(configuration => { context.Services.ExecutePreConfiguredActions(configuration); }); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider); } public override void OnApplicationShutdown(ApplicationShutdownContext context) { //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown _backgroundJobServer.SendStop(); _backgroundJobServer.Dispose(); } }我们直奔主题,看一下基于 Hangfire 的后台作业管理器是怎么实现的。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency { public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 如果没有延迟参数,则直接通过 Enqueue() 方法扔进执行对了。 if (!delay.HasValue) { return Task.FromResult( BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args) ) ); } else { return Task.FromResult( BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args), delay.Value ) ); } }上述代码中使用 HangfireJobExecutionAdapter 进行了一个适配操作,因为 Hangfire 要将一个后台任务扔进队列执行,不是用 TArgs 就能解决的。