长安手机网站建设,站群论坛,监察部门网站建设方案,关掉自己做的网站文章目录简介调度中心一.程序启动初始化1.初始化入口类2.初始化I18n3.初始化快慢调度线程池4.初始化处理执行器注册或移除线程池更新执行器最新在线的守护线程5.初始化监控任务调度失败或执行失败的守护线程6.初始化处理执行器回调线程池监控任务执行结果丢失的守护线程7.初始化…
文章目录简介调度中心一.程序启动初始化1.初始化入口类2.初始化I18n3.初始化快慢调度线程池4.初始化处理执行器注册或移除线程池更新执行器最新在线的守护线程5.初始化监控任务调度失败或执行失败的守护线程6.初始化处理执行器回调线程池监控任务执行结果丢失的守护线程7.初始化计算每天调度情况统计、清理过期日志记录的守护线程8.初始化预读和执行任务的守护线程9.初始化处理预读任务的守护线程10.初始化资源汇总说明图二.主动发起请求1.beat调用2.idleBeat调用3.run调用4.kill调用5.log调用三.接收请求处理1.callback请求2.registry请求3.registryRemove请求四.程序结束销毁处理1.销毁入口类2.资源销毁处理执行器一.程序启动初始化1.初始化入口类2.初始化处理任务的方法3.初始化执行日志目录4.初始化操作调度中心的客户端5.初始化清除日志文件的守护线程6.初始化向调度中心反馈执行结果的守护线程7.初始化重试反馈失败记录的守护线程8.初始化守护线程并创建netty服务监听端口调用处理调用的线程池9.初始化注册执行器在线情况的守护线程10.初始化资源汇总说明图二.主动发起请求1.callback调用2.registry调用3.registryRemove调用三.接收请求处理1.beat请求2.idleBeat请求3.run请求4.kill请求5.log请求四.程序结束销毁处理1.销毁入口类2.资源销毁处理Redisson优化分布式锁问题简介
xxl-job是一款基于java开发的分布式任务调度平台集成非常简单官网下载工程后调度中心配置上mysql数据源把默认需要的表导入到数据库中调度中心项目打成jar包直接启动调度平台就创建完成。执行器为具体业务开发项目只需引入xxl-job-core依赖配置上调度中心地址、执行日志存放目录创建执行器对象使用XxlJob即可定义调度的具体任务。实现对任务调度的可视化操作方式操作非常简单此处针对重要流程进行源码分析具体使用详情可以参考官网。
调度中心
调度中心是对任务的管理任务执行状态、执行结果、执行日志进行监控的平台是一个web工程用户可以方便的进行任务管理。支持邮件报警支持CRON和固定速度两种调度方式支持Bean、shell脚本、php等多种运行模式支持随机、轮询、故障转移等多种路由策略支持子任务的连带执行支持忽略、立即执行一次两种调度过期策略支持单机串行、丢弃后续调度、覆盖之前调度三种阻塞处理策略。
一.程序启动初始化
程序启动后会做很多的资源初始化创建需要的守护线程资源初始化的入口类为JobAlarmer类和XxlJobAdminConfig类我们从这两个类来看初始化过程。
1.初始化入口类
JobAlarmer类和XxlJobAdminConfig类会作为初始化入口类是因为它们被Component修饰在spring
容器中注册为Bean对象并实现了InitializingBean接口此接口有一个方法afterPropertiesSet()在Bean初始化完并把参数注入成功后会调用afterPropertiesSet(属性设置之后)方法在此方法进行的资源初始化。JobAlarmer类还实现了ApplicationContextAware接口此接口有一个方法setApplicationContext会把spring容器上下文设置到此方法中我们可以定义一个变量来接收ApplicationContext这样就可以获取到spring容器中注册的bean对象。JobAlarmer类是为了从spring容器中获取到定义的报警类当需要报警时调用所有的报警类执行报警方法报警类支持自定义扩展扩展方式只需实现JobAlarm接口并把自定义扩展类设置为BeanComponent修饰重写alarm方法完成具体扩展方式的报警处理。
看下JobAlarmer类的源码
Component
public class JobAlarmer implements ApplicationContextAware, InitializingBean {private static Logger logger LoggerFactory.getLogger(JobAlarmer.class);private ApplicationContext applicationContext;private ListJobAlarm jobAlarmList; //存放报警的实现类可以进行扩展实现JobAlarm接口并把实现类注册为bean即可//实现ApplicationContextAware接口获取上下文得到加载到spring容器中的所有bean对象Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext applicationContext;}//实现了InitializingBean接口在Bean初始化完并把参数注入成功后会调用afterPropertiesSet()Overridepublic void afterPropertiesSet() throws Exception {//从spring容器中获取JobAlarm类型的bean并存放到list集合中MapString, JobAlarm serviceBeanMap applicationContext.getBeansOfType(JobAlarm.class);if (serviceBeanMap ! null serviceBeanMap.size() 0) {jobAlarmList new ArrayListJobAlarm(serviceBeanMap.values());}}/*** job alarm* 发送预警邮件* param info* param jobLog* return*/public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {boolean result false;//报警的集合类不为空if (jobAlarmList!null jobAlarmList.size()0) {result true; // success means all-successfor (JobAlarm alarm: jobAlarmList) {boolean resultItem false;try {//执行报警方法resultItem alarm.doAlarm(info, jobLog);} catch (Exception e) {logger.error(e.getMessage(), e);}if (!resultItem) {result false;}}}return result;}}看下XxlJobAdminConfig类的部分源码
Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private XxlJobScheduler xxlJobScheduler;//实现了InitializingBean接口在Bean初始化完并把参数注入成功后会调用afterPropertiesSet()Overridepublic void afterPropertiesSet() throws Exception {xxlJobScheduler new XxlJobScheduler();//初始化调度中心资源xxlJobScheduler.init();}
}2.初始化I18n
系统web页面的文字支持三种类型en英文、zh_CN中文、zh_TC中文繁体具体使用哪种类型在application.propreties配置文件中约定配置项如下
xxl.job.i18nzh_CN初始化资源的入口为XxlJobAdminConfig类的afterPropertiesSet()方法此方法创建了XxlJobScheduler类执行它的init方法在它的init方法中有初始化i18n的方法initI18n()来看下initI18n()源码 //初始化i18n用于不同语言的字符显示private void initI18n(){//对阻塞处理策略枚举类重新设置title值连带着初始化了I18nfor (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {//根据选择的i18n类型从i18n配置文件中根据key加载对应的文本item.setTitle(I18nUtil.getString(jobconf_block_.concat(item.name())));}}此方法对阻塞处理策略枚举类重新设置title值title的值需要从i18n字典中匹配在读取I18n字典的时候初始化了i18n。看下根据key从I18n中获取值的方法源码 public static String getString(String key) {//加载i18n的字典文件从此字典文件中根据key获取value值return loadI18nProp().getProperty(key);}private static Properties prop null;//根据选择的i18n类型加载对应的配置文件public static Properties loadI18nProp(){if (prop ! null) {return prop;}try {// build i18n prop//获取配置的i18n类型String i18n XxlJobAdminConfig.getAdminConfig().getI18n();//根据类型拼接出需要的字典文件名String i18nFile MessageFormat.format(i18n/message_{0}.properties, i18n);// load prop//根据文件目录加载资源Resource resource new ClassPathResource(i18nFile);EncodedResource encodedResource new EncodedResource(resource,UTF-8);//加载properties配置文件信息prop PropertiesLoaderUtils.loadProperties(encodedResource);} catch (IOException e) {logger.error(e.getMessage(), e);}return prop;}
获取字典项Properties若是第一次调用则Properties为空此时会从配置文件中获取到配置的i18n类型根据类型拼接出需要的字典文件名然后加载字典文件字典文件是properties类型存放在resources/i18n目录下
然后根据key从Properties中获取到value值第一次加载后后面直接从prop中取值。
3.初始化快慢调度线程池
为了优化调度效率定义了快慢调度线程池快慢线程池的区别在于最大线程数、阻塞队列的大小快线程池的最大线程数默认为200小于200的按200处理阻塞队列为1000慢线程池的最大线程数默认为100小于100的按100处理阻塞队列为2000。执行任务调度时该选哪种线程池来执行的依据在1分钟内此任务有10次超过500毫米才调度完成使用慢线程池处理否则使用快线程池处理。
初始化此资源的入口为JobTriggerPoolHelper.toStart()方法看下此toStart()源码 private static JobTriggerPoolHelper helper new JobTriggerPoolHelper();//初始化调度线程池public static void toStart() {//调用JobTriggerPoolHelper的start方法helper.start();}JobTriggerPoolHelper类中创建了自身对象helper调用自身的start方法看下start方法源码 private ThreadPoolExecutor fastTriggerPool null;private ThreadPoolExecutor slowTriggerPool null;public void start(){//初始化快的调度线程池fastTriggerPool new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueueRunnable(1000),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, xxl-job, admin JobTriggerPoolHelper-fastTriggerPool- r.hashCode());}});//初始化慢的调度线程池与快的不同是阻塞队列的大小为2000、最大线程数slowTriggerPool new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueueRunnable(2000),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, xxl-job, admin JobTriggerPoolHelper-slowTriggerPool- r.hashCode());}});}此方法初始化了快慢线程池从配置文件中获取到快慢线程池的最大线程数在application.properties配置文件中约定此值
xxl.job.triggerpool.fast.max200
xxl.job.triggerpool.slow.max100具体获取此值的时候做了最小的限制 //注入配置变量Value(${xxl.job.triggerpool.fast.max})private int triggerPoolFastMax;Value(${xxl.job.triggerpool.slow.max})private int triggerPoolSlowMax;public int getTriggerPoolFastMax() {//小于200按200处理if (triggerPoolFastMax 200) {return 200;}return triggerPoolFastMax;}public int getTriggerPoolSlowMax() {//小于100按100处理if (triggerPoolSlowMax 100) {return 100;}return triggerPoolSlowMax;}4.初始化处理执行器注册或移除线程池更新执行器最新在线的守护线程
初始化此资源的入口为JobRegistryHelper.getInstance().start()主要初始化处理执行器注册、移除的线程池初始化更新自动注册执行器最新在线情况的守护线程心跳触发机制默认30秒执行器注册的周期也是30秒对大于心跳时间*390秒没有最新注册的执行器进行删除。看下start方法的源码 public void start(){// for registry or remove//创建处理执行器注册或者删除的线程池registryOrRemoveThreadPool new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueueRunnable(2000),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool- r.hashCode());}},new RejectedExecutionHandler() {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn( xxl-job, registry or remove too fast, match threadpool rejected handler(run now).);}});// for monitor//创建更新执行器最新在线的守护线程registryMonitorThread new Thread(new Runnable() {Overridepublic void run() {//不销毁就一直执行while (!toStop) {try {// auto registry group//只处理自动注册的执行器组ListXxlJobGroup groupList XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);//查询到有记录if (groupList!null !groupList.isEmpty()) {// remove dead address (admin/executor)//从执行器注册的表里面查询大于心跳时间默认90秒没有过注册的记录进行删除表示这些执行器可能已经下线了ListInteger ids XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!null ids.size()0) {//删除已经下线的执行器记录XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)//使用集合记录当前执行器在线情况key执行器AppNamevalue这个执行器分组下的执行器集合HashMapString, ListString appAddressMap new HashMapString, ListString();//查询在心跳时间默认90秒内有过注册的执行器ListXxlJobRegistry list XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list ! null) {for (XxlJobRegistry item: list) {//处理是执行器类型的数据if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {String appname item.getRegistryKey();ListString registryList appAddressMap.get(appname);if (registryList null) {registryList new ArrayListString();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}//key执行器AppNamevalue这个执行器分组下的执行器集合多个执行器根据配置的appName进行分组appAddressMap.put(appname, registryList);}}}// fresh group address//刷新自动注册执行器分组里面当前在线的执行器列表for (XxlJobGroup group: groupList) {//根据key从当前在线执行器集合里面获取到某个执行器分组的在线集合ListString registryList appAddressMap.get(group.getAppname());String addressListStr null;//执行器分组在线集合不为空则重新设置下此执行器分组最新的在线情况若是为空则表示此执行器分组下已经没有在线的执行器了则给执行器在线分组设置为nullif (registryList!null !registryList.isEmpty()) {//排序Collections.sort(registryList);StringBuilder addressListSB new StringBuilder();//使用逗号进行当前执行器分组下在线执行器的数组组织for (String item:registryList) {addressListSB.append(item).append(,);}addressListStr addressListSB.toString();addressListStr addressListStr.substring(0, addressListStr.length()-1);}group.setAddressList(addressListStr);group.setUpdateTime(new Date());//更新执行器分组下当前在线的执行器数据XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error( xxl-job, job registry monitor thread error:{}, e);}}try {//默认休眠30秒与执行器心跳注册的时间保持一致TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error( xxl-job, job registry monitor thread error:{}, e);}}}logger.info( xxl-job, job registry monitor thread stop);}});//设置为守护线程registryMonitorThread.setDaemon(true);registryMonitorThread.setName(xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread);//启动线程registryMonitorThread.start();}registryOrRemoveThreadPool线程池是为了给执行器注册或者删除注册时使用registryMonitorThread守护线程会去查询注册方式为自动注册的任务组任务组数据存放在xxl_job_group表中address_type字段标识注册方式0自动注册address_list字段标识当前在线的执行器集合使用逗号连接app_name字段用于给注册的执行器分组自动注册的执行器需要带有所属的分组值根据分组值来确定此执行器属于哪个任务组。查询到自动注册的任务组后从执行器自动注册表xxl_job_registry中获取到最新的执行器在线情况执行器默认30秒注册一次对于大于90秒没有过注册的执行器把它从xxl_job_registry表中删除再从xxl_job_registry表中查询最新的执行器在线情况90秒内有过注册根据分组值组织在线情况最终更新xxl_job_group表的address_list字段值。
就是说registryMonitorThread守护线程会以30秒的休眠周期一直循环检查过期未注册的执行器并把它的注册记录删除然后重新更新任务组在线的执行器集合达到及时显示执行器上线、下线的检测。
5.初始化监控任务调度失败或执行失败的守护线程
初始化的入口为JobFailMonitorHelper.getInstance().start()初始化监控任务调度失败或者执行器执行失败日志的线程心跳触发机制默认10秒对失败的任务有配置报警邮件则发送报警邮件有配置重试次数大于0的则进行任务的重新调度。来看start方法源码 public void start(){//创建监控守护线程monitorThread new Thread(new Runnable() {Overridepublic void run() {// monitor//不停止一致运行while (!toStop) {try {//获取调度失败或者执行器执行失败的日志记录alarm_status为0告警状态0-默认、-1锁定状态、1-无需告警、2-告警成功、3-告警失败ListLong failLogIds XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (failLogIds!null !failLogIds.isEmpty()) {for (long failLogId: failLogIds) {// lock log//更新xxl_job_log表对应日志记录的alarm_status由0修改为-1int lockRet XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet 1) {//已经执行过更新continue;}//加载日志记录XxlJobLog log XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);//加载日志对应的任务信息XxlJobInfo info XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 1、fail retry monitor//任务若是配置了失败重试次数大于0则进行重试调用if (log.getExecutorFailRetryCount() 0) {//重试调用执行任务调度方式为重试重试次数为配置的次数减1,JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);//调度日志追加上重试调用日志信息String retryMsg brbrspan style\color:#F39C12;\ I18nUtil.getString(jobconf_trigger_type_retry) /spanbr;log.setTriggerMsg(log.getTriggerMsg() retryMsg);//更新调度日志信息XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitorint newAlarmStatus 0; // 告警状态0-默认、-1锁定状态、1-无需告警、2-告警成功、3-告警失败if (info ! null) {//发送报警邮件boolean alarmResult XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);newAlarmStatus alarmResult?2:3;} else {newAlarmStatus 1;}//更新日志记录的报警邮件是否发送成功情况XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}} catch (Exception e) {if (!toStop) {logger.error( xxl-job, job fail monitor thread error:{}, e);}}try {//休眠10心跳周期为10秒TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, job fail monitor thread stop);}});//设置为守护线程monitorThread.setDaemon(true);monitorThread.setName(xxl-job, admin JobFailMonitorHelper);//启动线程monitorThread.start();}monitorThread守护线程会以10秒为休眠周期循环检查调用失败的日志文件然后对失败记录进行响应处理。
6.初始化处理执行器回调线程池监控任务执行结果丢失的守护线程
初始化的入口为JobCompleteHelper.getInstance().start()初始化处理执行器回调的线程池初始化监控执行器任务结果丢失的线程心跳机制触发默认60秒监控任务已经调度成功但是执行器一直没有反馈处理情况任务状态一直是“运行中”handle_code 0且调度开始时间到现在已经过去10分钟、且对应的执行器已经没有在心跳注册的记录已下线把这样的记录标记为执行失败。看下start方法源码 public void start(){// for callback//创建处理执行器回调的线程池callbackThreadPool new ThreadPoolExecutor(2,20,30L,TimeUnit.SECONDS,new LinkedBlockingQueueRunnable(3000),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, xxl-job, admin JobLosedMonitorHelper-callbackThreadPool- r.hashCode());}},new RejectedExecutionHandler() {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn( xxl-job, callback too fast, match threadpool rejected handler(run now).);}});// for monitor//创建守护线程monitorThread new Thread(new Runnable() {Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {//休眠50毫秒等待JobTriggerPoolHelper初始完成TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}// monitor//不销毁一直监听while (!toStop) {try {// 任务结果丢失处理调度记录停留在 运行中 状态超过10min且对应执行器心跳注册失败不在线则将本次调度主动标记失败Date losedTime DateUtil.addMinutes(new Date(), -10);//查询出已经调度成功但是执行器一直没有反馈处理成功任务状态一直是“运行中”handle_code 0且调度开始时间到现在已经过去10分钟、且对应的执行器已经没有心跳注册的记录ListLong losedJobIds XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);//把这样的记录标记为失败if (losedJobIds!null losedJobIds.size()0) {for (Long logId: losedJobIds) {XxlJobLog jobLog new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);jobLog.setHandleMsg( I18nUtil.getString(joblog_lost_fail) );//完成此任务并更新日志的状态值有子任务再调用子任务XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {logger.error( xxl-job, job fail monitor thread error:{}, e);}}try {//休眠周期是60秒心跳机制TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, JobLosedMonitorHelper stop);}});//设置为守护线程monitorThread.setDaemon(true);monitorThread.setName(xxl-job, admin JobLosedMonitorHelper);//启动线程monitorThread.start();}callbackThreadPool线程池主要是给执行器执行任务结束后给调度中心的反馈处理monitorThread守护线程以60秒为休眠周期循序检测任务状态为运行中且调度时间已经大于10分钟并且执行器已经没有注册不在线的记录把它标记为执行失败否则此日志会一直处于运行中。
7.初始化计算每天调度情况统计、清理过期日志记录的守护线程
此初始化的入口为JobLogReportHelper.getInstance().start()初始化处理日志报表的守护线程心跳机制触发默认1分钟处理当前时间往前推两天这三天时间内的调度结果报表值把结果值按天存放到xxl_job_log_report表中配置的保存日志最大天数大于0则进行清理处理当前时间减去上次清理时间大于1天毫秒数则进行过期日志记录的删除。看下start源码 public void start(){//创建日志报表的守护线程logrThread new Thread(new Runnable() {Overridepublic void run() {// last clean log time//上次清理日志时间long lastCleanLogTime 0;//不销毁一直执行while (!toStop) {// 1、log-report refresh: refresh log report in 3 daystry {//处理当前时间往前推两天这三天时间内的调度结果值for (int i 0; i 3; i) {// todayCalendar itemDay Calendar.getInstance();itemDay.add(Calendar.DAY_OF_MONTH, -i);itemDay.set(Calendar.HOUR_OF_DAY, 0);itemDay.set(Calendar.MINUTE, 0);itemDay.set(Calendar.SECOND, 0);itemDay.set(Calendar.MILLISECOND, 0);//当前时间减去i天的000000时刻Date todayFrom itemDay.getTime();itemDay.set(Calendar.HOUR_OF_DAY, 23);itemDay.set(Calendar.MINUTE, 59);itemDay.set(Calendar.SECOND, 59);itemDay.set(Calendar.MILLISECOND, 999);//当前时间减去i天的235959时刻Date todayTo itemDay.getTime();// refresh log-report every minuteXxlJobLogReport xxlJobLogReport new XxlJobLogReport();xxlJobLogReport.setTriggerDay(todayFrom);xxlJobLogReport.setRunningCount(0);xxlJobLogReport.setSucCount(0);xxlJobLogReport.setFailCount(0);//根据起止日期从xxl_job_log日志表中查询这个时间段内总的执行次数、运行中次数、调度成功次数MapString, Object triggerCountMap XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);if (triggerCountMap!null triggerCountMap.size()0) {//总的执行次数int triggerDayCount triggerCountMap.containsKey(triggerDayCount)?Integer.valueOf(String.valueOf(triggerCountMap.get(triggerDayCount))):0;//运行中次数int triggerDayCountRunning triggerCountMap.containsKey(triggerDayCountRunning)?Integer.valueOf(String.valueOf(triggerCountMap.get(triggerDayCountRunning))):0;//调度成功次数int triggerDayCountSuc triggerCountMap.containsKey(triggerDayCountSuc)?Integer.valueOf(String.valueOf(triggerCountMap.get(triggerDayCountSuc))):0;//失败次数int triggerDayCountFail triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;xxlJobLogReport.setRunningCount(triggerDayCountRunning);xxlJobLogReport.setSucCount(triggerDayCountSuc);xxlJobLogReport.setFailCount(triggerDayCountFail);}// do refresh//把某一天的调度日志报表记录更新进去int ret XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);if (ret 1) { //若是之前没有添加过则进行插入XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);}}} catch (Exception e) {if (!toStop) {logger.error( xxl-job, job log report thread error:{}, e);}}// 2、log-clean: switch open once each day//配置的保存日志最大天数大于0则处理当前时间减去上次清理时间大于1天毫秒数则进行日志的清除if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()0 System.currentTimeMillis() - lastCleanLogTime 24*60*60*1000) {// expire-time//清理日志的时间Calendar expiredDay Calendar.getInstance();//当前时间减去配置的天数expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());expiredDay.set(Calendar.HOUR_OF_DAY, 0);expiredDay.set(Calendar.MINUTE, 0);expiredDay.set(Calendar.SECOND, 0);expiredDay.set(Calendar.MILLISECOND, 0);//得到清理的时间Date clearBeforeTime expiredDay.getTime();// clean expired log//循环处理所有的大于最大存放日期的日志ListLong logIds null;do {//查询调度日期在清理截止日期之前的日志记录logIds XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);if (logIds!null logIds.size()0) {//删除日志记录XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);}} while (logIds!null logIds.size()0);// update clean time//重新设置上次清理时间lastCleanLogTime System.currentTimeMillis();}try {//休眠1分钟心跳注册机制TimeUnit.MINUTES.sleep(1);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, job log report thread stop);}});//设置为守护线程logrThread.setDaemon(true);logrThread.setName(xxl-job, admin JobLogReportHelper);//启动线程logrThread.start();}logrThread守护线程以60秒为休眠周期循序统计当前时间往前推2天这3天内按天统计任务总的执行成功、运行中、执行失败次数把结果存放到xxl_job_log_report表中供web首页显示web首页展示截图如下
还对日志记录进行过期清理虽然守护线程的循环周期是60秒但是对日志的清理方法一天只会执行一次每次执行都会重新设置上次清除时间lastCleanLogTime每次都判断当前时间与上次执行清理时间是否大于1天大于才进行日志的清理。在application.properties中配置日志的保存天数
xxl.job.logretentiondays30若是不想清除日志可以配置值小于0例如-1此处配置的日志天数需要和执行器配置的天数保存一致否则可能调度中心xxl_job_log日志表中还有记录但是执行器目录下已经删除了此任务对应的执行日志文件这样就会导致访问不到执行日志详情。
8.初始化预读和执行任务的守护线程
初始化的入口为JobScheduleHelper.getInstance().start()初始化定时守护线程每次固定休眠4到5秒为了防止在集群环境中任务被重复调度所以预读任务的时候使用数据库写锁的方式处理预读下次执行时间在当前时间5秒之内的任务对于下次执行时间到当前时间已经差着5秒以上的任务过期未执行过期调度策略为立即执行一次则进行任务调度对于下次执行时间在当前时间减去5秒内的任务进行调度对于下次执行时间在当前时间往后5秒内的任务放到map中map的key为任务的执行秒数value为这一秒需要执行的任务id集合。看下start方法源码 public void start(){// schedule threadscheduleThread new Thread(new Runnable() {Overridepublic void run() {try {//休眠5000 - System.currentTimeMillis()%1000毫秒最大值的情况为5000-0最小值的情况为5000-999//随机休眠4到5秒的范围TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info( init xxl-job admin scheduler success.);// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps 1000/50 20)//预读数量按每个任务50ms计算qps为20快线程池慢线程池最大线程数之和再乘以20即为1秒可以处理的最大任务量默认是6000int preReadCount (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Job//起始时间long start System.currentTimeMillis();//数据库连接Connection conn null;//连接是否自动提交Boolean connAutoCommit null;//预处理PreparedStatement preparedStatement null;boolean preReadSuc true;try {//获取数据库连接conn XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit conn.getAutoCommit();//关闭自动提交conn.setAutoCommit(false);//执行sql语句对xxl_job_lock添加写锁为了防止在集群环境中任务被重复调度所以使用写锁的方式处理preparedStatement conn.prepareStatement( select * from xxl_job_lock where lock_name schedule_lock for update );preparedStatement.execute();// tx start// 1、pre readlong nowTime System.currentTimeMillis();//查询预执行的任务且下次执行时间小于当前时间往后5秒最多查询可以处理的preReadCount数量ListXxlJobInfo scheduleList XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime PRE_READ_MS, preReadCount);if (scheduleList!null scheduleList.size()0) {// 2、push time-ringfor (XxlJobInfo jobInfo: scheduleList) {// time-ring jump//任务下次执行时间到当前时间已经差着5秒以上说明已经过了调度时间了if (nowTime jobInfo.getTriggerNextTime() PRE_READ_MS) {// 2.1、trigger-expire 5spass make next-trigger-timelogger.warn( xxl-job, schedule misfire, jobId jobInfo.getId());// 1、misfire match//获取到此任务配置的过期调度策略MisfireStrategyEnum misfireStrategyEnum MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);//若是立即执行一次则调用执行方法if (MisfireStrategyEnum.FIRE_ONCE_NOW misfireStrategyEnum) {// FIRE_ONCE_NOW 》 trigger//调用任务执行方法执行类型为调度过期补偿JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug( xxl-job, schedule push trigger : jobId jobInfo.getId() );}// 2、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime jobInfo.getTriggerNextTime()) {//下次执行时间在当前时间减去5秒之内// 2.2、trigger-expire 5sdirect-trigger make next-trigger-time// 1、trigger//调用任务执行方法执行类型为Cron触发JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug( xxl-job, schedule push trigger : jobId jobInfo.getId() );// 2、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again//经过上面重新设置了下次执行时间新设置的下次执行时间还在当前时间加上5秒之内if (jobInfo.getTriggerStatus()1 nowTime PRE_READ_MS jobInfo.getTriggerNextTime()) {// 1、make ring second//计算结果值范围0到59之间int ringSecond (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ring//把任务id放到一个map集合中pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {//下次执行时间在当前时间往后延5秒之内// 2.3、trigger-pre-readtime-ring trigger make next-trigger-time// 1、make ring second//计算结果值范围0到59之间int ringSecond (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ring//把任务id放到一个map集合中pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger info//更新任务下次执行时间和上次执行时间for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {//没有进行预读处理preReadSuc false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error( xxl-job, JobScheduleHelper#scheduleThread error:{}, e);}} finally {// commitif (conn ! null) {try {//提交数据库释放写锁conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {//还原数据库连接的自动提交设置conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {//关闭连接conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatement//关闭预处理if (null ! preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}//计算花费的时间long cost System.currentTimeMillis()-start;// Wait seconds, align second//花费时间小于1秒则让程序休眠大于1秒则不休眠if (cost 1000) { // scan-overtime, not waittry {// pre-read period: success scan each second; fail skip this period;//线程休眠休眠时间计算若是有预读则使用1000减去0到9990到1秒之间没有预读则使用5000减去0到9994到5秒之间//查询任务的定时周期为5秒若是没有预读则休眠时间在0到1之间默认的4到5秒之间有预读则预读处理的时候已经把任务下次执行时间在当前时间5秒之内任务加到环map中了所以这里再休眠4到5秒要不然就是白跑一趟TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info( xxl-job, JobScheduleHelper#scheduleThread stop);}});//设置为守护线程scheduleThread.setDaemon(true);scheduleThread.setName(xxl-job, admin JobScheduleHelper#scheduleThread);//启动线程scheduleThread.start();
}scheduleThread守护线程以4到5秒为休眠周期循环加载下次执行时间在当前时间5秒内的任务为了防止在集群环境中任务被重复加载调度所以预读任务的时候使用数据库写锁的方式处理执行的sql语句
select * from xxl_job_lock where lock_name schedule_lock for update按快慢线程池的大小计算出可以预读的数量查询任务下次执行时间小于当前时间加5秒的记录对于下次执行时间到当前时间已经差着5秒以上的任务过期未执行过期调度策略为立即执行一次则进行任务的调度并重新设置下次执行时间对于下次执行时间在当前时间减去5秒之内调用任务执行方法并重新设置下次执行时间新设置的下次执行时间还在当前时间加上5秒之内则把此任务加到环形map中下次执行时间在当前时间往后延5秒之内未到执行时间则把此任务加到map中,并重新设置下次执行时间。上面花费时间小于1秒则让程序休眠大于1秒则不休眠休眠时间计算若是有预读则使用1000减去0到9990到1秒之间没有预读则使用5000减去0到9994到5秒之间查询任务的定时周期为5秒若是有预读则休眠时间在0到1之间默认的4到5秒之间没有预读则说明接下来的5秒内没有要执行的任务此处休眠4到5秒预读处理的时候已经把任务下次执行时间在当前时间5秒之内任务加到map中了。
9.初始化处理预读任务的守护线程
初始化的入口为JobScheduleHelper.getInstance().start()初始化处理预读任务的守护线程每次休眠周期在0到1秒之间执行放到map中的预执行任务根据当前秒数为key从map中取出任务进行调度。看下start方法源码 public void start(){// ring thread//创建环形线程用于处理上面定时线程预读任务周期5秒左右的时候对于下次执行时间在当前时间5秒内的任务使用此线程来进行调度ringThread new Thread(new Runnable() {Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {//随机休眠1到1000毫秒在1秒范围内TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}try {// second dataListInteger ringItemData new ArrayList();//获取当前的秒数int nowSecond Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长跨过刻度向前校验一个刻度for (int i 0; i 2; i) {//ringData存放的是预处理时当前时间5秒内需要执行的任务使用此map对象存放的集合在ringThread线程中进行处理//ringData的key是5秒内预处理的任务的秒数ListInteger tmpData ringData.remove( (nowSecond60-i)%60 );if (tmpData ! null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug( xxl-job, time-ring beat : nowSecond Arrays.asList(ringItemData) );//当前这一秒下有任务要处理if (ringItemData.size() 0) {// do trigger//循环处理这一秒下的任务for (int jobId: ringItemData) {// do trigger//调用任务执行方法执行类型为Cron触发JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error( xxl-job, JobScheduleHelper#ringThread error:{}, e);}}}logger.info( xxl-job, JobScheduleHelper#ringThread stop);}});//设置为守护线程ringThread.setDaemon(true);ringThread.setName(xxl-job, admin JobScheduleHelper#ringThread);//启动线程ringThread.start();}
ringThread守护线程以0到1秒之间的休眠周期循环处理预读5秒内加到map中的任务根据当前的秒数为key从map中取出这一秒需要执行的任务进行调度。
10.初始化资源汇总说明图
1初始化的线程池
2初始化的守护线程
二.主动发起请求
调度中心主动向执行器发起的请求可以从调度中心客户端ExecutorBizClient类找到此类存在xxl-job的公共核心xxl-job-core工程中目录结构为com.xxl.job.core.biz.client从类里面可以看到包含beat、idleBeat、run、kill、log五个方法。看下ExecutorBizClient的源码
/*** 调度中心-》调用执行器的客户端供调度中心使用*/
public class ExecutorBizClient implements ExecutorBiz {public ExecutorBizClient() {}public ExecutorBizClient(String addressUrl, String accessToken) {this.addressUrl addressUrl;this.accessToken accessToken;// validif (!this.addressUrl.endsWith(/)) {this.addressUrl this.addressUrl /;}}private String addressUrl ;private String accessToken;private int timeout 3;//心跳检测执行器是否在线用于故障转移方式时的调用测试Overridepublic ReturnTString beat() {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrlbeat, accessToken, timeout, , String.class);}//心跳检测执行器是否忙碌用于忙碌转移方式时的调用测试Overridepublic ReturnTString idleBeat(IdleBeatParam idleBeatParam){//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrlidleBeat, accessToken, timeout, idleBeatParam, String.class);}//调用执行器运行Overridepublic ReturnTString run(TriggerParam triggerParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl run, accessToken, timeout, triggerParam, String.class);}//停止执行器的执行Overridepublic ReturnTString kill(KillParam killParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl kill, accessToken, timeout, killParam, String.class);}//查询执行器产生的执行日志信息Overridepublic ReturnTLogResult log(LogParam logParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl log, accessToken, timeout, logParam, LogResult.class);}
}此客户端类提供了5个调用执行器的方法当方法被调用时会拼接远程执行器的接口地址进行远程调用。当需要向某个执行器发起请求时需要为此执行器创建一个客户端并把此客户端存放到map中key为执行器地址value为客户端当下次再需要向此执行器发起请求时直接从map中获取客户端即可。创建客户端时需要把执行器地址和token作为参数传递进来这样在发起远程调用时直接拼接url和传递token就行。看下根据执行器地址获取客户端的源码
ExecutorBiz executorBiz XxlJobScheduler.getExecutorBiz(address)此方法调用到XxlJobScheduler类并使用ExecutorBiz父接口来接收客户端。看下getExecutorBiz方法源码 //使用集合记录执行器地址和它的客户端key执行器地址value调用执行器的客户端private static ConcurrentMapString, ExecutorBiz executorBizRepository new ConcurrentHashMapString, ExecutorBiz();public static ExecutorBiz getExecutorBiz(String address) throws Exception {// validif (addressnull || address.trim().length()0) {return null;}// load-cacheaddress address.trim();//已经创建过则直接使用ExecutorBiz executorBiz executorBizRepository.get(address);if (executorBiz ! null) {return executorBiz;}// set-cache//没有创建过则创建executorBiz new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());//放到map集合中供下次使用executorBizRepository.put(address, executorBiz);return executorBiz;}1.beat调用
beat调用是调度中心检测执行器是否在线用于故障转移方式时的调用测试在源码中的调用位置
public class ExecutorRouteFailover extends ExecutorRouter {Overridepublic ReturnTString route(TriggerParam triggerParam, ListString addressList) {StringBuffer beatResultSB new StringBuffer();//遍历执行器地址for (String address : addressList) {// beatReturnTString beatResult null;try {//根据调用地址获取它对应的执行器客户端ExecutorBiz executorBiz XxlJobScheduler.getExecutorBiz(address);//调用beat方法发送请求beatResult executorBiz.beat();} catch (Exception e) {logger.error(e.getMessage(), e);beatResult new ReturnTString(ReturnT.FAIL_CODE, e );}beatResultSB.append( (beatResultSB.length()0)?brbr:).append(I18nUtil.getString(jobconf_beat) ).append(braddress).append(address).append(brcode).append(beatResult.getCode()).append(brmsg).append(beatResult.getMsg());// beat success//执行器还在线则使用此执行器地址进行调用if (beatResult.getCode() ReturnT.SUCCESS_CODE) {beatResult.setMsg(beatResultSB.toString());//能调通的执行器地址beatResult.setContent(address);return beatResult;}}return new ReturnTString(ReturnT.FAIL_CODE, beatResultSB.toString());}
}当路由策略为故障转移时在执行器集合中选择能够调通没有故障的执行器进行任务的调用。确定某个执行器是否有故障只用调用一下它的beat接口有反馈就是没有故障没有反馈就是有故障。
2.idleBeat调用
idleBeat调用是检测执行器是否忙碌用于忙碌转移方式时的调用测试在源码中的调用位置
public class ExecutorRouteBusyover extends ExecutorRouter {Overridepublic ReturnTString route(TriggerParam triggerParam, ListString addressList) {StringBuffer idleBeatResultSB new StringBuffer();//遍历所有执行器for (String address : addressList) {// beatReturnTString idleBeatResult null;try {//根据地址获取调用执行器的客户端ExecutorBiz executorBiz XxlJobScheduler.getExecutorBiz(address);//执行调用idleBeatResult executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));} catch (Exception e) {logger.error(e.getMessage(), e);idleBeatResult new ReturnTString(ReturnT.FAIL_CODE, e );}idleBeatResultSB.append( (idleBeatResultSB.length()0)?brbr:).append(I18nUtil.getString(jobconf_idleBeat) ).append(braddress).append(address).append(brcode).append(idleBeatResult.getCode()).append(brmsg).append(idleBeatResult.getMsg());// beat success//调用成功表示此执行器当前不处于忙碌状态if (idleBeatResult.getCode() ReturnT.SUCCESS_CODE) {idleBeatResult.setMsg(idleBeatResultSB.toString());idleBeatResult.setContent(address);return idleBeatResult;}}return new ReturnTString(ReturnT.FAIL_CODE, idleBeatResultSB.toString());}
}当路由策略为忙碌转移时在执行器集合中选择当前没有处理此任务的执行器执行任务若是当前执行器正在执行此任务任务的上一次调度执行器还没有完成又来一次调度则返回失败直到找到没有处理此任务的执行器进行调用。注意忙碌转移是针对此任务执行器是否还在执行不是指执行器是否有在执行任务例如执行器当前正在执行其他任务也算不忙碌。
3.run调用
run调用是指调用执行器执行任务xxl-job提供了6中类型的调度方式看下调度方式枚举类TriggerTypeEnum源码
//调度方式枚举类
public enum TriggerTypeEnum {//手动触发MANUAL(I18nUtil.getString(jobconf_trigger_type_manual)),//Cron触发CRON(I18nUtil.getString(jobconf_trigger_type_cron)),//失败重试触发RETRY(I18nUtil.getString(jobconf_trigger_type_retry)),//父任务触发PARENT(I18nUtil.getString(jobconf_trigger_type_parent)),//API触发API(I18nUtil.getString(jobconf_trigger_type_api)),//调度过期补偿MISFIRE(I18nUtil.getString(jobconf_trigger_type_misfire));private TriggerTypeEnum(String title){this.title title;}private String title;public String getTitle() {return title;}
}MANUAL手动触发一次调度用在web页面点击执行一次按钮时触发
CRON到执行时间了自动进行任务调度触发用在预读守护线程加载预读任务、处理预读任务的守护线程到时间时的触发
RETRY任务调度失败或执行失败后由监控任务调度失败或执行失败的守护线程触发
PARENT父任务触发当调度中心收到执行器执行成功的反馈、处理任务结果丢失记录、停止执行器执行的反馈这三种情况下更新完执行结果后若是此任务有配置子任务则触发子任务的执行
APIAPI触发目前xxl-job中没有使用到此种类型
MISFIRE调度过期补偿用在预读守护线程加载下次执行时间在当前时间5秒内任务时对于下次执行时间已经在当前时间-5秒之前的任务超时未执行触发一次补偿调度。
源码中的调用我们就从手动触发执行一次开始看起当在web界面中点击执行一次某个任务页面截图如下
调用到的控制类是JobInfoController看下接口方法源码 //触发调度RequestMapping(/trigger)ResponseBodypublic ReturnTString triggerJob(int id, String executorParam, String addressList) {if (executorParam null) {executorParam ;}//调度类型为手动触发JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);return ReturnT.SUCCESS;}处理任务调度的类是JobTriggerPoolHelper看下他的trigger源码 private static JobTriggerPoolHelper helper new JobTriggerPoolHelper();/*** 执行任务调度* param jobId 任务id* param triggerType 调度类型* param failRetryCount 失败重试次数大于等于0才生效* 0: use this param* 0: use param from job info config* param executorShardingParam //执行器分片信息* param executorParam //任务参数* param addressList //执行器地址*/public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);}JobTriggerPoolHelper类先创建了自身对象再调用addTrigger添加调度任务方法看下addTrigger源码 //使用volatile修饰变量是为了在多线程下每个线程能及时拿到最新的minTim值private volatile long minTim System.currentTimeMillis()/60000; // ms min 计算出来的是分钟//jobTimeoutCountMap是以一分钟为口径进行统计的一分钟内某个任务调度超过500毫秒的次数根据此次数来选择使用快小于10次、慢大于10次线程池执行此任务private volatile ConcurrentMapInteger, AtomicInteger jobTimeoutCountMap new ConcurrentHashMap();public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool//选择使用快的还是慢的线程池执行此任务ThreadPoolExecutor triggerPool_ fastTriggerPool;//使用jobTimeoutCountMap来存放任务超时次数集合key任务idvalue次数它统计的口径是一分钟内的调度数据AtomicInteger jobTimeoutCount jobTimeoutCountMap.get(jobId);//在1分钟内此任务有10次以上超过500毫米才调度完成使用慢线程池处理if (jobTimeoutCount!null jobTimeoutCount.get() 10) { // job-timeout 10 times in 1 min//使用慢线程池处理triggerPool_ slowTriggerPool;}// trigger//线程池执行任务triggerPool_.execute(new Runnable() {Overridepublic void run() {//调度开始时间long start System.currentTimeMillis();try {// do trigger//开始进行调度XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-map//调度结束后的时间-分钟long minTim_now System.currentTimeMillis()/60000;//调度结束后的分钟数不等于设置的分钟数if (minTim ! minTim_now) {//重新设置分钟数作为下一个统计口径的时间点minTim minTim_now;//清空map集合已经过了minTim这一分钟的统计口径jobTimeoutCountMap是以一分钟为口径进行统计的jobTimeoutCountMap.clear();}// incr timeout-count-map//计算一共花费了多少时间long cost System.currentTimeMillis()-start;//调度时间大于500毫秒if (cost 500) { // ob-timeout threshold 500ms//putIfAbsent:向map中添加记录若是存在此key的记录则返回value若是不存在则插入插入的时候返回的值为nullAtomicInteger timeoutCount jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));//已经存在此key的值timeoutCount才不等于nullif (timeoutCount ! null) {//使用AtomicInteger线程安全的方式把次数加1cas自旋的方式先比对在加1timeoutCount.incrementAndGet();}}}}});}处理此任务有快慢两个线程池可供选择默认选择快线程池执行选择慢线程池的条件在1分钟内此任务有10次以上超过500毫米才调度完成使用慢线程池处理。这里使用volatile来修饰minTim为了在多线程下每个线程能及时拿到最新的minTim值使用jobTimeoutCountMap来存放某个任务在一分钟内调度超过500毫秒的次数也是使用volatile来修饰。当任务执行完成判断当前的分钟数是否还等于minTim若是等于说明还在minTim这分钟内若是不等于大于说明已经过了minTim这分钟的统计维度需要把jobTimeoutCountMap清空并把当前分钟数赋值给minTim调度时间大于500毫秒使用putIfAbsent方法向map中添加记录若是存在此key的记录则返回value不进行插入若是不存在则插入插入的时候返回的值为nullput方法与putIfAbsent方法都会把旧的值返回不同之处是当key存在put方法会进行覆盖而putIfAbsent不会进行覆盖。已经存在此key的值把原有的AtomicInteger值使用线程安全的方式次数加1不存在则插入的AtomicInteger值就是1。
最终调用XxlJobTrigger.trigger()执行调度方法看下trigger()方法源码 public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data//加载任务数据XxlJobInfo jobInfo XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo null) {logger.warn( trigger fail, jobId invalidjobId{}, jobId);return;}//有新设置的执行参数则进行参数的覆盖if (executorParam ! null) {jobInfo.setExecutorParam(executorParam);}//调度失败重试次数int finalFailRetryCount failRetryCount0?failRetryCount:jobInfo.getExecutorFailRetryCount();//执行器分组信息XxlJobGroup group XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList//有设置新的执行器地址则进行覆盖if (addressList!null addressList.trim().length()0) {group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param//分片参数int[] shardingParam null;if (executorShardingParam!null){String[] shardingArr executorShardingParam.split(/);if (shardingArr.length2 isNumeric(shardingArr[0]) isNumeric(shardingArr[1])) {shardingParam new int[2];shardingParam[0] Integer.valueOf(shardingArr[0]);shardingParam[1] Integer.valueOf(shardingArr[1]);}}//路由策略是分片广播if (ExecutorRouteStrategyEnum.SHARDING_BROADCASTExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) group.getRegistryList()!null !group.getRegistryList().isEmpty() shardingParamnull) {//分片广播则需要向所有的注册器都进行调用for (int i 0; i group.getRegistryList().size(); i) {//处理调度processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {//非分片广播if (shardingParam null) {shardingParam new int[]{0, 1};}//处理调度processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}trigger方法只是做一下调用的前置处理根据任务id查询出任务若是有新的执行参数则覆盖xxl_job_info表中配置的参数查询出执行器分组信息获取到有哪些执行器可以调用参数涉及到分片信息所以默认创建只有一个分片的数组索引为0若是此任务配置的路由策略是分片广播则所有的执行器都要执行任务根据有多少个执行器来确定分为多少片调用执行器的时候传递当前执行器是第几个分片总共有多少个分片执行器在处理任务对应的具体方法时处理的逻辑为先查询这次任务涉及到的总记录数需要按某个字段进行排序然后用总记录数除以总的执行器数得到每个执行器处理的平均执行数最后一个执行器的执行数量为总记录数-平均执行数乘以执行器数量-1然后可以根据当前执行器所属的分片数来查询到此执行器需要处理的记录范围例如mysql的limit start,end语句start为分片索引*平均执行数end为执行数量。广播分片执行器处理举例说明 //获取当前分片序号int shardIndex XxlJobHelper.getShardIndex();//获取总分片数int shardTotal XxlJobHelper.getShardTotal();//总记录条数--查询数据库int targetTotal xxService.getTargetTotal();//查询记录的起始位置int start shardIndex;//查询记录的offsetint end 1;//总记录大于分片数量if(targetTotal shardTotal){//计算每个分片平均处理的数量int avgTotal targetTotal/shardTotal;//数据查询起始start shardIndex*avgTotal;//数据的offsetend avgTotal;//最后一个执行器if(shardIndex shardTotal-1) {//总数量-前面几个执行器执行的数量end targetTotal-(avgTotal*(shardTotal-1);} }//使用start和end去查询需要处理的数据//拼接mysql语句limit start end//对查询到的记录进行处理... 任务的处理调用processTrigger方法看下processTrigger源码 private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param//阻塞处理策略ExecutorBlockStrategyEnum blockStrategy ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy//路由策略ExecutorRouteStrategyEnum executorRouteStrategyEnum ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy//当路由策略为分片广播组织分片参数String shardingParam (ExecutorRouteStrategyEnum.SHARDING_BROADCASTexecutorRouteStrategyEnum)?String.valueOf(index).concat(/).concat(String.valueOf(total)):null;// 1、save log-id//新建一条日志信息添加上执行时间XxlJobLog jobLog new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug( xxl-job trigger start, jobId:{}, jobLog.getId());// 2、init trigger-param//构造调度任务的参数TriggerParam triggerParam new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address//获取到执行器的地址String address null;ReturnTString routeAddressResult null;if (group.getRegistryList()!null !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST executorRouteStrategyEnum) {//路由为分片广播的方式根据分片的索引index获取到执行器的地址if (index group.getRegistryList().size()) {address group.getRegistryList().get(index);} else {address group.getRegistryList().get(0);}} else {//根据配置的路由策略从注册执行器列表匹配出此次调度的执行器地址路由策略包含随机、故障转移、忙碌转移等routeAddressResult executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() ReturnT.SUCCESS_CODE) {address routeAddressResult.getContent();}}} else {//执行器地址为空异常routeAddressResult new ReturnTString(ReturnT.FAIL_CODE, I18nUtil.getString(jobconf_trigger_address_empty));}// 4、trigger remote executorReturnTString triggerResult null;//执行器地址不为空在进行调度if (address ! null) {//执行调度triggerResult runExecutor(triggerParam, address);} else {triggerResult new ReturnTString(ReturnT.FAIL_CODE, null);}// 5、collection trigger info//构造调度执行器的调度-日志信息StringBuffer triggerMsgSb new StringBuffer();triggerMsgSb.append(I18nUtil.getString(jobconf_trigger_type)).append().append(triggerType.getTitle());triggerMsgSb.append(br).append(I18nUtil.getString(jobconf_trigger_admin_adress)).append().append(IpUtil.getIp());triggerMsgSb.append(br).append(I18nUtil.getString(jobconf_trigger_exe_regtype)).append().append( (group.getAddressType() 0)?I18nUtil.getString(jobgroup_field_addressType_0):I18nUtil.getString(jobgroup_field_addressType_1) );triggerMsgSb.append(br).append(I18nUtil.getString(jobconf_trigger_exe_regaddress)).append().append(group.getRegistryList());triggerMsgSb.append(br).append(I18nUtil.getString(jobinfo_field_executorRouteStrategy)).append().append(executorRouteStrategyEnum.getTitle());if (shardingParam ! null) {triggerMsgSb.append((shardingParam));}triggerMsgSb.append(br).append(I18nUtil.getString(jobinfo_field_executorBlockStrategy)).append().append(blockStrategy.getTitle());triggerMsgSb.append(br).append(I18nUtil.getString(jobinfo_field_timeout)).append().append(jobInfo.getExecutorTimeout());triggerMsgSb.append(br).append(I18nUtil.getString(jobinfo_field_executorFailRetryCount)).append().append(finalFailRetryCount);triggerMsgSb.append(brbrspan style\color:#00c0ef;\ I18nUtil.getString(jobconf_trigger_run) /spanbr).append((routeAddressResult!nullrouteAddressResult.getMsg()!null)?routeAddressResult.getMsg()brbr:).append(triggerResult.getMsg()!null?triggerResult.getMsg():);// 6、save log trigger-info//设置日志其他相关字段供调度失败再次调度时候使用jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();//设置调度结果状态值jobLog.setTriggerCode(triggerResult.getCode());//设置调度信息jobLog.setTriggerMsg(triggerMsgSb.toString());//更新日志记录XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug( xxl-job trigger end, jobId:{}, jobLog.getId());}processTrigger方法新建了一条执行日志记录插入到xxl_job_log表中构造调度任务的参数实体TriggerParam作为调用执行器的传递参数封装获取执行此次任务的执行器地址若是路由策略为分片广播的方式根据分片的索引index从执行器集合中获取执行器地址若是其他路由方式则使用对应的策略获取到执行器地址例如随机、故障转移、忙碌转移等故障转移就是上面介绍的beat调用忙碌转移就是上面介绍的idleBeat调用然后拿着构造好的参数类TriggerParam、匹配到的执行器地址address执行任务的调用runExecutor等到执行器返回调用结果后构造调度执行器的详细日志信息把调用的结果状态值和日志信息更新到一开始插入的日志表中此日志记录着调用需要的所有参数这样在进行失败重调的时候参数直接从日志记录中取。注意这里执行器只是返回是否调度成功不返回具体是否执行成功执行情况是等待执行器主动调用调度中心进行反馈。
任务的具体调度是runExecutor方法看下runExecutor源码 public static ReturnTString runExecutor(TriggerParam triggerParam, String address){ReturnTString runResult null;try {//根据执行器地址获取调度执行器的客户端ExecutorBiz executorBiz XxlJobScheduler.getExecutorBiz(address);//执行调度方法runResult executorBiz.run(triggerParam);} catch (Exception e) {logger.error( xxl-job trigger error, please check if the executor[{}] is running., address, e);runResult new ReturnTString(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}StringBuffer runResultSB new StringBuffer(I18nUtil.getString(jobconf_trigger_run) );runResultSB.append(braddress).append(address);runResultSB.append(brcode).append(runResult.getCode());runResultSB.append(brmsg).append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;}
runExecutor方法根据执行器地址获取到调用客户端然后执行此客户端的run方法即调用到ExecutorBizClient类的run方法此run方法发起远程http调用到此一次完整的调用流程走完。run方法的源码 //调用执行器运行Overridepublic ReturnTString run(TriggerParam triggerParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl run, accessToken, timeout, triggerParam, String.class);}4.kill调用
kill调用是调度中心对执行器正在处理的任务进行停止处理当任务调度成功后还没有收到执行器的反馈调度中心可以调用kill来停止执行器的执行。在web 中的操作界面如下 点击终止任务对应的接口为/joblog/logKill看下logKill接口的源码 RequestMapping(/logKill)ResponseBodypublic ReturnTString logKill(int id){// base checkXxlJobLog log xxlJobLogDao.load(id);XxlJobInfo jobInfo xxlJobInfoDao.loadById(log.getJobId());//任务不存在if (jobInfonull) {return new ReturnTString(500, I18nUtil.getString(jobinfo_glue_jobid_unvalid));}//调用执行器没有成功if (ReturnT.SUCCESS_CODE ! log.getTriggerCode()) {return new ReturnTString(500, I18nUtil.getString(joblog_kill_log_limit));}// request of killReturnTString runResult null;try {//根据执行器地址获取对应的客户端ExecutorBiz executorBiz XxlJobScheduler.getExecutorBiz(log.getExecutorAddress());//调用方法runResult executorBiz.kill(new KillParam(jobInfo.getId()));} catch (Exception e) {logger.error(e.getMessage(), e);runResult new ReturnTString(500, e.getMessage());}//停止任务执行成功if (ReturnT.SUCCESS_CODE runResult.getCode()) {//把执行器处理状态设置为失败log.setHandleCode(ReturnT.FAIL_CODE);log.setHandleMsg( I18nUtil.getString(joblog_kill_log_byman): (runResult.getMsg()!null?runResult.getMsg():));log.setHandleTime(new Date());//更新日志信息执行结果完成任务XxlJobCompleter.updateHandleInfoAndFinish(log);return new ReturnTString(runResult.getMsg());} else {return new ReturnTString(500, runResult.getMsg());}}调用执行器停止执行之前先根据日志id获取到日志记录校验当前任务是否还在校验执行器是否调度成功只有调度成功才能进行停止执行的调用根据执行器地址获取对应的客户端执行器地址在调度成功的时候已经写到日志记录中从日志记录中取出执行地址即可调用停止方法停止任务执行成功把日志记录的执行器处理状态设置为失败更新日志信息执行结果完成任务。调用kill方法即调用到ExecutorBizClient类的kill方法此kill方法发起远程http调用。kill方法的源码 //停止执行器的执行Overridepublic ReturnTString kill(KillParam killParam) {//发起远程http调用return XxlJobRemotingUtil.postBody(addressUrl kill, accessToken, timeout, killParam, String.class);}5.log调用
log调用为执行器web页面查看某个日志对应的执行器产生的执行日志文件进行一次任务调度调度中心侧会产生一条日志记录存放到xxl_job_log表中执行器处理任务的时候会产生自己的执行日志执行器的处理日志存在于部署执行器的某个目录下。此log调用就是根据日志id和调度时间从执行器中加载执行日志文件执行日志文件的存储规则为默认的目录/调度时间例2023-03-04/日志id.log。
web界面中查看执行日志的入口截图 显示执行日志的界面及发起请求日志的接口截图 执行器中存放执行日志文件的截图
加载执行日志文件的接口为logDetailCat接口看下logDetailCat源码 //查询具体执行明细需要调用到此任务具体执行的那台机器去获取RequestMapping(/logDetailCat)ResponseBodypublic ReturnTLogResult logDetailCat(String executorAddress, long triggerTime, long logId, int fromLineNum){try {ExecutorBiz executorBiz XxlJobScheduler.getExecutorBiz(executorAddress);ReturnTLogResult logResult executorBiz.log(new LogParam(triggerTime, logId, fromLineNum));// is endif (logResult.getContent()!null logResult.getContent().getFromLineNum() logResult.getContent().getToLineNum()) {XxlJobLog jobLog xxlJobLogDao.load(logId);//处理状态为200表示执行完成500表示执行异常也结束0位未执行完成if (jobLog.getHandleCode() 0) {//日志已经加载完成logResult.getContent().setEnd(true);}}return logResult;} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnTLogResult(ReturnT.FAIL_CODE, e.getMessage());}}根据执行器地址获取操作执行器调用的客户端向执行器发起请求传递调度时间、日志id这样执行器端就可以拼接出此日志对应的执行器文件地址然后加载文件并根据传入的起始行数进行加载传入起始行数是因为查看执行日志的时候执行器未必已经处理完成当执行器未处理完成则前端使用定时器去调用logDetailCat接口并传递新的起始行若是执行器已经处理完成则返回给前端一个end为true的标识前端不再调用logDetailCat接口。调用log方法即调用到ExecutorBizClient类的log方法此log方法发起远程http调用。log方法的源码 //查询执行器产生的执行日志信息Overridepublic ReturnTLogResult log(LogParam logParam) {//发起http调用return XxlJobRemotingUtil.postBody(addressUrl log, accessToken, timeout, logParam, LogResult.class);}三.接收请求处理
调度中心会接收执行器的请求接收哪些请求可以从调度中心的api类JobApiController中看出JobApiController类位于com.xxl.job.admin.controller包下包含的接口处理为callback、registry、registryRemove三类。看下JobApiController类源码
Controller
RequestMapping(/api)
public class JobApiController {Resourceprivate AdminBiz adminBiz; //具体类型为AdminBizImpl/*** api** param uri* param data* return*/RequestMapping(/{uri})ResponseBodyPermissionLimit(limitfalse)public ReturnTString api(HttpServletRequest request, PathVariable(uri) String uri, RequestBody(required false) String data) {// valid//只支持post方式if (!POST.equalsIgnoreCase(request.getMethod())) {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, HttpMethod not support.);}if (urinull || uri.trim().length()0) {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, uri-mapping empty.);}//调用调度中心若是配置了token则需要从执行器的request中获取到token值在执行器传递token值时使用XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN作为key传递此处也按这个key取值if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!null XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()0 !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {return new ReturnTString(ReturnT.FAIL_CODE, The access token is wrong.);}// services mapping//根据接口的结尾匹配具体是哪个方法//callback方法执行器回调调度中心的方法if (callback.equals(uri)) {ListHandleCallbackParam callbackParamList GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);} else if (registry.equals(uri)) {//registry方法执行器向调度中心进行在线注册的方法默认30秒调用一次心跳注册机制RegistryParam registryParam GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);} else if (registryRemove.equals(uri)) {//registryRemove方法执行器结束在bean销毁的时候会调用销毁执行器在线记录的方法RegistryParam registryParam GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, uri-mapping( uri ) not found.);}}
}JobApiController类使用接口后缀通配符的方式接收执行器的调用只支持post方式调用调度中心若是配置了token则需要从执行器的request请求中获取到token值在执行器传递token值时使用XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN作为key传递此处也按这个key取值根据接口的结尾匹配具体是哪个方法。
处理具体请求都是AdminBiz接口类使用注入的方式进行引入说明引入的是AdminBiz接口的具体实现类并且此类需要注册为bean对象。实现AdminBiz接口的类是AdminBizClient和AdminBizImpl只有AdminBizImpl类注册为bean对象使用Service修饰Service注解再使用Component所以此处注入的AdminBiz具体类是AdminBizImpl。
1.callback请求
callback请求是执行器把执行结果反馈给调度中心的方法调度中心收到此反馈之后更新日志记录的执行状态、执行消息结束一次任务调度若是有子任务则进行子任务的调度。看下callback请求的入口源码 //callback方法执行器回调调度中心的方法if (callback.equals(uri)) {//接收参数转成list集合ListHandleCallbackParam callbackParamList GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);}接收参数转成HandleCallbackParam实体调用callback方法。看下AdminBizImpl类的callback方法源码 //响应执行器反馈的方法Overridepublic ReturnTString callback(ListHandleCallbackParam callbackParamList) {//JobCompleteHelper调度中心独有处理类return JobCompleteHelper.getInstance().callback(callbackParamList);}处理反馈的具体类是JobCompleteHelper看下JobCompleteHelper类的callback源码 public ReturnTString callback(ListHandleCallbackParam callbackParamList) {//使用反馈线程池处理反馈记录callbackThreadPool.execute(new Runnable() {Overridepublic void run() {//循环处理所有的执行结果for (HandleCallbackParam handleCallbackParam: callbackParamList) {ReturnTString callbackResult callback(handleCallbackParam);logger.debug( JobApiController.callback {}, handleCallbackParam{}, callbackResult{},(callbackResult.getCode() ReturnT.SUCCESS_CODE?success:fail), handleCallbackParam, callbackResult);}}});return ReturnT.SUCCESS;}使用反馈线程池处理反馈记录循环处理所有的执行结果调用类里的callback方法看下callback源码 //调度中心对执行器反馈的处理private ReturnTString callback(HandleCallbackParam handleCallbackParam) {// valid log item//检查日志信息XxlJobLog log XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());if (log null) {return new ReturnTString(ReturnT.FAIL_CODE, log item not found.);}if (log.getHandleCode() 0) {return new ReturnTString(ReturnT.FAIL_CODE, log repeate callback.); // avoid repeat callback, trigger child job etc}// handle msg//在原有执行日志的基础上追加上反馈日志StringBuffer handleMsg new StringBuffer();if (log.getHandleMsg()!null) {handleMsg.append(log.getHandleMsg()).append(br);}if (handleCallbackParam.getHandleMsg() ! null) {handleMsg.append(handleCallbackParam.getHandleMsg());}// success, save log//设置执行时间、执行结果状态值、执行日志log.setHandleTime(new Date());log.setHandleCode(handleCallbackParam.getHandleCode());log.setHandleMsg(handleMsg.toString());//完成此任务并更新日志的状态值有子任务再调用子任务XxlJobCompleter.updateHandleInfoAndFinish(log);return ReturnT.SUCCESS;}此方法先检查日志信息在原有执行日志的基础上追加上反馈日志设置执行时间、执行结果状态值、执行日志完成此任务并更新日志的状态值有子任务再调用子任务。看下完成任务更新日志的方法updateHandleInfoAndFinish源码 public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {// finish// 完成此任务有子任务再调用子任务finishJob(xxlJobLog);// text最大64kb 避免长度过长if (xxlJobLog.getHandleMsg().length() 15000) {xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );}// fresh handle//更新日志的执行器处理情况信息return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);}完成此任务有子任务再调用子任务更新日志的执行器处理情况信息。看下完成任务若是有子任务再调度子任务的finishJob源码 private static void finishJob(XxlJobLog xxlJobLog){// 1、handle success, to trigger child jobString triggerChildMsg null;//任务执行完成if (XxlJobContext.HANDLE_CODE_SUCCESS xxlJobLog.getHandleCode()) {XxlJobInfo xxlJobInfo XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());//查询此任务下是否还有子任务有子任务则执行子任务if (xxlJobInfo!null xxlJobInfo.getChildJobId()!null xxlJobInfo.getChildJobId().trim().length()0) {triggerChildMsg brbrspan style\color:#00c0ef;\ I18nUtil.getString(jobconf_trigger_child_run) /spanbr;//子任务id使用逗号拼接此处使用逗号进行分割String[] childJobIds xxlJobInfo.getChildJobId().split(,);//循环调用子任务执行for (int i 0; i childJobIds.length; i) {//子任务id合法int childJobId (childJobIds[i]!null childJobIds[i].trim().length()0 isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;if (childJobId 0) {//执行子任务调用类型为父类调用JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);ReturnTString triggerChildResult ReturnT.SUCCESS;// add msgtriggerChildMsg MessageFormat.format(I18nUtil.getString(jobconf_callback_child_msg1),(i1),childJobIds.length,childJobIds[i],(triggerChildResult.getCode()ReturnT.SUCCESS_CODE?I18nUtil.getString(system_success):I18nUtil.getString(system_fail)),triggerChildResult.getMsg());} else {triggerChildMsg MessageFormat.format(I18nUtil.getString(jobconf_callback_child_msg2),(i1),childJobIds.length,childJobIds[i]);}}}}if (triggerChildMsg ! null) {xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() triggerChildMsg );}}若是任务执行完成查询此任务下是否还有子任务有子任务则执行子任务子任务id使用逗号拼接此处使用逗号进行分割子任务id合法执行子任务调用类型为父类调用。web页面中设置子任务的截图
2.registry请求
registry请求为执行器启动或者心跳机制默认30秒向调度中心注册在线情况的行为这样调度中心在进行调度的时候才能够知道哪些执行器是在线的只有在线的执行器才能响应调度。看下registry请求的入口源码
else if (registry.equals(uri)) {//registry方法执行器向调度中心进行在线注册的方法默认30秒调用一次心跳注册机制RegistryParam registryParam GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);} 接收参数转成RegistryParam实体调用registry方法。看下AdminBizImpl类的registry方法源码 Overridepublic ReturnTString registry(RegistryParam registryParam) {//JobRegistryHelper调度中心独有处理类return JobRegistryHelper.getInstance().registry(registryParam);}处理反馈的具体类是JobRegistryHelper看下JobRegistryHelper类的registry源码 //响应执行器注册的方法public ReturnTString registry(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnTString(ReturnT.FAIL_CODE, Illegal Argument.);}// async execute//使用注册或删除注册的线程池执行registryOrRemoveThreadPool.execute(new Runnable() {Overridepublic void run() {//先调用更新的方法int ret XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret 1) {//没有记录则进行插入XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh//此方法是一个空方法刷新执行器的最新在线情况已经由registryMonitorThread守护线程执行freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;}使用注册或删除注册的线程池执行此次任务执行器心跳注册信息存放在xxl_job_registry表中注册的时候若是此执行器对应的记录已经存在则更新它的最新注册时间若是不存在则进行插入。执行器属于哪个任务组是根据它的registry_key字段值来确定的registryMonitorThread守护线程默认30秒执行一次清理xxl_job_registry表把大于90秒没有过注册的执行器记录删除然后把xxl_job_registry最新的执行器记录按registry_key分组拼接执行器地址集合根据registry_key值等于xxl_job_group表app_name字段为条件把最新执行器地址更新到address_list字段中address_list是对某个分组下当前在线执行器地址的合集使用逗号拼接。web页面显示任务组当前在线的执行器集合截图
3.registryRemove请求
registryRemove请求为执行器下线的时候告知调度中心自己下线了需要从注册表中移除此执行器否则调度的时候没法给出响应。看下registryRemove请求的入口源码
else if (registryRemove.equals(uri)) {//registryRemove方法执行器结束在bean销毁的时候会调用销毁执行器在线记录的方法RegistryParam registryParam GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);}接收参数转成RegistryParam实体调用registryRemove方法。看下AdminBizImpl类的registryRemove方法源码 Overridepublic ReturnTString registryRemove(RegistryParam registryParam) {//JobRegistryHelper调度中心独有处理类return JobRegistryHelper.getInstance().registryRemove(registryParam);}处理移除的具体类是JobRegistryHelper看下JobRegistryHelper类的registryRemove源码 //响应执行器删除注册的方法public ReturnTString registryRemove(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnTString(ReturnT.FAIL_CODE, Illegal Argument.);}// async execute//使用注册或删除注册的线程池执行registryOrRemoveThreadPool.execute(new Runnable() {Overridepublic void run() {//从执行器注册表中删除记录int ret XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret 0) {//此方法是一个空方法刷新执行器的最新在线情况已经由registryMonitorThread守护线程执行freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;}使用注册或删除注册的线程池执行此次处理把执行器的注册记录从xxl_job_registry删除在下次registryMonitorThread守护线程处理的时候会重新组织执行器在线集合到时候此执行器会被剔除。
四.程序结束销毁处理
程序启动的时候初始化了4个线程池、6个守护线程当程序结束的时候需要销毁这些资源。资源销毁的入口类为XxlJobAdminConfig。
1.销毁入口类
XxlJobAdminConfig类是销毁的入口类是因为它实现了DisposableBean接口重写了destroy方法。当bean被销毁的时候会执行destroy方法可以从这里作为销毁处理的入口。看下XxlJobAdminConfig类销毁相关的源码
Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {// 实现DisposableBean接口重写它的bean销毁方法Overridepublic void destroy() throws Exception {xxlJobScheduler.destroy();}
}XxlJobAdminConfig被Component注解修饰在程序启动的时候会加载到spring容器中此时XxlJobAdminConfig就是一个bean对象实现了DisposableBean接口即bean销毁的接口就是当程序停止的时候会销毁bean这样在销毁XxlJobAdminConfig的时候可以从这里进行资源的清理。
2.资源销毁处理
销毁资源调用到XxlJobScheduler类的destroy()看下destroy方法源码 //销毁调度中心资源public void destroy() throws Exception {// stop-schedule//停止预读线程、环形处理任务线程JobScheduleHelper.getInstance().toStop();// admin log report stop//停止日志报表守护线程JobLogReportHelper.getInstance().toStop();// admin lose-monitor stop//销毁处理执行器反馈的线程池、停止没法完成任务监听的守护线程JobCompleteHelper.getInstance().toStop();// admin fail-monitor stop//停止监听失败任务再进行重试调度、发报警邮件的守护线程JobFailMonitorHelper.getInstance().toStop();// admin registry stop//销毁处理执行器注册或者删除的线程池、停止监听执行器是否在线的守护线程JobRegistryHelper.getInstance().toStop();// admin trigger pool stop//销毁处理任务调度的快、慢线程池JobTriggerPoolHelper.toStop();}此destroy销毁方法就是为了销毁初始化init时创建的线程池、守护线程下面逐个介绍下销毁过程。
1JobScheduleHelper.getInstance().toStop()
停止预读守护线程、环形处理任务守护线程看下源码 public void toStop(){// 1、stop schedule//停止守护线程的while条件scheduleThreadToStop true;try {//休眠1秒TimeUnit.SECONDS.sleep(1); // wait} catch (InterruptedException e) {logger.error(e.getMessage(), e);}//中断线程if (scheduleThread.getState() ! Thread.State.TERMINATED){// interrupt and waitscheduleThread.interrupt();try {scheduleThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// if has ring data//是否还有未处理完的环形预处理任务boolean hasRingData false;if (!ringData.isEmpty()) {for (int second : ringData.keySet()) {ListInteger tmpData ringData.get(second);if (tmpData!null tmpData.size()0) {hasRingData true;break;}}}//有未处理完的预读任务if (hasRingData) {try {//休眠8秒让预读处理任务处理完成TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop ring (wait job-in-memory stop)//停止环形任务线程的while条件ringThreadToStop true;try {//休眠一秒TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {logger.error(e.getMessage(), e);}//中断环形预处理线程if (ringThread.getState() ! Thread.State.TERMINATED){// interrupt and waitringThread.interrupt();try {ringThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}logger.info( xxl-job, JobScheduleHelper stop);}停止守护线程的while条件scheduleThreadToStop是被volatile修饰的被volatile修饰的字段有总线嗅探感知机制当scheduleThreadToStop的值在某个线程中被改变时会把改变的结果值及时写到主线程然后其他引用了此变量的线程会感知到变化并把自己副本的此变量值失效重新读取最新的值。还有未处理完的预处理任务则让程序休眠8秒8秒已经足够处理预读任务了因为预读的任务是5秒内的。
2JobLogReportHelper.getInstance().toStop()
停止日志报表守护线程看下源码 public void toStop(){//跳出while语句toStop true;// interrupt and wait//中断线程logrThread.interrupt();try {//等待线程处理完成logrThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}3JobCompleteHelper.getInstance().toStop()
销毁处理执行器回调的线程池、停止没法完成任务监听的守护线程看下源码 public void toStop(){//跳出while语句toStop true;// stop registryOrRemoveThreadPool//销毁线程池callbackThreadPool.shutdownNow();// stop monitorThread (interrupt and wait)//中断线程monitorThread.interrupt();try {monitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}4JobFailMonitorHelper.getInstance().toStop()
停止监听失败任务再进行重试调度、发报警邮件的守护线程看下源码 public void toStop(){//跳出while语句toStop true;// interrupt and wait//中断线程monitorThread.interrupt();try {monitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}5JobRegistryHelper.getInstance().toStop()
销毁处理执行器注册或者删除的线程池、停止监听执行器是否在线的守护线程看下源码 public void toStop(){//跳出while语句toStop true;// stop registryOrRemoveThreadPool//销毁线程池registryOrRemoveThreadPool.shutdownNow();// stop monitir (interrupt and wait)//中断线程registryMonitorThread.interrupt();try {registryMonitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}
6JobTriggerPoolHelper.toStop()
销毁处理任务调度的快、慢线程池看下源码 public static void toStop() {helper.stop();}public void stop() {//triggerPool.shutdown();//销毁线程池fastTriggerPool.shutdownNow();slowTriggerPool.shutdownNow();logger.info( xxl-job trigger thread pool shutdown success.);}
执行器
执行器是对任务的具体执行是任务逻辑处理的具体实现提供响应调度中心调度也有主动向调度中心发起请求。执行器一般就是开发业务代码的系统某些模块需要使用到定时器处理功能项目引入xxl-job-core依赖即可作为执行器开发使用。
一.程序启动初始化
程序启动后会做很多的资源初始化初始化netty来监听某个端口调度中心调用此netty的端口地址即可与执行器建立连接。资源初始化的入口类为XxlJobSpringExecutor类我们从这个类来看初始化过程。
1.初始化入口类
之所以说入口类为XxlJobSpringExecutor是因为我们在执行器侧配置xxl-job的配置文件时使用XxlJobSpringExecutor实体来接收配置信息并把XxlJobSpringExecutor注册为Bean对象有了Bean对象我们对资源的初始化即可从这里入手。来看下执行器的配置类源码
Configuration
public class XxlJobConfig {private Logger logger LoggerFactory.getLogger(XxlJobConfig.class);//调度中心地址Value(${xxl.job.admin.addresses})private String adminAddresses;//token值Value(${xxl.job.accessToken})private String accessToken;//所属的执行器分组Value(${xxl.job.executor.appname})private String appname;//执行器地址Value(${xxl.job.executor.address})private String address;//ipValue(${xxl.job.executor.ip})private String ip;//netty监听的端口Value(${xxl.job.executor.port})private int port;//执行日志存放的目录Value(${xxl.job.executor.logpath})private String logPath;//执行日志最多存放天数Value(${xxl.job.executor.logretentiondays})private int logRetentionDays;//注册xxlJobExecutor的beanBeanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info( xxl-job config init.);XxlJobSpringExecutor xxlJobSpringExecutor new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}此配置类接收了application.properties配置文件中的配置信息创建了一个XxlJobSpringExecutor实体类把接收到的配置信息都赋值到此实体类中并把此XxlJobSpringExecutor实体注册成Bean对象。来看下XxlJobSpringExecutor实体源码
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {private static final Logger logger LoggerFactory.getLogger(XxlJobSpringExecutor.class);// 实现了SmartInitializingSingleton接口只适用于单列bean在bean实例初始化完成后会调用afterSingletonsInstantiated方法Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method)//初始化任务方法处理所有Bean中使用XxlJob注解标识的方法initJobHandlerMethodRepository(applicationContext);// refresh GlueFactory//重新设置GlueFactory的类型为SpringGlueFactoryGlueFactory.refreshInstance(1);// super starttry {//调用到XxlJobExecutor类的start方法对一些资源进行初始化super.start();} catch (Exception e) {throw new RuntimeException(e);}}// 实现DisposableBean接口重写它的bean销毁方法Overridepublic void destroy() {super.destroy();}private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext null) {return;}// init job handler from method//从程序上下文中获取到所有的bean名称集合String[] beanDefinitionNames applicationContext.getBeanNamesForType(Object.class, false, true);//遍历bean集合for (String beanDefinitionName : beanDefinitionNames) {//根据bean名称从程序上下文获取到此bean对象Object bean applicationContext.getBean(beanDefinitionName);MapMethod, XxlJob annotatedMethods null; // referred to org.springframework.context.event.EventListenerMethodProcessor.processBeantry {//对Bean对象进行方法过滤查询到方法被XxlJob注解修饰是则放到annotatedMethods集合中annotatedMethods MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookupXxlJob() {Overridepublic XxlJob inspect(Method method) {//判断方法被XxlJob注解修饰才返回return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error(xxl-job method-jobhandler resolve error for bean[ beanDefinitionName ]., ex);}//当前遍历的bean没有被XxlJob注解修饰则调过处理if (annotatedMethodsnull || annotatedMethods.isEmpty()) {continue;}//循环处理当前Bean下被XxlJob修饰的方法for (Map.EntryMethod, XxlJob methodXxlJobEntry : annotatedMethods.entrySet()) {//执行的方法Method executeMethod methodXxlJobEntry.getKey();//XxlJob注解类XxlJob xxlJob methodXxlJobEntry.getValue();// regist//注册此任务处理器registJobHandler(xxlJob, bean, executeMethod);}}}// ---------------------- applicationContext ----------------------private static ApplicationContext applicationContext;//实现ApplicationContextAware接口获取上下文得到加载到spring容器中的所有bean对象Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {XxlJobSpringExecutor.applicationContext applicationContext;}public static ApplicationContext getApplicationContext() {return applicationContext;}
}XxlJobSpringExecutor类继承了XxlJobExecutor类接收配置信息的字段都是定义在XxlJobExecutor类中的XxlJobSpringExecutor类实现了ApplicationContextAware接口重写它的setApplicationContext方法即可得到spring上下文实现了SmartInitializingSingleton接口重写它的afterSingletonsInstantiated方法在bean实例初始化完成后会调用afterSingletonsInstantiated方法这个方法就是初始化的真正入口。
2.初始化处理任务的方法
调度中心添加任务的时候需要指定此任务由执行器的哪个处理任务方法来执行因为一个执行器里面可以定义多个处理任务的方法。调度中心web界面配置处理任务的方法截图
执行器使用XxlJob注解来修饰每一个处理任务的方法XxlJob注解提供三个可选配置value值用于匹配调度中心创建任务时的JobHandler值init是配置处理任务之前的初始方法destroy是配置处理任务之后的销毁工作。看下XxlJob源码
Target({ElementType.METHOD})
Retention(RetentionPolicy.RUNTIME)
Inherited
public interface XxlJob {/*** jobhandler name*/String value();/*** init handler, invoked when JobThread init*/String init() default ;/*** destroy handler, invoked when JobThread destroy*/String destroy() default ;
}要想在程序启动时获取到配置了哪些处理任务的方法即被XxlJob修饰的方法需要把配置任务的类设置成Bean对象即使用Component修饰这样可以根据Bean对象来找方法内包含XxlJob注解的方法把这些方法保存到一个集合当收到调度中心发来的执行任务命令时可以从这个集合中找到它对应的方法然后执行此方法。看下配置具体处理任务的源码
Component
public class SampleXxlJob {private static Logger logger LoggerFactory.getLogger(SampleXxlJob.class);/*** 1、简单任务示例Bean模式*/XxlJob(demoJobHandler)public void demoJobHandler() throws Exception {XxlJobHelper.log(XXL-JOB, Hello World.);for (int i 0; i 5; i) {XxlJobHelper.log(beat at: i);TimeUnit.SECONDS.sleep(2);}// default success}/*** 2、分片广播任务*/XxlJob(shardingJobHandler)public void shardingJobHandler() throws Exception {// 分片参数int shardIndex XxlJobHelper.getShardIndex();int shardTotal XxlJobHelper.getShardTotal();XxlJobHelper.log(分片参数当前分片序号 {}, 总分片数 {}, shardIndex, shardTotal);// 业务逻辑for (int i 0; i shardTotal; i) {if (i shardIndex) {XxlJobHelper.log(第 {} 片, 命中分片开始处理, i);} else {XxlJobHelper.log(第 {} 片, 忽略, i);}}}/*** 生命周期任务示例任务初始化与销毁时支持自定义相关逻辑*/XxlJob(value demoJobHandler2, init init, destroy destroy)public void demoJobHandler2() throws Exception {XxlJobHelper.log(XXL-JOB, Hello World.);}public void init(){logger.info(init);}public void destroy(){logger.info(destroy);}
}处理被XxlJob修饰方法的入口为XxlJobSpringExecutor类实现了SmartInitializingSingleton接口重写它的afterSingletonsInstantiated方法当Bean对象实例化初始化后会执行此方法此方法内包含解析XxlJob的方法initJobHandlerMethodRepository看下此方法的源码 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext null) {return;}// init job handler from method//从程序上下文中获取到所有的bean名称集合String[] beanDefinitionNames applicationContext.getBeanNamesForType(Object.class, false, true);//遍历bean集合for (String beanDefinitionName : beanDefinitionNames) {//根据bean名称从程序上下文获取到此bean对象Object bean applicationContext.getBean(beanDefinitionName);MapMethod, XxlJob annotatedMethods null; // referred to org.springframework.context.event.EventListenerMethodProcessor.processBeantry {//对Bean对象进行方法过滤查询到方法被XxlJob注解修饰是则放到annotatedMethods集合中annotatedMethods MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookupXxlJob() {Overridepublic XxlJob inspect(Method method) {//判断方法被XxlJob注解修饰才返回return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error(xxl-job method-jobhandler resolve error for bean[ beanDefinitionName ]., ex);}//当前遍历的bean没有被XxlJob注解修饰则调过处理if (annotatedMethodsnull || annotatedMethods.isEmpty()) {continue;}//循环处理当前Bean下被XxlJob修饰的方法for (Map.EntryMethod, XxlJob methodXxlJobEntry : annotatedMethods.entrySet()) {//执行的方法Method executeMethod methodXxlJobEntry.getKey();//XxlJob注解类XxlJob xxlJob methodXxlJobEntry.getValue();// regist//注册此任务处理器registJobHandler(xxlJob, bean, executeMethod);}}}此方法从程序上下文中获取到所有的bean名称集合遍历bean集合根据bean名称从程序上下文获取到此bean对象对Bean对象进行方法过滤查询到方法被XxlJob注解修饰的记录放到annotatedMethods集合中循环处理当前Bean下被XxlJob修饰的方法注册此任务处理方法。注册任务处理方法为registJobHandler看下registJobHandler源码 protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob null) {return;}//获取注解XxlJob(demoJobHandler)配置的值String name xxlJob.value();//make and simplify the variables since theyll be called several times later//获取此Bean对象的类Class? clazz bean.getClass();//获取方法名称String methodName executeMethod.getName();if (name.trim().length() 0) {throw new RuntimeException(xxl-job method-jobhandler name invalid, for[ clazz # methodName ] .);}//判断是否已经有名称为name值的XxlJobif (loadJobHandler(name) ! null) {throw new RuntimeException(xxl-job jobhandler[ name ] naming conflicts.);}//方法关闭安全检查executeMethod.setAccessible(true);// init and destroyMethod initMethod null;Method destroyMethod null;//注解XxlJob是否有配置init属性if (xxlJob.init().trim().length() 0) {try {//通过反射机制获取到init方法initMethod clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException(xxl-job method-jobhandler initMethod invalid, for[ clazz # methodName ] .);}}//注解XxlJob是否有配置destroy属性if (xxlJob.destroy().trim().length() 0) {try {//通过反射机制获取到destroy方法destroyMethod clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException(xxl-job method-jobhandler destroyMethod invalid, for[ clazz # methodName ] .);}}// registry jobhandler// 把此被XxlJob注解修饰的方法注册到任务处理器中new MethodJobHandler创建一个任务处理器方法registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}此方法获取注解XxlJob配置的value值此值与调度中心的JobHandler对应一个执行器内此value是唯一的检查是否配置了init和destory属性若是配置了则使用Bean的反射机制获取到具体的方法。对于解析到的这些值使用MethodJobHandler实体来存放他们然后把此处理器添加到一个map集合中key为XxlJob配置的value值value为创建的MethodJobHandler实体。看下具体注册的registJobHandler源码: //job处理器集合keyXxlJob注解的value值value此任务执行的对象包含Bean对象执行的方法、初始方法、销毁方法private static ConcurrentMapString, IJobHandler jobHandlerRepository new ConcurrentHashMapString, IJobHandler();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info( xxl-job register jobhandler success, name:{}, jobHandler:{}, name, jobHandler);//把任务添加处理器集合中后续当需要处理某个XxlJob定义的任务时直接从jobHandlerRepository集合按key取出直接调用它的执行方法即可return jobHandlerRepository.put(name, jobHandler);}使用ConcurrentMap来存储这些处理方法当执行器被调度的时候根据任务配置的JobHandler为key从此map中获取到具体的处理类。
3.初始化执行日志目录
在afterSingletonsInstantiated()方法调用了初始其他资源的方法super.start()因为初始化入口类XxlJobSpringExecutor继承了XxlJobExecutor所以此start方法调用到XxlJobExecutor类的方法。看下start方法源码 public void start() throws Exception {// init logpath//初始化执行日志目录XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client//初始化操作调度中心的客户端initAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThread//初始化清除日志文件的守护线程清除周期为1天按配置的保留文件天数进行过期文件的清除JobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThread//初始化调度反馈线程若反馈阻塞队列有值则进行反馈并把反馈结果写入日志文件中若是反馈失败则把记录写入到反馈失败日志中初始化一个重试失败反馈的线程进行失败心跳重试反馈TriggerCallbackThread.getInstance().start();// init executor-server//初始化netty服务监听端口的调用情况做出响应处理把当前执行器注册到调度中心中初始化一个注册线程并指定时间进行心跳调用注册方法initEmbedServer(address, ip, port, appname, accessToken);}start方法包含了很多其他的初始化方法需要的参数logPath、adminAddresses等是在创建XxlJobSpringExecutor类时已经赋上值这些值的来源就是application.properties中配置的值。看下初始化执行日志目录的initLogPath方法源码 private static String logBasePath /data/applogs/xxl-job/jobhandler;private static String glueSrcPath logBasePath.concat(/gluesource);//初始化执行日志目录public static void initLogPath(String logPath){// initif (logPath!null logPath.trim().length()0) {logBasePath logPath;}// mk base dir//创建目录File logPathDir new File(logBasePath);//不存在则进行创建if (!logPathDir.exists()) {logPathDir.mkdirs();}logBasePath logPathDir.getPath();// mk glue dir//创建目录File glueBaseDir new File(logPathDir, gluesource);//不存在则进行创建if (!glueBaseDir.exists()) {glueBaseDir.mkdirs();}glueSrcPath glueBaseDir.getPath();}public static String getLogPath() {return logBasePath;}public static String getGlueSrcPath() {return glueSrcPath;}使用logBasePath记录执行日志的存储目录glueSrcPath记录像shell模式这样的需要把运行脚本组织为bash.sh这样的文件的目录。若是用户配置了logPath目录则进行logBasePath的覆盖没有配置使用默认的目录。
执行日志存储截图
运行文件存储截图
4.初始化操作调度中心的客户端
执行器需要与调度中心进行交互调度中心可能是集群部署的所以需要使用集合存放所有的调度中心客户端执行器调用调度中心需要知道地址、token值所以客户端类存放了调度中心的地址和tokentoken值需要与调度中心配置的保存一致否则进行调度的时候验证不通过。这里把操作调度中心的客户端都初始化好后面需要调用的时候直接取客户端发起请求即可。看下初始化调度中心的方法initAdminBizList源码 //存放所有的调用调度中心的客户端private static ListAdminBiz adminBizList;//初始化连接调度中心的客户端private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {//调度中心的地址使用逗号进行分割if (adminAddresses!null adminAddresses.trim().length()0) {for (String address: adminAddresses.trim().split(,)) {if (address!null address.trim().length()0) {//创建调用调度中心的客户端AdminBiz adminBiz new AdminBizClient(address.trim(), accessToken);if (adminBizList null) {adminBizList new ArrayListAdminBiz();}adminBizList.add(adminBiz);}}}}public static ListAdminBiz getAdminBizList(){return adminBizList;}adminAddresses是application.properties配置文件中定义的调度中心地址当调度中心为集群时使用逗号连接此处使用逗号就行分割创建所有的调度中心客户端类AdminBizClient并把它存放到list集合中。
5.初始化清除日志文件的守护线程
执行日志文件支持设置保存天数此处的天数需要与调度中心设置的日志记录保存天数一致否则调度中心查看某个日志记录的执行日志时会查不到。这里说下日志记录是调度中心存放在表xxl_job_log中的记录执行日志是执行器执行任务产生的任务文件存放在执行器部署服务器的目录下。看下JobLogFileCleanThread类下初始化的start方法源码 public void start(final long logRetentionDays){// limit min value//日志存留天数需要大于3才有效果if (logRetentionDays 3 ) {return;}//创建一个线程localThread new Thread(new Runnable() {Overridepublic void run() {//没有停止线程while (!toStop) {try {// clean log dir, over logRetentionDays//获取磁盘下的日志文件集合File[] childDirs new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!null childDirs.length0) {// today//获取今天的时间Calendar todayCal Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate todayCal.getTime();for (File childFile: childDirs) {// valid//判断是否为目录日志文件是按日期存放的例2023-02-25/1.logif (!childFile.isDirectory()) {continue;}//文件需要包含-if (childFile.getName().indexOf(-) -1) {continue;}// file create dateDate logFileCreateDate null;try {SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd);//把以日期格式命名的文件名转成日期格式logFileCreateDate simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate null) {continue;}//当前时间-文件创建时间的差值毫秒大于logRetentionDays指定的日志保留天数logRetentionDays * (24 * 60 * 60 * 1000)是把天数转成毫秒if ((todayDate.getTime()-logFileCreateDate.getTime()) logRetentionDays * (24 * 60 * 60 * 1000) ) {//递归删除此文件夹及它下面的文件FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {//休眠一天时间TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, executor JobLogFileCleanThread thread destroy.);}});//设置为守护线程localThread.setDaemon(true);localThread.setName(xxl-job, executor JobLogFileCleanThread);//启动守护线程localThread.start();}配置的日志保存天数要大于3才生效创建一个休眠周期为1天的守护线程循环进行判断清理每次清理时获取磁盘下的日志文件集合日志文件的存放格式为2023-02-25/1.log在具体日志的外层加了一个日志的日期文件夹这里只处理文件夹判断文件夹名为日期格式包含-然后把文件夹名称转成日期使用当前日期减去文件夹转成的日期若是差值大于需要保存的天数则递归删除此文件夹及它下面的文件。
6.初始化向调度中心反馈执行结果的守护线程
调度中心调用执行器的执行方法后执行器没有立即进行任务的执行先给调用中心返回调度成功把此调度任务添加到队列中当任务被执行线程取出来执行结束后把执行结果放到反馈队列中此守护线程就是把反馈队列中的反馈信息反馈到调度中心。看下TriggerCallbackThread类初始化反馈执行结果守护线程的start源码 public void start() {// valid//检查是否有调用调度中心的客户端if (XxlJobExecutor.getAdminBizList() null) {logger.warn( xxl-job, executor callback config fail, adminAddresses is null.);return;}// callback//创建反馈回调线程triggerCallbackThread new Thread(new Runnable() {Overridepublic void run() {// normal callback//只要不停止就一直循环获取while(!toStop){try {//使用take方法出队take和put方法不互斥读写分离分别使用takeLock/putLock进行加锁HandleCallbackParam callback getInstance().callBackQueue.take();//回调参数类不为空则处理回调if (callback ! null) {// callback list param//定义一个集合接收callBackQueue队列中的所有回调类ListHandleCallbackParam callbackParamList new ArrayListHandleCallbackParam();//drainTo方法为把callBackQueue队列中的所有值转移到新的callbackParamList集合中经过此方法调用此时callBackQueue为空callbackParamList接收到队列里面的所有元素int drainToNum getInstance().callBackQueue.drainTo(callbackParamList);//一开始出队列的的对象也要加入到集合中callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!null callbackParamList.size()0) {//处理回调doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callback//当停止反馈线程后把当前callBackQueue反馈队列里面还没有反馈完的记录进行反馈try {ListHandleCallbackParam callbackParamList new ArrayListHandleCallbackParam();int drainToNum getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!null callbackParamList.size()0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info( xxl-job, executor callback thread destroy.);}});//设置为守护线程triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName(xxl-job, executor TriggerCallbackThread);//启动线程triggerCallbackThread.start();
}没有调用调度中心客户端则不进行线程创建反馈守护线程没有休眠周期一直循环从反馈队列callBackQueue中取值当有反馈记录直接执行反馈当停止反馈线程后把当前callBackQueue反馈队列里面还没有反馈完的记录进行反馈。
7.初始化重试反馈失败记录的守护线程
执行器向调度中心反馈执行结果的时候可能网络问题或者调度中心重启了导致反馈失败反馈失败的记录执行器会放到反馈失败目录文件下存放反馈失败文件地址定义源码 //回调失败日志目录private static String failCallbackFilePath XxlJobFileAppender.getLogPath().concat(File.separator).concat(callbacklog).concat(File.separator);//回调失败日志文件名private static String failCallbackFileName failCallbackFilePath.concat(xxl-job-callback-{x}).concat(.log);在执行器配置的日志目录下加上callbacklog目录日志文件名为.log的格式。看下TriggerCallbackThread类初始化此守护线程的源码 public void start() {// valid//检查是否有调用调度中心的客户端if (XxlJobExecutor.getAdminBizList() null) {logger.warn( xxl-job, executor callback config fail, adminAddresses is null.);return;}// retry//重试回调上面回调线程triggerCallbackThread调用失败的记录按休眠时间进行循环triggerRetryCallbackThread new Thread(new Runnable() {Overridepublic void run() {while(!toStop){try {//重试反馈一开始进行反馈并失败的记录retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {//默认休眠30秒TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info( xxl-job, executor retry callback thread destroy.);}});//设置为守护线程triggerRetryCallbackThread.setDaemon(true);//启动线程triggerRetryCallbackThread.start();}没有调用调度中心客户端则不进行线程创建重试失败反馈守护默认休眠30秒循环调用重试失败反馈方法retryFailCallbackFile看下它的源码 //重试反馈一开始进行反馈并失败的记录private void retryFailCallbackFile(){// valid//检查存放失败反馈的文件目录是否为空File callbackLogPath new File(failCallbackFilePath);if (!callbackLogPath.exists()) {return;}//callbackLogPath是一个目录若是一个文件则删除此文件if (callbackLogPath.isFile()) {callbackLogPath.delete();}//callbackLogPath是一个目录、并且此目录下有文件才放行if (!(callbackLogPath.isDirectory() callbackLogPath.list()!null callbackLogPath.list().length0)) {return;}// load and clear file, retry//遍历处理回调错误日志for (File callbaclLogFile: callbackLogPath.listFiles()) {//把文件转成byte数组byte[] callbackParamList_bytes FileUtil.readFileContent(callbaclLogFile);// avoid empty file//若是空文件则删除if(callbackParamList_bytes null || callbackParamList_bytes.length 1){callbaclLogFile.delete();continue;}//把byte数组转成list集合一开始就是把list集合转成byte数组存放到文件中的现在就是反向转一下ListHandleCallbackParam callbackParamList (ListHandleCallbackParam) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);//删除文件callbaclLogFile.delete();//调用反馈的方法doCallback(callbackParamList);}}检查存放失败反馈的文件目录是否为空不为空则遍历处理反馈失败日志若是空文件则删除从日志文件中读出byte数组把byte数组转成list集合一开始就是把list集合转成byte数组存放到文件中的现在就是反向转一下拿到记录后调用反馈方法。
8.初始化守护线程并创建netty服务监听端口调用处理调用的线程池
执行器使用netty服务来接收调度中心的调用netty是非常优秀的异步、基于事件驱动的网络应用框架。netty收到调用使用线程池来处理调用的具体实现。来看下初始化的入口initEmbedServer方法源码 private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip port//监听端口有配置则使用配置的端口没有配置则查找一个没有被占用的端口port port0?port: NetUtil.findAvailablePort(9999);//执行器的ip有配置则使用配置没有配置则获取本地ip地址ip (ip!nullip.trim().length()0)?ip: IpUtil.getIp();// generate address//若是本地机器的地址没有配置则使用上面获取到的本地ip、本地端口组织address有配置则使用配置的地址if (addressnull || address.trim().length()0) {//得到ip端口的连接信息String ip_port_address IpUtil.getIpPort(ip, port); // registry-addressdefault use address to registry , otherwise use ip:port if address is null//组织address地址信息address http://{ip_port}/.replace({ip_port}, ip_port_address);}// accessToken//没有加token信息则输出警告日志信息if (accessTokennull || accessToken.trim().length()0) {logger.warn( xxl-job accessToken is empty. To ensure system security, please set the accessToken.);}// start//创建一个基于netty的监听服务器监听port端口embedServer new EmbedServer();//启动此监听服务器创建一个守护线程创建一个netty服务监听port端口创建一个自定义处理器来处理netty服务被调用时的响应处理类//使用线程池来处理netty的服务调用根据服务请求的uri来具体处理调用请求处理结束后向调用方响应处理结果//把当前执行器注册到调度中心中embedServer.start(address, port, appname, accessToken);}执行器自己的ip和netty的端口支持在application.properties配置文件中配置若是没有配置ip则支持获取自身的ip地址支持ipv4和ipv6的网络netty监听的端口若是没有配置则从9999到65535之间使用线性探测法找到一个没有被占用的端口。注意netty的端口和执行器项目的端口不是一回事。创建一个EmbedServer类调用它的start方法看下start方法源码 public void start(final String address, final int port, final String appname, final String accessToken) {//创建执行器处理具体调用的类executorBiz new ExecutorBizImpl();//创建一个守护线程thread new Thread(new Runnable() {Overridepublic void run() {// param//bossGroup线程组用于监听客户端的连接EventLoopGroup bossGroup new NioEventLoopGroup();//workerGroup线程组用于处理连接读写事件EventLoopGroup workerGroup new NioEventLoopGroup();//创建线程池处理netty服务的调用ThreadPoolExecutor bizThreadPool new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueueRunnable(2000),new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, xxl-job, EmbedServer bizThreadPool- r.hashCode());}},new RejectedExecutionHandler() {Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException(xxl-job, EmbedServer bizThreadPool is EXHAUSTED!);}});try {// start server//创建nettyServerBootstrap bootstrap new ServerBootstrap();//设置netty属性bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //使用非阻塞的服务端信道类型.childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel channel) throws Exception { //处理连接、读写事件的处理类channel.pipeline()//使用addLast向netty的channel信道中注册handler.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle 读空闲时长、写空闲时长、读写空闲时长、单位.addLast(new HttpServerCodec())//服务器的编解码器遵从http协议HttpServerCodec类已经包含了HttpRequestDecoder解码器, HttpResponseEncoder编码器.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request reponse to FULL netty提供的http消息聚合器通过它可以把HttpMessage和HttpContent聚合成一个完整的FullHttpRequest或FullHttpResponse.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));//自定义处理器当监听的端口被调用时使用自定义处理器进行具体的实现}}).childOption(ChannelOption.SO_KEEPALIVE, true);//启用心跳保活机制Tcp会监控连接是否有效当连接处于空闲状态超过了2个小时本地的tcp会发送一个数据包给远程的Socket如果远程没有响应则Tcp会持续尝试11分钟直到响应为止若是12分钟还是没有响应则tcp会尝试关闭此Socket连接// bind//绑定监听的信道端口ChannelFuture future bootstrap.bind(port).sync();logger.info( xxl-job remoting server start success, nettype {}, port {}, EmbedServer.class, port);// start registry//把当前执行器注册到调度中心中startRegistry(appname, address);// wait util stop//防止代码运行结束调用finally中定义的关闭netty的方法一直阻塞着防止进程结束future.channel().closeFuture().sync();} catch (InterruptedException e) {logger.info( xxl-job remoting server stop.);} catch (Exception e) {logger.error( xxl-job remoting server error., e);} finally {// stoptry {//关闭netty的线程组workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});//设置为守护线程用户线程结束-》守护线程结束-》jvm结束thread.setDaemon(true); // daemon, service jvm, user thread leave daemon leave jvm leave//启动线程thread.start();}start方法里面创建executorBiz当有调度的时候由此类来进行具体的任务实现创建一个守护线程线程的run方法里面创建了bizThreadPool线程池此线程池用来处理调度任务创建netty服务并绑定netty服务监听的端口号创建EmbedHttpServerHandler类来处理netty被调用的处理绑定HttpObjectAggregator使用FullHttpRequest来接收参数。看下netty核心处理类EmbedHttpServerHandler的源码 public static class EmbedHttpServerHandler extends SimpleChannelInboundHandlerFullHttpRequest {private static final Logger logger LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz executorBiz; //处理netty调用的实现类private String accessToken; //token值private ThreadPoolExecutor bizThreadPool;//线程池public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {this.executorBiz executorBiz;this.accessToken accessToken;this.bizThreadPool bizThreadPool;}//继承了SimpleChannelInboundHandler则重写他的channelRead0方法当netty监听的端口被调用时会调用到自定义处理类的channelRead0方法Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);//获取请求的参数信息String requestData msg.content().toString(CharsetUtil.UTF_8);//获取请求的结尾地址String uri msg.uri();//请求方式HttpMethod httpMethod msg.method();//复用tcp连接boolean keepAlive HttpUtil.isKeepAlive(msg);//从请求头中根据key获取token信息String accessTokenReq msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invoke//使用线程池执行此任务bizThreadPool.execute(new Runnable() {Overridepublic void run() {// do invoke//执行请求处理Object responseObj process(httpMethod, uri, requestData, accessTokenReq);// to json//执行结果转成json格式字符串String responseJson GsonTool.toJson(responseObj);// write response//把执行结果向调用端响应writeResponse(ctx, keepAlive, responseJson);}});}//执行请求处理private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// valid//只支持post方式if (HttpMethod.POST ! httpMethod) {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, HttpMethod not support.);}//结尾地址为空if (uri null || uri.trim().length() 0) {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, uri-mapping empty.);}//比对请求方传递的token值是否正确if (accessToken ! null accessToken.trim().length() 0 !accessToken.equals(accessTokenReq)) {return new ReturnTString(ReturnT.FAIL_CODE, The access token is wrong.);}// services mappingtry {//根据请求的结尾地址调用对应的方法进行处理switch (uri) {case /beat://调度中心进行心跳检测return executorBiz.beat();case /idleBeat://调度中心检测执行器是否忙碌IdleBeatParam idleBeatParam GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case /run://调度中心调度执行器执行任务TriggerParam triggerParam GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case /kill://调度中心调度执行器停止任务处理KillParam killParam GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case /log://调度中心查询执行日志信息LogParam logParam GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnTString(ReturnT.FAIL_CODE, invalid request, uri-mapping( uri ) not found.);}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnTString(ReturnT.FAIL_CODE, request error: ThrowableUtil.toString(e));}}/*** write response*/private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write response//响应的结果值FullHttpResponse response new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson)//设置响应头部格式response.headers().set(HttpHeaderNames.CONTENT_TYPE, text/html;charsetUTF-8); // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}//使用信道的上下文向请求方写入、刷洗响应信息ctx.writeAndFlush(response);}Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error( xxl-job provider netty_http server caught exception, cause);ctx.close();}Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().close(); // beat 3N, close if idlelogger.debug( xxl-job provider netty_http server close an idle channel.);} else {super.userEventTriggered(ctx, evt);}}}EmbedHttpServerHandler类是netty自定义处理类因为netty的信道绑定了new HttpObjectAggregator所以使用FullHttpRequest来接收参数继承了SimpleChannelInboundHandler则重写他的channelRead0方法当netty监听的端口被调用时会调用到自定义处理类的channelRead0方法使用bizThreadPool线程池处理此请求使用FullHttpRequest来获取请求的参数、uri、token等信息对token值进行校验匹配uri地址对每个uri应该做的响应由executorBiz来处理把响应结果使用信道的上下文向请求方写入、刷洗响应信息。
9.初始化注册执行器在线情况的守护线程
在初始化netty结束后有一行调用执行器注册到调度中心的代码startRegistry(appname, address)此方法会创建一个守护线程看下startRegistry源码 public void startRegistry(final String appname, final String address) {// start registry//把当前执行器注册到调度中心中ExecutorRegistryThread.getInstance().start(appname, address);}执行注册的具体类是ExecutorRegistryThread看下它的start源码 public void start(final String appname, final String address){// valid//校验执行器名称不能为空if (appnamenull || appname.trim().length()0) {logger.warn( xxl-job, executor registry config fail, appname is null.);return;}//校验调用调度中心的客户端不能为空if (XxlJobExecutor.getAdminBizList() null) {logger.warn( xxl-job, executor registry config fail, adminAddresses is null.);return;}//创建注册线程registryThread new Thread(new Runnable() {Overridepublic void run() {// registry//当停止的之后才跳出while循环while (!toStop) {try {//构造注册请求参数RegistryParam registryParam new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端向所有的调度中心注册上此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用注册方法ReturnTString registryResult adminBiz.registry(registryParam);//执行器调用调度中心的注册方法成功if (registryResult!null ReturnT.SUCCESS_CODE registryResult.getCode()) {registryResult ReturnT.SUCCESS;logger.debug( xxl-job registry success, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});break;} else {logger.info( xxl-job registry fail, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info( xxl-job registry error, registryParam:{}, registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {//默认休眠30秒继续向调度中心中注册当前执行器在线的信息心跳的方式TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn( xxl-job, executor registry thread interrupted, error msg:{}, e.getMessage());}}}// registry remove//删除执行器注册信息当程序停止或者调用了stop方法之后会跳出上面的while循环try {//构造删除注册请求参数RegistryParam registryParam new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端向所有的调度中心删除此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用删除执行器的方法ReturnTString registryResult adminBiz.registryRemove(registryParam);if (registryResult!null ReturnT.SUCCESS_CODE registryResult.getCode()) {registryResult ReturnT.SUCCESS;logger.info( xxl-job registry-remove success, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});break;} else {logger.info( xxl-job registry-remove fail, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info( xxl-job registry-remove error, registryParam:{}, registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info( xxl-job, executor registry thread destroy.);}});//设置为守护线程registryThread.setDaemon(true);registryThread.setName(xxl-job, executor ExecutorRegistryThread);//启动线程registryThread.start();}registryThread注册守护线程默认休眠30秒循环向调度中心执行注册方法告诉调度中心自己还在线。当线程停止即toStop为true时向调度中心发起移除注册的请求即告诉调度中心自己下线了。
10.初始化资源汇总说明图 二.主动发起请求
执行器需要与调度中心进行交互执行器主动发起请求包含的方法可以从它的客户端类AdminBizClient看出此类存在xxl-job的公共核心xxl-job-core工程中目录结构为com.xxl.job.core.biz.client从类里面可以看到包含callback、registry、registryRemove三个方法。看下AdminBizClient的源码
/*** admin api test* 执行器-》调用调度中心的客户端供执行器使用* author xuxueli 2017-07-28 22:14:52*/
public class AdminBizClient implements AdminBiz {public AdminBizClient() {}public AdminBizClient(String addressUrl, String accessToken) {this.addressUrl addressUrl;this.accessToken accessToken;// validif (!this.addressUrl.endsWith(/)) {this.addressUrl this.addressUrl /;}}private String addressUrl ;private String accessToken;private int timeout 3;//此方法为执行器-》调度中心的反馈方法Overridepublic ReturnTString callback(ListHandleCallbackParam callbackParamList) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrlapi/callback, accessToken, timeout, callbackParamList, String.class);}//此方法为执行器-》调度中心的注册方法把当前执行器注册到调度中心中Overridepublic ReturnTString registry(RegistryParam registryParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl api/registry, accessToken, timeout, registryParam, String.class);}//此方法为执行器-》调度中心的删除执行器方法把当前执行器从调度中心注册列表中删除Overridepublic ReturnTString registryRemove(RegistryParam registryParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl api/registryRemove, accessToken, timeout, registryParam, String.class);}}此客户端类提供了3个调用调度中心的方法当方法被调用时会拼接远程调度中心的接口地址进行远程调用。有多少个调度中心就有多少个调用客户端类在程序初始化的时候这些调度中心客户端类已经初始化好放到list集合中当需要调用的时候直接遍历这些客户端集合对每个客户端执行调用方法即可。获取调用客户端集合并遍历调用的样例源码 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用删除执行器的方法ReturnTString registryResult adminBiz.registryRemove(registryParam);if (registryResult!null ReturnT.SUCCESS_CODE registryResult.getCode()) {registryResult ReturnT.SUCCESS;logger.info( xxl-job registry-remove success, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});break;} else {logger.info( xxl-job registry-remove fail, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info( xxl-job registry-remove error, registryParam:{}, registryParam, e);}}}//获取调用客户端public static ListAdminBiz getAdminBizList(){return adminBizList;}1.callback调用
callback调用是执行器把任务的执行结果值反馈给调度中心在源码中的调用位置为反馈守护线程查询到反馈队列里面有值或者反馈失败重试守护线程检测到有反馈失败的记录时发起的反馈调用。看下反馈的调用入口源码 while(!toStop){try {//使用take方法出队take和put方法不互斥读写分离分别使用takeLock/putLock进行加锁HandleCallbackParam callback getInstance().callBackQueue.take();//回调参数类不为空则处理回调if (callback ! null) {// callback list param//定义一个集合接收callBackQueue队列中的所有回调类ListHandleCallbackParam callbackParamList new ArrayListHandleCallbackParam();//drainTo方法为把callBackQueue队列中的所有值转移到新的callbackParamList集合中经过此方法调用此时callBackQueue为空callbackParamList接收到队列里面的所有元素int drainToNum getInstance().callBackQueue.drainTo(callbackParamList);//一开始出队列的的对象也要加入到集合中callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!null callbackParamList.size()0) {//处理反馈doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}守护线程从callBackQueue反馈队列里面取值当有需要反馈的记录则取出callBackQueue队列里面的所有值调用反馈方法处理反馈的方法为doCallback看下它的源码 private void doCallback(ListHandleCallbackParam callbackParamList){boolean callbackRet false;// callback, will retry if error//遍历调用调度中心的客户端for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//每个客户端都进行调用ReturnTString callbackResult adminBiz.callback(callbackParamList);//回调成功if (callbackResult!null ReturnT.SUCCESS_CODE callbackResult.getCode()) {callbackLog(callbackParamList, br----------- xxl-job job callback finish.);callbackRet true;//有一个调度中心执行成功了则退出反馈调用只用调用到一个就行break;} else {//回调失败callbackLog(callbackParamList, br----------- xxl-job job callback fail, callbackResult: callbackResult);}} catch (Exception e) {//回调错误callbackLog(callbackParamList, br----------- xxl-job job callback error, errorMsg: e.getMessage());}}//回调有失败的情况if (!callbackRet) {//把这些失败回调都追加到回调失败日志中appendFailCallbackFile(callbackParamList);}}此方法遍历所有调用调度中心的客户端执行反馈信息调用若是反馈失败则把这些反馈记录写到反馈失败磁盘目录下把反馈的日志信息写入到执行器日志文件中。这里分析一下写入到执行日志的方法callbackLog源码 private void callbackLog(ListHandleCallbackParam callbackParamList, String logContent){for (HandleCallbackParam callbackParam: callbackParamList) {//根据日期、日志id创建日志文件的存放目录使用日期格式xxxx-xx-xx)得到日志文件名logId.logString logFileName XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());//使用InheritableThreadLocal记录日志文件名的线程内部变量XxlJobContext.setXxlJobContext(new XxlJobContext(-1,null,logFileName,-1,-1));XxlJobHelper.log(logContent);}}每一条反馈记录都对应着一次任务调度使用任务的调度时间日志id即可组织出执行日志的结尾目录和文件名。创建一个XxlJobContext实体接收文件名此XxlJobContext类提供一个setXxlJobContext的方法看下此方法的源码 //使用InheritableThreadLocal来作为线程内部变量与ThreadLocal相比InheritableThreadLocal可以在子线程中调用到父线程的线程内部变量private static InheritableThreadLocalXxlJobContext contextHolder new InheritableThreadLocalXxlJobContext(); // support for child thread of job handler)public static void setXxlJobContext(XxlJobContext xxlJobContext){contextHolder.set(xxlJobContext);}public static XxlJobContext getXxlJobContext(){return contextHolder.get();}
设置进来的XxlJobContext对象使用InheritableThreadLocal修饰把此变量设置为线程变量这样在此线程处理的后面直接调用get方法即可获取到这个线程前面流程设置的XxlJobContext对象是线程隔离的。看下它写入日志的方法XxlJobHelper.log(logContent)源码 public static boolean log(String appendLogPattern, Object ... appendLogArguments) {//按格式进行占位符号的替代FormattingTuple ft MessageFormatter.arrayFormat(appendLogPattern, appendLogArguments);//获取到日志信息String appendLog ft.getMessage();/*appendLog appendLogPattern;if (appendLogArguments!null appendLogArguments.length0) {appendLog MessageFormat.format(appendLogPattern, appendLogArguments);}*///获取调用者的堆栈信息可以获取到调用者的类名callInfo.getClassName()、方法名callInfo.getMethodName()StackTraceElement callInfo new Throwable().getStackTrace()[1];//处理日志详情return logDetail(callInfo, appendLog);}按格式进行占位符号的替代获取到日志信息获取调用者的堆栈信息可以获取到调用者的类名callInfo.getClassName()、方法名callInfo.getMethodName()在追加日志的时候需要带上调用的类名等信息。看下处理日志详情的方法logDetail源码 private static boolean logDetail(StackTraceElement callInfo, String appendLog) {//从InheritableThreadLocal中获取到内部线程变量值获取到上面设置的日志文件信息XxlJobContext xxlJobContext XxlJobContext.getXxlJobContext();if (xxlJobContext null) {return false;}/*// yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log;StackTraceElement[] stackTraceElements new Throwable().getStackTrace();StackTraceElement callInfo stackTraceElements[1];*///组织日志信息StringBuffer stringBuffer new StringBuffer();stringBuffer.append(DateUtil.formatDateTime(new Date())).append( ).append([ callInfo.getClassName() # callInfo.getMethodName() ]).append(-).append([ callInfo.getLineNumber() ]).append(-).append([ Thread.currentThread().getName() ]).append( ).append(appendLog!null?appendLog:);String formatAppendLog stringBuffer.toString();// appendlog//获取日志文件名称String logFileName xxlJobContext.getJobLogFileName();if (logFileName!null logFileName.trim().length()0) {//把日志信息追加到某个日志文件名下XxlJobFileAppender.appendLog(logFileName, formatAppendLog);return true;} else {logger.info( {}, formatAppendLog);return false;}}通过XxlJobContext.getXxlJobContext()即可获取之前设置进去的线程变量从此变量里面获取到日志的文件名组织日志信息把日志信息追加到此日志文件下。看下追加日志的方法appendLog源码 public static void appendLog(String logFileName, String appendLog) {// log fileif (logFileNamenull || logFileName.trim().length()0) {return;}File logFile new File(logFileName);//日志文件xx.log不存在则进行创建if (!logFile.exists()) {try {//创建文件logFile.createNewFile();} catch (IOException e) {logger.error(e.getMessage(), e);return;}}// logif (appendLog null) {appendLog ;}appendLog \r\n;// append file content//把日志信息追加到日志文件中FileOutputStream fos null;try {fos new FileOutputStream(logFile, true);fos.write(appendLog.getBytes(utf-8));fos.flush();} catch (Exception e) {logger.error(e.getMessage(), e);} finally {if (fos ! null) {try {fos.close();} catch (IOException e) {logger.error(e.getMessage(), e);}}}} 当反馈失败的时候需要把失败信息写入到存放反馈失败的目录文件中看下处理反馈失败的源码appendFailCallbackFile //把这些失败回调都追加到反馈失败日志中private void appendFailCallbackFile(ListHandleCallbackParam callbackParamList){// validif (callbackParamListnull || callbackParamList.size()0) {return;}// append file//将对象转成byte数组byte[] callbackParamList_bytes JdkSerializeTool.serialize(callbackParamList);//创建反馈错误日志文件-以时间为名称File callbackLogFile new File(failCallbackFileName.replace({x}, String.valueOf(System.currentTimeMillis())));//若是此文件已经存在if (callbackLogFile.exists()) {/*for (int i 0; i 100; i) {callbackLogFile new File(failCallbackFileName.replace({x}, String.valueOf(System.currentTimeMillis()).concat(-).concat(String.valueOf(i)) ));if (!callbackLogFile.exists()) {break;}}*///使用时间序号的方式获取到唯一的文件名int fileIndex 0;while(true) {callbackLogFile new File(failCallbackFileName.replace({x}, String.valueOf(System.currentTimeMillis()).concat(-).concat(String.valueOf(fileIndex)) ));if (!callbackLogFile.exists()) {break;}}}//把错误反馈日志文件写入到错误日志中FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);}把反馈失败的集合转成byte数组创建反馈失败日志文件-以时间为名称若是同一时间下有重复则加上序号处理把反馈失败日志信息写入到失败日志文件中。这样在重试反馈失败记录的守护线程下次执行的时候就能加载到此失败记录并进行重试反馈。
2.registry调用
registry调用为执行器调用调度中心更新执行器最新在线时间调度中心收到请求后会更新xxl_job_registry表的update_time字段这样在调度中心进行定时清理离线执行器时不会把此执行器删除。源码中的调用位置为 while (!toStop) {try {//构造注册请求参数RegistryParam registryParam new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端向所有的调度中心注册上此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用注册方法ReturnTString registryResult adminBiz.registry(registryParam);//执行器调用调度中心的注册方法成功if (registryResult!null ReturnT.SUCCESS_CODE registryResult.getCode()) {registryResult ReturnT.SUCCESS;logger.debug( xxl-job registry success, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});break;} else {logger.info( xxl-job registry fail, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info( xxl-job registry error, registryParam:{}, registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {//默认休眠30秒继续向调度中心中注册当前执行器在线的信息心跳的方式TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn( xxl-job, executor registry thread interrupted, error msg:{}, e.getMessage());}}}执行器默认休眠30秒循环调用注册方法。
3.registryRemove调用
当执行器下线后需要通知调度中心从xxl_job_registry表删除此执行器的注册记录这样在守护线程下次检查某个任务组的在线执行器时能够及时的把此执行器剔除。源码中的使用位置 //删除执行器注册信息当程序停止或者调用了stop方法之后会跳出上面的while循环try {//构造注册请求参数RegistryParam registryParam new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端向所有的调度中心删除此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用删除执行器的方法ReturnTString registryResult adminBiz.registryRemove(registryParam);if (registryResult!null ReturnT.SUCCESS_CODE registryResult.getCode()) {registryResult ReturnT.SUCCESS;logger.info( xxl-job registry-remove success, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});break;} else {logger.info( xxl-job registry-remove fail, registryParam:{}, registryResult:{}, new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info( xxl-job registry-remove error, registryParam:{}, registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info( xxl-job, executor registry thread destroy.);}当注册执行器的守护线程被停止时就会跳出while循环然后执行移除执行器注册的方法。
三.接收请求处理
执行器接收调用中心的请求是用netty来监听的具体接收哪些请求可以从EmbedServer类的内部类EmbedHttpServerHandler中查看此类存在xxl-job的公共核心xxl-job-core工程中目录结构为com.xxl.job.core.server具体的请求可以从内部类EmbedHttpServerHandler的process方法看出包含beat、idleBeat、run、kill、log五个方法。看下process方法源码 private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// valid//只支持post方式if (HttpMethod.POST ! httpMethod) {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, HttpMethod not support.);}//结尾地址为空if (uri null || uri.trim().length() 0) {return new ReturnTString(ReturnT.FAIL_CODE, invalid request, uri-mapping empty.);}//比对请求方传递的token值是否正确if (accessToken ! null accessToken.trim().length() 0 !accessToken.equals(accessTokenReq)) {return new ReturnTString(ReturnT.FAIL_CODE, The access token is wrong.);}// services mappingtry {//根据请求的结尾地址调用对应的方法进行处理switch (uri) {case /beat://调度中心进行心跳检测return executorBiz.beat();case /idleBeat://调度中心检测执行器是否忙碌IdleBeatParam idleBeatParam GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case /run://调度中心调度执行器执行任务TriggerParam triggerParam GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case /kill://调度中心调度执行器停止任务处理KillParam killParam GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case /log://调度中心查询执行日志信息LogParam logParam GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnTString(ReturnT.FAIL_CODE, invalid request, uri-mapping( uri ) not found.);}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnTString(ReturnT.FAIL_CODE, request error: ThrowableUtil.toString(e));}}方法只支持post方式具体处理类executorBiz的具体实现是ExecutorBizImpl类在程序启动初始化时已经创建。
1.beat请求
beat请求是调度中心确认执行器是否在线的接口若是能正常调通则表示执行器在线若是调不通则表示执行器已经离线是调度中心在使用故障转移路由模式时会调用。看下beat源码 Overridepublic ReturnTString beat() {return ReturnT.SUCCESS;}直接返回成功能调通就是成功。
2.idleBeat请求
idleBeat请求是调度中心确实执行器是否忙碌的接口当执行器还在处理此任务的上一次调度那这次调度就不选择此执行器处理这是调度中心使用忙碌转移路由模式时会调用。看下idleBeat源码 //响应调度中心确认执行器是否忙碌Overridepublic ReturnTString idleBeat(IdleBeatParam idleBeatParam) {// isRunningOrHasQueueboolean isRunningOrHasQueue false;//根据任务id获取处理此任务的线程类JobThread jobThread XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());//线程类存在且正在运行或者还有未处理完的任务队列if (jobThread ! null jobThread.isRunningOrHasQueue()) {//标记为trueisRunningOrHasQueue true;}//为true表示此执行器现在正在处理这个任务的上一次调度if (isRunningOrHasQueue) {return new ReturnTString(ReturnT.FAIL_CODE, job thread is running or has trigger queue.);}return ReturnT.SUCCESS;}判断是否有线程正在执行此任务若是有则返回忙碌若是没有则返回成功。根据任务id获取对应线程类和是否忙碌的介绍放在run请求中。
3.run请求
run请求是执行器响应调度中心运行任务的接口对具体的任务进行执行。看下run源码 //响应调度中心执行任务Overridepublic ReturnTString run(TriggerParam triggerParam) {// load oldjobHandler jobThread//根据任务id获取任务线程类从jobThreadRepository中获取key任务idvalue任务线程类JobThread jobThread XxlJobExecutor.loadJobThread(triggerParam.getJobId());//从任务线程类获取绑定的任务处理器IJobHandler jobHandler jobThread!null?jobThread.getHandler():null;String removeOldReason null;// validjobHandler jobThread//获取任务的运行模式GlueTypeEnum glueTypeEnum GlueTypeEnum.match(triggerParam.getGlueType());//bean模式if (GlueTypeEnum.BEAN glueTypeEnum) {// new jobhandler//获取执行器任务handler使用XxlJob修饰的值从集合jobHandlerRepository中获取keyXxlJob注解的value值value此任务执行的对象包含Bean对象执行的方法、初始方法、销毁方法//程序启动的时候所有被XxlJob修饰的处理类都添加到jobHandlerRepository集合中了IJobHandler newJobHandler XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThread//上一次此任务id绑定的任务处理器不等于此次执行的任务处理器if (jobThread!null jobHandler ! newJobHandler) {// change handler, need kill old threadremoveOldReason change jobhandler or glue type, and terminate the old job thread.;//线程设置为nulljobThread null;//线程绑定的处理器也设置为nulljobHandler null;}// valid handler//给任务处理器重新赋值if (jobHandler null) {jobHandler newJobHandler;if (jobHandler null) {return new ReturnTString(ReturnT.FAIL_CODE, job handler [ triggerParam.getExecutorHandler() ] not found.);}}} else if (GlueTypeEnum.GLUE_GROOVY glueTypeEnum) {// valid old jobThreadif (jobThread ! null !(jobThread.getHandler() instanceof GlueJobHandler ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason change job source or glue type, and terminate the old job thread.;jobThread null;jobHandler null;}// valid handlerif (jobHandler null) {try {IJobHandler originJobHandler GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnTString(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!null glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread ! null !(jobThread.getHandler() instanceof ScriptJobHandler ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason change job source or glue type, and terminate the old job thread.;jobThread null;jobHandler null;}// valid handlerif (jobHandler null) {jobHandler new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnTString(ReturnT.FAIL_CODE, glueType[ triggerParam.getGlueType() ] is not valid.);}// executor block strategy//任务id对应的线程不为空if (jobThread ! null) {//获取阻塞处理策略ExecutorBlockStrategyEnum blockStrategy ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);//丢弃后续调度if (ExecutorBlockStrategyEnum.DISCARD_LATER blockStrategy) {// discard when running//线程正在运行或队列里面还有任务则丢弃此次任务调度if (jobThread.isRunningOrHasQueue()) {return new ReturnTString(ReturnT.FAIL_CODE, block strategy effectExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY blockStrategy) {//覆盖之前调度// kill running jobThread//线程正在运行或队列里面还有任务则覆盖之前调度if (jobThread.isRunningOrHasQueue()) {removeOldReason block strategy effect ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();//任务线程设置为nulljobThread null;}} else {// just queue trigger}}// replace thread (new or exists invalid)//经过上面的校验处理此任务id对应的任务线程类还是为空if (jobThread null) {jobThread XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queue//把任务放到调度队列里面ReturnTString pushResult jobThread.pushTriggerQueue(triggerParam);//返回调度结果return pushResult;}当执行器收到调度任务请求时会看能不能复用线程处理类新建的线程处理类会放到map集合中key任务idvalue任务线程类看下根据任务id获取任务线程类的方法loadJobThread源码 //存放任务、任务线程类集合key任务idvalue任务线程类private static ConcurrentMapInteger, JobThread jobThreadRepository new ConcurrentHashMapInteger, JobThread();//根据任务id加载此任务的处理线程类public static JobThread loadJobThread(int jobId){return jobThreadRepository.get(jobId);}使用ConcurrentMap来存放创建的线程处理类JobThread使用之前先看能否复用已经创建过并且还存在则复用。JobThread类绑定了它的处理类handler当能通过任务id获取到JobThread类则可以获取到handler类handler类是程序启动时解析XxlJob注解初始化好的处理任务方法的封装。看下JobThread类的部分源码
public class JobThread extends Thread{private static Logger logger LoggerFactory.getLogger(JobThread.class);private int jobId; //任务idprivate IJobHandler handler;//处理器private LinkedBlockingQueueTriggerParam triggerQueue; //存放执行任务的阻塞队列private SetLong triggerLogIdSet; //去重调度日志private volatile boolean toStop false;private String stopReason;private boolean running false; // if running jobprivate int idleTimes 0; // 停止线程的中断标识
}JobThread继承了Thread可以使用到线程的特性进行方法的运行绑定了任务id、处理器使用阻塞队列存放待处理的任务。
获取任务的运行模式例如bean模式是使用XxlJob来进行具体任务的实现shell模式是可执行文件的方式来实现根据运行模式来创建IJobHandler。IJobHandler是一个抽象父类它的子类包含3个截图如下
我们这里就分析bean这种模式其他模式类似看下处理bean这种模式的源码 if (GlueTypeEnum.BEAN glueTypeEnum) {// new jobhandler//获取执行器任务handler使用XxlJob修饰的值从集合jobHandlerRepository中获取keyXxlJob注解的value值value此任务执行的对象包含Bean对象执行的方法、初始方法、销毁方法//程序启动的时候所有的被XxlJob修饰的处理类都添加到jobHandlerRepository集合中了IJobHandler newJobHandler XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThread//上一次此任务id绑定的任务处理器不等于此次执行的任务处理器if (jobThread!null jobHandler ! newJobHandler) {// change handler, need kill old threadremoveOldReason change jobhandler or glue type, and terminate the old job thread.;//线程设置为nulljobThread null;//线程绑定的处理器也设置为nulljobHandler null;}// valid handler//给任务处理器重新赋值if (jobHandler null) {jobHandler newJobHandler;if (jobHandler null) {return new ReturnTString(ReturnT.FAIL_CODE, job handler [ triggerParam.getExecutorHandler() ] not found.);}}}根据任务的jobHandler值从程序启动就初始化好的被XxlJob修饰的处理方法中匹配到此jobHandler对应的handler看下获取handler的源码 //job处理器集合keyXxlJob注解的value值value此任务执行的对象包含Bean对象执行的方法、初始方法、销毁方法 private static ConcurrentMapString, IJobHandler jobHandlerRepository new ConcurrentHashMapString, IJobHandler();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info( xxl-job register jobhandler success, name:{}, jobHandler:{}, name, jobHandler);//把任务添加处理器集合中后续当需要处理某个XxlJob定义的任务时直接从jobHandlerRepository集合用key取出直接调用它的执行方法即可return jobHandlerRepository.put(name, jobHandler);}从jobHandlerRepository集合中根据key获取handler。
当能复用JobThread但是上传任务绑定的执行handler不等于这次的handler就是说上次此任务设置的jobHandler为test1这次设置的为test2对于这样的情况需要重新创建一个新的JobThread类使jobThread等于null后面会判断jobThread为null则进行JobThread创建。
当能复用JobThread说明可能上次的调度还没有处理完成此时需要根据配置的阻塞处理策略来进行处理。当策略为丢弃后续调度且任务线程正在运行或者任务队列里面还有未处理的任务则不执行这次调度丢弃此次调度优先保证上次调度执行完成当策略为覆盖之前调度且任务线程正在运行或者任务队列里面还有未处理的任务则使jobThread为null在重新创建JobThread的时候会对任务id之间绑定的JobThread进行中断这样就能达到覆盖之前调度。源码中的体现 if (jobThread ! null) {//获取阻塞处理策略ExecutorBlockStrategyEnum blockStrategy ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);//丢弃后续调度if (ExecutorBlockStrategyEnum.DISCARD_LATER blockStrategy) {// discard when running//线程正在运行或队列里面还有任务则丢弃此次任务调度if (jobThread.isRunningOrHasQueue()) {return new ReturnTString(ReturnT.FAIL_CODE, block strategy effectExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY blockStrategy) {//覆盖之前调度// kill running jobThread//线程正在运行或队列里面还有任务则覆盖之前调度if (jobThread.isRunningOrHasQueue()) {removeOldReason block strategy effect ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();//任务线程设置为nulljobThread null;}} else {// just queue trigger}} 经过上面的校验若是没法复用JobThread需要新创建一个来看创建JobThread的源码 if (jobThread null) {jobThread XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}创建方法在XxlJobExecutor类的registJobThread看下它的源码 //存放任务、任务线程集合key任务idvalue任务线程类private static ConcurrentMapInteger, JobThread jobThreadRepository new ConcurrentHashMapInteger, JobThread();//注册一个任务线程public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread new JobThread(jobId, handler);//启动线程开始运行JobThread重写的run方法newJobThread.start();logger.info( xxl-job regist JobThread success, jobId:{}, handler:{}, new Object[]{jobId, handler});//ConcurrentMap的put方法当key重复的时候会返回旧的值但是会把新的值进行覆盖putIfAbsent是key重复则返回旧的值但是不进行覆盖JobThread oldJobThread jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, maps put method return the old value!!!//当新建的任务线程已经存在则把原来的线程中断if (oldJobThread ! null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}
创建了一个JobThread类并绑定了它的任务id和handlerJobThread类继承了Thread调用start方法即会执行JobThread类里的run方法。把创建的JobThread存放到map中使用ConcurrentMap的put方法当key重复的时候会返回旧的值但是会把新的值进行覆盖当有旧值时把旧的线程进行中断这样就达到了阻塞处理策略为覆盖之前调度的需求。
处理好JobThread类后往它的任务队列里面存此次的调度源码为 ReturnTString pushResult jobThread.pushTriggerQueue(triggerParam);看下pushTriggerQueue源码方法 public ReturnTString pushTriggerQueue(TriggerParam triggerParam) {// avoid repeat//调度日志id检验是否重复if (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info( repeate trigger job, logId:{}, triggerParam.getLogId());return new ReturnTString(ReturnT.FAIL_CODE, repeate trigger job, logId: triggerParam.getLogId());}//日志id添加到集合中triggerLogIdSet.add(triggerParam.getLogId());//调度参数实体添加到调度队列中triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}任务存到任务队列triggerQueue中这次run请求处理完成 可以给执行器反馈调度结果。
因为JobThread调用了start方法会执行它的run方法看下run方法源码 //线程调用start()方法后会执行run方法Overridepublic void run() {// inittry {//先执行初始化方法handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// execute//不停止线程则一直执行while(!toStop){//任务运行状态设置为falserunning false;//次数加1idleTimes;TriggerParam triggerParam null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)//从阻塞队列里面移除队首元素若是当前队列没有元素则进行等待等待时间为3秒triggerParam triggerQueue.poll(3L, TimeUnit.SECONDS);//获取到元素if (triggerParam!null) {//标记任务为运行状态truerunning true;//重置次数idleTimes 0;//set集合中移除这个日志id用于去重判断triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like logPath/yyyy-MM-dd/9999.log//创建执行任务的文件目录名String logFileName XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job context//把执行任务的变量对象设置为线程内部变量后面取参数等操作的时候可以从这这里取XxlJobContext.setXxlJobContext(xxlJobContext);// execute//添加日志会从上面设置的线程内部变量xxlJobContext中取到文件名称然后追加上日志XxlJobHelper.log(br----------- xxl-job job execute start -----------br----------- Param: xxlJobContext.getJobParam());//有设置任务超时时间if (triggerParam.getExecutorTimeout() 0) {// limit timeout//创建一个任务线程Thread futureThread null;try {//任务需要有返回值所以使用CallableFutureTaskBoolean futureTask new FutureTaskBoolean(new CallableBoolean() {Overridepublic Boolean call() throws Exception {//使用子线程处理任务的时候需要再设置一下线程变量否则拿不到上面设置的线程变量// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);//执行处理器的方法若是需要接收参数可以使用XxlJobHelper.getJobParam方法获取这个方法也是从线程内部变量XxlJobContext中获取的变量handler.execute();return true;}});futureThread new Thread(futureTask);futureThread.start();//在给定的时间内需要处理完成处理不完成抛出超时异常Boolean tempResult futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log(br----------- xxl-job job execute timeout);XxlJobHelper.log(e);// handle result//任务处理超时给线程内部变量XxlJobContext的handleCode字段设置为502XxlJobHelper.handleTimeout(job execute timeout );} finally {//中断线程futureThread.interrupt();}} else {//没有设置任务超时时间直接调用// just executehandler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() 0) {//xxlJobContext.setHandleCode为500并把执行错信息追加到xxlJobContext.setHandleMsgXxlJobHelper.handleFail(job handle result lost.);} else {String tempHandleMsg XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg (tempHandleMsg!nulltempHandleMsg.length()50000)?tempHandleMsg.substring(0, 50000).concat(...):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}//把日志信息追加到日志文件中使用线程内部变量从XxlJobContext中获取到当前处理任务的日志目录往日志目录中追加日志XxlJobHelper.log(br----------- xxl-job job execute end(finish) -----------br----------- Result: handleCode XxlJobContext.getXxlJobContext().getHandleCode() , handleMsg XxlJobContext.getXxlJobContext().getHandleMsg());} else {//次数大于30次并且任务队列里面没有待处理的任务则把次任务线程中断、删除if (idleTimes 30) {if(triggerQueue.size() 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, excutor idel times over limit.);}}}} catch (Throwable e) {if (toStop) {//把日志信息追加到日志文件中使用线程内部变量从XxlJobContext中获取到当前处理任务的日志目录往日志目录中追加日志XxlJobHelper.log(br----------- JobThread toStop, stopReason: stopReason);}// handle resultStringWriter stringWriter new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg stringWriter.toString();//xxlJobContext.setHandleCode为500并把执行错信息追加到xxlJobContext.setHandleMsgXxlJobHelper.handleFail(errorMsg);//把日志信息追加到日志文件中使用线程内部变量从XxlJobContext中获取到当前处理任务的日志目录往日志目录中追加日志XxlJobHelper.log(br----------- JobThread Exception: errorMsg br----------- xxl-job job execute end(error) -----------);} finally {//调度参数不为空说明进行过处理if(triggerParam ! null) {// callback handler info//线程没有停止if (!toStop) {// commonm//向反馈队列中添加执行结果反馈线程会向调度中心进行反馈TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killed//处理线程停止了把反馈参数添加到反馈队列中TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason [job running, killed] ));}}}}// callback trigger request in queue//当处理线程停止而任务队列里面还有未处理完的任务则向调度中心反馈执行失败信息while(triggerQueue !null triggerQueue.size()0){TriggerParam triggerParam triggerQueue.poll();if (triggerParam!null) {// is killed//向反馈线程的队列中加入反馈参数TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason [job not executed, in the job queue, killed.]));}}// destroytry {//执行销毁方法handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info( xxl-job JobThread stoped, hashCode:{}, Thread.currentThread());} 若是任务的处理方法配置了init则执行init方法从阻塞队列里面移除队首元素若是当前队列没有元素则进行等待等待时间为3秒因为是先启动的线程再往阻塞队列存的任务。当拿到要处理的任务标记任务为运行状态true重置次数idleTimes为0当idleTimes大于30次表示有30次没有获取到要处理的任务时间已经大于90秒每次取任务最多等3秒30次则最大为90秒则中断此线程类。获取任务的执行文件地址创建XxlJobContext对象接收参数还是使用它的线程内部变量方式对后续日志追加时能拿到日志文件地址。
当任务设置了执行超时时间则使用FutureTask来创建一个任务再创建一个内部线程把XxlJobContext添加到子线程中这也是为何要使用InheritableThreadLocal来修饰XxlJobContext而不是ThreadLocal的原因InheritableThreadLocal可以在子线程中调用到父线程设置的内部变量而ThreadLocal只能在一个线程内共享内部变量。使用FutureTask.get方法设置给定的时间内需要处理完成处理不完成抛出超时异常。若是没有设置超时则进行正常的调用即可把执行的结果写入到执行日志文件中。
当JobThread执行完任务任务队列中已经没有待处理的任务了空跑30次以上则进行JobThread的销毁。源码为 //次数大于30次并且任务队列里面没有待处理的任务则把次任务线程中断、删除if (idleTimes 30) {if(triggerQueue.size() 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, excutor idel times over limit.);}}调用删除JobThread的方法源码为 //移除某个任务的处理线程并中断此线程的执行public static JobThread removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread jobThreadRepository.remove(jobId);if (oldJobThread ! null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();return oldJobThread;}return null;}从map集合中删除此记录并中断线程的运行。
当任务处理完成需要往反馈队列里面存放反馈记录这个存放动作是在finally中看下源码
finally {//调度参数不为空说明进行过处理if(triggerParam ! null) {// callback handler info//线程没有停止if (!toStop) {// commonm//向反馈线程中添加执行结果反馈线程会向调度中心进行反馈TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killed//处理线程停止了把反馈参数添加到反馈队列中TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason [job running, killed] ));}}
}当此处从任务队列中获取到值即triggerParam不等于空当线程没有停止向反馈队列中添加执行结果反馈线程会向调度中心进行反馈若是线程已经停止了则向反馈队列里面添加处理失败的标识。
若是线程被中断了例如覆盖之前调度这样的阻塞策略则会跳出while循环而任务队列里面还有未处理完的任务则把这些任务放到反馈队列中并标记任务执行失败看下处理的源码 //当处理线程停止而任务队列里面还有未处理完的任务则向调度中心反馈执行失败信息while(triggerQueue !null triggerQueue.size()0){TriggerParam triggerParam triggerQueue.poll();if (triggerParam!null) {// is killed//向反馈线程的队列中加入回调参数TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason [job not executed, in the job queue, killed.]));}} 若是任务的处理方法配置了destroy则执行destroy方法。此时再来看判断JobThread类是否在运行或者还有未处理完的任务方法isRunningOrHasQueue源码 public boolean isRunningOrHasQueue() {//线程正在运行或者调度队列里面还有未处理完的任务return running || triggerQueue.size()0;}当任务运行时running会设置为truetriggerQueue为阻塞队列的任务集合。
这里分析下handler的execute方法为何能够执行到具体的方法先看MethodJobHandler类部分源码
public class MethodJobHandler extends IJobHandler {private final Object target; //Bean对象-包含XxlJob注解的对象private final Method method; //执行的方法private Method initMethod; //初始化方法private Method destroyMethod; //销毁方法public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {this.target target;this.method method;this.initMethod initMethod;this.destroyMethod destroyMethod;}//执行处理的方法被XxlJob修饰的方法Overridepublic void execute() throws Exception {//方法中有定义参数则执行的时候带有参数Class?[] paramTypes method.getParameterTypes();if (paramTypes.length 0) {method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types} else {method.invoke(target);}}
}target字段存放的是Bean对象即有XxlJob修饰方法的整个类并且这个类是注册为Bean对象的。method、initMethod、destroyMethod都是Method类型是通过target这个bean使用反射生成的看下部分生成源码 //获取此Bean对象的classClass? clazz bean.getClass();Method initMethod null;//注解XxlJob是否有配置init属性if (xxlJob.init().trim().length() 0) {try {//通过反射机制获取到init方法initMethod clazz.getDeclaredMethod(xxlJob.init());//方法关闭安全检查initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException(xxl-job method-jobhandler initMethod invalid, for[ clazz # methodName ] .);}}然后使用method.invoke反射的方法执行方法。
4.kill请求
kill请求是调度中心调用执行器停止任务的处理接口用于对调度中心已经调度成功执行器还没有执行反馈的任务进行停止处理。调用的源码位置 //响应调度中心停止执行器执行某个任务Overridepublic ReturnTString kill(KillParam killParam) {// kill handlerThread, and create new one//根据任务id获取线程JobThread jobThread XxlJobExecutor.loadJobThread(killParam.getJobId());if (jobThread ! null) {//执行删除线程的方法XxlJobExecutor.removeJobThread(killParam.getJobId(), scheduling center kill job.);return ReturnT.SUCCESS;}return new ReturnTString(ReturnT.SUCCESS_CODE, job thread already killed.);}根据任务id获取到处理此任务的JobThread类然后调用停止此JobThread线程类的方法并把它从map集合中删除。
5.log请求
log请求是调度中心查看执行器执行日志的接口调度中心的日志表xxl_job_log记录着处理此次任务的执行器地址当需要查看执行日志时会调用此执行器进行响应。调用的源码 //响应调度中心获取某个任务的执行日志Overridepublic ReturnTLogResult log(LogParam logParam) {// log filename: logPath/yyyy-MM-dd/9999.logString logFileName XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());//根据行数读取日志LogResult logResult XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());return new ReturnTLogResult(logResult);}根据任务的调度时间和日志id组织日志文件目录和文件名按起始行读取日志信息。 public static LogResult readLog(String logFileName, int fromLineNum){// valid log fileif (logFileNamenull || logFileName.trim().length()0) {return new LogResult(fromLineNum, 0, readLog fail, logFile not found, true);}//根据日志目录创建文件File logFile new File(logFileName);if (!logFile.exists()) {return new LogResult(fromLineNum, 0, readLog fail, logFile not exists, true);}// read fileStringBuffer logContentBuffer new StringBuffer();int toLineNum 0;LineNumberReader reader null;try {//读取文件reader new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), utf-8));String line null;while ((line reader.readLine())!null) {toLineNum reader.getLineNumber(); // [from, to], start as 1//读取的行大于起始行才作为结果if (toLineNum fromLineNum) {//逐行拼接日志记录logContentBuffer.append(line).append(\n);}}} catch (IOException e) {logger.error(e.getMessage(), e);} finally {if (reader ! null) {try {reader.close();} catch (IOException e) {logger.error(e.getMessage(), e);}}}// result//构造结果实体LogResult logResult new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);return logResult;}四.程序结束销毁处理
程序启动的时候初始化5个守护线程、1个netty服务、1个map集合、1个list集合、1个线程池处理任务时创建的线程当程序结束的时候需要销毁这些资源。资源销毁的入口类为XxlJobSpringExecutor。
1.销毁入口类
XxlJobSpringExecutor类是销毁的入口类是因为它实现了DisposableBean接口重写了destroy方法。当bean被销毁的时候会执行destroy方法可以从这里作为销毁处理的入口。看下XxlJobSpringExecutor类销毁相关的源码
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {// 实现DisposableBean接口重写它的bean销毁方法Overridepublic void destroy() {super.destroy();}
}//注册xxlJobExecutor的beanBeanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info( xxl-job config init.);XxlJobSpringExecutor xxlJobSpringExecutor new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}XxlJobSpringExecutor类创建后使用Bean修饰注册为bean对象它还实现了DisposableBean接口重写destroy销毁方法最终调用到的是父类XxlJobExecutor的destroy方法。
2.资源销毁处理
销毁资源的处理类为XxlJobExecutor的destroy看下它的源码 //当bean销毁时调用此方法public void destroy(){// destroy executor-server//销毁netty服务停止注册线程、向调度中心调用删除此执行器stopEmbedServer();// destroy jobThreadRepository//销毁任务线程if (jobThreadRepository.size() 0) {for (Map.EntryInteger, JobThread item: jobThreadRepository.entrySet()) {JobThread oldJobThread removeJobThread(item.getKey(), web container destroy and kill the job.);// wait for job thread push result to callback queueif (oldJobThread ! null) {try {oldJobThread.join();} catch (InterruptedException e) {logger.error( xxl-job, JobThread destroy(join) error, jobId:{}, item.getKey(), e);}}}jobThreadRepository.clear();}//销毁记录着被XxlJob修饰的方法集合jobHandlerRepository.clear();// destroy JobLogFileCleanThread//销毁周期为一天的清除文件的线程JobLogFileCleanThread.getInstance().toStop();// destroy TriggerCallbackThread//销毁执行器执行反馈的线程、执行器执行失败反馈的线程TriggerCallbackThread.getInstance().toStop();}销毁资源的方法都在destroy中现在逐个介绍下销毁过程。
1stopEmbedServer()
销毁netty服务停止注册守护线程、向调度中心调用删除此执行器。看下源码 //销毁netty服务private void stopEmbedServer() {// stop provider factoryif (embedServer ! null) {try {//销毁netty服务embedServer.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}}销毁netty服务看它的stop方法源码 //销毁netty服务public void stop() throws Exception {// destroy server thread//启动时候创建的线程还存活则进行中断if (thread ! null thread.isAlive()) {//中断线程thread.interrupt();}// stop registry//停止注册stopRegistry();logger.info( xxl-job remoting server destroy success.);}初始化netty服务时创建的守护线程若是还存活则进行中断当此守护线程中断后通过此线程创建的netty服务也随之销毁在finally方法中关闭netty资源源码如下
finally {// stoptry {//关闭netty的线程组workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}
}看下停止注册服务的方法stopRegistry源码 //停止注册public void stopRegistry() {// stop registry//调用执行器注册线程类的停止方法ExecutorRegistryThread.getInstance().toStop();}停止方法最终由ExecutorRegistryThread的toStop执行看下源码 //执行器注册线程类的停止方法public void toStop() {//停止标识为true则上面使用心跳注册机制的while会跳出循环然后执行移除此执行器注册信息toStop true;// interrupt and wait//中断注册线程if (registryThread ! null) {registryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}当把注册守护线程循环条件toStop设置为true后会跳出while循环停止自动注册然后会执行移除此执行器注册信息的接口最后中断注册守护线程。
2jobThreadRepository清理
jobThreadRepository是一个存放处理任务线程类JobThread的map集合每个JobThread都是一个线程类都需要中断线程然后清空map集合。
3jobHandlerRepository.clear()
jobHandlerRepository是一个存放任务处理器类IJobHandler的map集合每个IJobHandler都是一个可执行的处理类需要清空此map。
4JobLogFileCleanThread.getInstance().toStop()
销毁休眠周期为一天的清除文件守护线程看下源码 public void toStop() {//停止标识为true则上面while的条件不满足跳出循环toStop true;if (localThread null) {return;}// interrupt and wait//中断清除日志文件的守护线程localThread.interrupt();try {localThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}5TriggerCallbackThread.getInstance().toStop()
销毁执行反馈的守护线程、执行失败反馈的守护线程看下源码 public void toStop(){//标识为true则上面的while条件不符合跳出循环toStop true;// stop callback, interrupt and wait//销毁回调线程if (triggerCallbackThread ! null) { // support empty admin addresstriggerCallbackThread.interrupt();try {triggerCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop retry, interrupt and wait//销毁重试调度反馈线程if (triggerRetryCallbackThread ! null) {triggerRetryCallbackThread.interrupt();try {triggerRetryCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}当toStop为true时会跳出循环处理反馈消息的逻辑若反馈队列里面还有未反馈的记录则进行最后的反馈中断线程使用了triggerCallbackThread.join()就是得等线程运行结束。停止线程之前运行的最后反馈源码 //当停止反馈线程后把当前callBackQueue反馈队列里面还没有反馈完的记录进行反馈try {ListHandleCallbackParam callbackParamList new ArrayListHandleCallbackParam();int drainToNum getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!null callbackParamList.size()0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}Redisson优化分布式锁问题
xxl-job为了防止集群部署调度中心时任务被重复加载使用mysql的写锁机制进行控制每次预加载任务时都创建一个mysql连接并对表xxl_job_lock加锁加锁成功才进行任务的预读处理这样就能保证集群环境下每次只会有一台机器加锁成功。预读任务加载完毕后还需要释放锁关闭mysql连接非常浪费资源、加大数据库的压力。对于分布式加锁的问题此处使用主流的Redisson分布式锁进行优化优化的步骤如下
1引入依赖
按着项目定义的规范版本号都是定义在父工程中所以在父项目的pom.xml中定义Redisson的版本号
redisson.version3.16.4/redisson.version 在调度中心xxl-job-admin的pom.xml中引入需要的Redisson、redis依赖引入redis是为了获取到redis的连接信息 !-- redis --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependency!-- redisson --dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion${redisson.version}/version/dependency2配置redis连接信息
在xxl-job-admin项目的application.properties配置文件中添加redis的连接信息
### redis
spring.redis.database0
spring.redis.host127.0.0.1
spring.redis.port6379
spring.redis.timeout3000
spring.redis.lettuce.pool.max-active20
spring.redis.lettuce.pool.max-idle10
spring.redis.lettuce.pool.max-wait-1
spring.redis.lettuce.pool.min-idle03创建Redisson客户端
在xxl-job-admin项目的com.xxl.job.admin.core包下创建一个包redis新建一个RedissonConfig配置类定义Redisson客户端。此处使用的是redis单节点的方式若是其它例如集群、哨兵模式请参考官网进行创建
Configuration
EnableConfigurationProperties(value RedisProperties.class)
public class RedissonConfig {//创建redisson客户端此时默认使用单节点Beanpublic RedissonClient redissonClient(RedisProperties redisProperties){Config config new Config();config.useSingleServer().setAddress(redis://redisProperties.getHost():redisProperties.getPort());config.useSingleServer().setDatabase(redisProperties.getDatabase());config.useSingleServer().setPassword(redisProperties.getPassword());config.useSingleServer().setTimeout((int)redisProperties.getTimeout().getSeconds()*1000);RedissonClient redisson Redisson.create(config);return redisson;}}4原有代码改造
需要把Redisson客户端注入到处理预读任务的JobScheduleHelper类中由于Redisson客户端是一个Bean对象而JobScheduleHelper是一个普通类所以需要在创建JobScheduleHelper时传递参数的方式实现。调用JobScheduleHelper类最前置的类是XxlJobAdminConfig类此类也是一个Bean对象是初始化资源的入口类我们把Redisson客户端注入到此类中并在执行init初始化方法时把Redisson客户端传递下去。
XxlJobAdminConfig.java修改的地方如下 //注入redisson客户端ResourceRedissonClient redissonClient;public void afterPropertiesSet() throws Exception {//初始化调度中心资源--添加参数xxlJobScheduler.init(redissonClient);}XxlJobScheduler类的init方法接收参数并在创建JobScheduleHelper类时把Redisson客户端作为参数传递进去XxlJobScheduler.java修改的地方如下 //初始化调度中心资源--接收参数public void init(RedissonClient redissonClient) throws Exception {//把redissonClient作为参数传递过去JobScheduleHelper.getInstance(redissonClient).start();}//销毁调度中心资源public void destroy() throws Exception {//停止预读线程、环形处理任务线程JobScheduleHelper.getInstance(null).toStop();}JobScheduleHelper类之前使用饿汉式的方式创建此时需要改为接收参数式的懒汉式创建JobScheduleHelper.java修改的地方如下 //redisson客户端private RedissonClient redissonClient;private static JobScheduleHelper instance null;//接收参数式的懒汉式创建对象public static JobScheduleHelper getInstance(RedissonClient redissonClient){if(instance null) {synchronized (JobScheduleHelper.class){if(instance null) {instance new JobScheduleHelper(redissonClient);}}}return instance;}//创建对象时注入redisson客户端public JobScheduleHelper(RedissonClient redissonClient){this.redissonClient redissonClient;}5替代加锁机制
把创建mysql连接的代码修改为获取redisson锁加锁成功才进行预读流程处理把关闭mysql连接的代码修改为关闭redisson锁。
while (!scheduleThreadToStop) {//起始时间long start System.currentTimeMillis();//获取redisson锁RLock lock redissonClient.getLock(preReadJob);try {//尝试加锁boolean res lock.tryLock(30, TimeUnit.SECONDS);if(res) {//获取到锁进行处理long nowTime System.currentTimeMillis();//处理预读流程...}} catch (Exception e) {} finally {//释放锁if (lock.isLocked() lock.isHeldByCurrentThread()) {lock.unlock();}}