如果没有配置TaskScheduler或者ScheduledExecutorService类型的Bean,那么调度模块只会创建一个线程去调度所有装载完毕的任务,如果任务比较多,执行密度比较大,很有可能会造成大量任务饥饿,表现为存在部分任务不会触发调度的场景(这个是调度模块生产中经常遇到的故障,需要重点排查是否没有设置TaskScheduler或者ScheduledExecutorService)。
SchedulingConfigurer是调度模块提供给使用者进行扩展的钩子接口,用于在激活所有调度任务之前回调ScheduledTaskRegistrar实例,只要拿到ScheduledTaskRegistrar实例,我们就可以使用它注册和装载新的Task。
调度任务动态装载

Scheduling模块本身已经基于NamespaceHandler支持通过XML文件配置调度任务,但是笔者一直认为XML给人的感觉太"重",使用起来显得太笨重,这里打算扩展出JSON文件配置和基于JDBC数据源配置(也就是持久化任务,这里选用MySQL)。根据前文的源码分析,需要用到SchedulingConfigurer接口的实现,用于在所有调度任务触发之前从外部添加自定义的调度任务。先定义调度任务的一些配置属性类:
// 调度任务类型枚举 @Getter @RequiredArgsConstructor public enum ScheduleTaskType { CRON("CRON"), FIXED_DELAY("FIXED_DELAY"), FIXED_RATE("FIXED_RATE"), ; private final String type; } // 调度任务配置,enable属性为全局开关 @Data public class ScheduleTaskProperties { private Long version; private Boolean enable; private List<ScheduleTasks> tasks; } // 调度任务集合,笔者设计的时候采用一个宿主类中每个独立方法都是一个任务实例的模式 @Data public class ScheduleTasks { // 这里故意叫Klass代表Class,避免关键字冲突 private String taskHostKlass; private Boolean enable; private List<ScheduleTaskMethod> taskMethods; } // 调度任务方法 - enable为任务开关,没有配置会被ScheduleTaskProperties或者ScheduleTasks中的enable覆盖 @Data public class ScheduleTaskMethod { private Boolean enable; private String taskDescription; private String taskMethod; // 时区,cron的计算需要用到 private String timeZone; private String cronExpression; private String intervalMilliseconds; private String initialDelayMilliseconds; }设计的时候,考虑到多个任务执行方法可以放在同一个宿主类,这样可以方便同一种类的任务进行统一管理,如:
// Example task host class: it groups several public no-argument methods
// (task1 ... taskN); the "......" below is the article's placeholder for the
// methods in between and is not meant to compile.
public class TaskHostClass {

    public void task1() {

    }

    public void task2() {

    }

    ......

    public void taskN() {

    }
}
细节方面,intervalMilliseconds和initialDelayMilliseconds的单位设计为毫秒,使用字符串形式,方便可以基于StringValueResolver解析配置文件中的属性配置。添加一个抽象的SchedulingConfigurer:
@Slf4j public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware, EmbeddedValueResolverAware { @Getter private StringValueResolver embeddedValueResolver; private ConfigurableBeanFactory configurableBeanFactory; private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList(); private final Set<String> tasksLoaded = Sets.newHashSet(); @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { configurableBeanFactory = (ConfigurableBeanFactory) beanFactory; } @Override public void afterPropertiesSet() throws Exception { internalTasks.clear(); internalTasks.addAll(loadTaskProperties()); } @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { embeddedValueResolver = resolver; } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { for (InternalTaskProperties task : internalTasks) { try { synchronized (tasksLoaded) { String key = task.taskHostKlass() + "#" + task.taskMethod(); // 避免重复加载 if (!tasksLoaded.contains(key)) { if (task instanceof CronTaskProperties) { loadCronTask((CronTaskProperties) task, taskRegistrar); } if (task instanceof FixedDelayTaskProperties) { loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar); } if (task instanceof FixedRateTaskProperties) { loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar); } tasksLoaded.add(key); } else { log.info("调度任务已经装载,任务宿主类:{},任务执行方法:{}", task.taskHostKlass(), task.taskMethod()); } } } catch (Exception e) { throw new IllegalStateException(String.format("加载调度任务异常,任务宿主类:%s,任务执行方法:%s", task.taskHostKlass(), task.taskMethod()), e); } } } private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception { Class<?> klass = ClassUtils.forName(taskHostKlass, null); Object target = configurableBeanFactory.getBean(klass); Method method = ReflectionUtils.findMethod(klass, taskMethod); if (null == 
method) { throw new IllegalArgumentException(String.format("找不到目标方法,任务宿主类:%s,任务执行方法:%s", taskHostKlass, taskMethod)); } Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression()); if (null != cronExpression) { String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone()); TimeZone timeZone; if (null != timeZoneString) { timeZone = TimeZone.getTimeZone(timeZoneString); } else { timeZone = TimeZone.getDefault(); } CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone)); taskRegistrar.addCronTask(cronTask); log.info("装载CronTask[{}#{}()]成功,cron表达式:{},任务描述:{}", cronExpression, pops.taskMethod(), pops.cronExpression(), pops.taskDescription()); } } private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedDelayTask(fixedDelayTask); log.info("装载FixedDelayTask[{}#{}()]成功,固定延迟间隔:{} ms,初始延迟执行时间:{} ms,任务描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar 
taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedRateTask(fixedRateTask); log.info("装载FixedRateTask[{}#{}()]成功,固定执行频率:{} ms,初始延迟执行时间:{} ms,任务描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private long parseDelayAsLong(String value) { if (null == value) { return 0L; } if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) { return Duration.parse(value).toMillis(); } return Long.parseLong(value); } private boolean isP(char ch) { return (ch == 'P' || ch == 'p'); } /** * 加载任务配置,预留给子类实现 */ protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception; interface InternalTaskProperties { String taskHostKlass(); String taskMethod(); String taskDescription(); } @Builder protected static class CronTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String cronExpression; private String taskDescription; private String timeZone; @Override public String taskDescription() { return taskDescription; } public String cronExpression() { return cronExpression; } public String timeZone() { return timeZone; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedDelayTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String 
initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedRateTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } }loadTaskProperties()方法用于加载任务配置,留给子类实现。
JSON配置