一、执行器注册流程
二、具体流程 1.注册监控线程 //类:JobRegistryHelper.java;方法:public void start() registryMonitorThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { //获取自动注册型执行器 List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) { //移除注册中心死亡的地址 List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); } //刷新注册中心活跃的地址,并保存为app和注册地址列表的映射 HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List<String> registryList = appAddressMap.get(appname); if (registryList == null) { registryList = new ArrayList<String>(); } if (!registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } appAddressMap.put(appname, registryList); } } } //刷新执行器地址 for (XxlJobGroup group: groupList) { List<String> registryList = appAddressMap.get(group.getAppname()); String addressListStr = null; //注册中心存在活跃的地址则更新为活跃地址,否则更新为空地址 if (registryList!=null && !registryList.isEmpty()) { Collections.sort(registryList); StringBuilder addressListSB = new StringBuilder(); for (String item:registryList) { addressListSB.append(item).append(","); } addressListStr = addressListSB.toString(); addressListStr = addressListStr.substring(0, addressListStr.length()-1); } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } } catch (Exception e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } try { //睡眠一个心跳超时时间,集训监控自动注册型执行器列表 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); } }); registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); 2.注册过程 1 初始化执行器 //XxlJobExecutor.java private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList<AdminBiz>(); } adminBizList.add(adminBiz); } } } } 2 执行器端注册 public void start(final String appname, final String address){ //省略部分 registryThread = new Thread(new Runnable() { @Override public void run() { // registry while (!toStop) { try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { //选择一个执行器,发起rpc注册请求 ReturnT<String> registryResult = adminBiz.registry(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); //注册成功则退出循环 break; } else { //注册失败则打印日志,尝试下一个执行器 logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { if (!toStop) { //睡眠一个心跳超时时间,继续注册 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } } catch (InterruptedException e) { if (!toStop) { logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage()); } } } //移除注册部分省略 } }); registryThread.setDaemon(true); registryThread.setName("xxl-job, executor ExecutorRegistryThread"); registryThread.start(); } 3 调度中心执行注册 //JobRegistryHelper.java public ReturnT<String> registry(RegistryParam registryParam) { // valid if (!StringUtils.hasText(registryParam.getRegistryGroup()) || !StringUtils.hasText(registryParam.getRegistryKey()) || !StringUtils.hasText(registryParam.getRegistryValue())) { return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument."); } //从线程池中获取注册线程执行注册 registryOrRemoveThreadPool.execute(new Runnable() { @Override public void run() { //更新注册结果 int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); if (ret < 1) { //更新失败则添加注册结果 XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date()); // fresh freshGroupRegistryInfo(registryParam); } } }); return ReturnT.SUCCESS; }xxl-job执行器的注册
内容版权声明:除非注明,否则皆为本站原创文章。