xxl-job调度中心、执行器源码详解
文章目录
- 简介
- 调度中心
- 一.程序启动初始化
- 1.初始化入口类
- 2.初始化I18n
- 3.初始化快慢调度线程池
- 4.初始化处理执行器注册或移除线程池+更新执行器最新在线的守护线程
- 5.初始化监控任务调度失败或执行失败的守护线程
- 6.初始化处理执行器回调线程池+监控任务执行结果丢失的守护线程
- 7.初始化计算每天调度情况统计、清理过期日志记录的守护线程
- 8.初始化预读和执行任务的守护线程
- 9.初始化处理预读任务的守护线程
- 10.初始化资源汇总说明图
- 二.主动发起请求
- 1.beat调用
- 2.idleBeat调用
- 3.run调用
- 4.kill调用
- 5.log调用
- 三.接收请求处理
- 1.callback请求
- 2.registry请求
- 3.registryRemove请求
- 四.程序结束销毁处理
- 1.销毁入口类
- 2.资源销毁处理
- 执行器
- 一.程序启动初始化
- 1.初始化入口类
- 2.初始化处理任务的方法
- 3.初始化执行日志目录
- 4.初始化操作调度中心的客户端
- 5.初始化清除日志文件的守护线程
- 6.初始化向调度中心反馈执行结果的守护线程
- 7.初始化重试反馈失败记录的守护线程
- 8.初始化守护线程并创建netty服务监听端口调用+处理调用的线程池
- 9.初始化注册执行器在线情况的守护线程
- 10.初始化资源汇总说明图
- 二.主动发起请求
- 1.callback调用
- 2.registry调用
- 3.registryRemove调用
- 三.接收请求处理
- 1.beat请求
- 2.idleBeat请求
- 3.run请求
- 4.kill请求
- 5.log请求
- 四.程序结束销毁处理
- 1.销毁入口类
- 2.资源销毁处理
- Redisson优化分布式锁问题
简介
xxl-job是一款基于java开发的分布式任务调度平台,集成非常简单,官网下载工程后,调度中心配置上mysql数据源,把默认需要的表导入到数据库中,调度中心项目打成jar包,直接启动,调度平台就创建完成。执行器为具体业务开发项目,只需引入xxl-job-core依赖,配置上调度中心地址、执行日志存放目录,创建执行器对象,使用@XxlJob即可定义调度的具体任务。实现对任务调度的可视化操作方式,操作非常简单,此处针对重要流程进行源码分析,具体使用详情可以参考官网。
调度中心
调度中心是对任务的管理,任务执行状态、执行结果、执行日志进行监控的平台,是一个web工程,用户可以方便的进行任务管理。支持邮件报警,支持CRON和固定速度两种调度方式,支持Bean、shell脚本、php等多种运行模式,支持随机、轮询、故障转移等多种路由策略,支持子任务的连带执行,支持忽略、立即执行一次两种调度过期策略,支持单机串行、丢弃后续调度、覆盖之前调度三种阻塞处理策略。
一.程序启动初始化
程序启动后会做很多的资源初始化,创建需要的守护线程,资源初始化的入口类为JobAlarmer类和XxlJobAdminConfig类,我们从这两个类来看初始化过程。
1.初始化入口类
JobAlarmer类和XxlJobAdminConfig类会作为初始化入口类是因为它们被@Component修饰,在spring
容器中注册为Bean对象,并实现了InitializingBean接口,此接口有一个方法afterPropertiesSet(),在Bean初始化完并把参数注入成功后会调用afterPropertiesSet(属性设置之后)方法,在此方法进行的资源初始化。JobAlarmer类还实现了ApplicationContextAware接口,此接口有一个方法setApplicationContext,会把spring容器上下文设置到此方法中,我们可以定义一个变量来接收ApplicationContext,这样就可以获取到spring容器中注册的bean对象。JobAlarmer类是为了从spring容器中获取到定义的报警类,当需要报警时,调用所有的报警类执行报警方法,报警类支持自定义扩展,扩展方式只需实现JobAlarm接口,并把自定义扩展类设置为Bean(@Component修饰),重写alarm方法,完成具体扩展方式的报警处理。
看下JobAlarmer类的源码:
@Component
public class JobAlarmer implements ApplicationContextAware, InitializingBean {private static Logger logger = LoggerFactory.getLogger(JobAlarmer.class);private ApplicationContext applicationContext;private List<JobAlarm> jobAlarmList; //存放报警的实现类,可以进行扩展,实现JobAlarm接口,并把实现类注册为bean即可//实现ApplicationContextAware接口,获取上下文,得到加载到spring容器中的所有bean对象@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}//实现了InitializingBean接口,在Bean初始化完并把参数注入成功后会调用afterPropertiesSet()@Overridepublic void afterPropertiesSet() throws Exception {//从spring容器中获取JobAlarm类型的bean,并存放到list集合中Map<String, JobAlarm> serviceBeanMap = applicationContext.getBeansOfType(JobAlarm.class);if (serviceBeanMap != null && serviceBeanMap.size() > 0) {jobAlarmList = new ArrayList<JobAlarm>(serviceBeanMap.values());}}/*** job alarm* 发送预警邮件* @param info* @param jobLog* @return*/public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {boolean result = false;//报警的集合类不为空if (jobAlarmList!=null && jobAlarmList.size()>0) {result = true; // success means all-successfor (JobAlarm alarm: jobAlarmList) {boolean resultItem = false;try {//执行报警方法resultItem = alarm.doAlarm(info, jobLog);} catch (Exception e) {logger.error(e.getMessage(), e);}if (!resultItem) {result = false;}}}return result;}}
看下XxlJobAdminConfig类的部分源码:
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private XxlJobScheduler xxlJobScheduler;//实现了InitializingBean接口,在Bean初始化完并把参数注入成功后会调用afterPropertiesSet()@Overridepublic void afterPropertiesSet() throws Exception {xxlJobScheduler = new XxlJobScheduler();//初始化调度中心资源xxlJobScheduler.init();}
}
2.初始化I18n
系统web页面的文字支持三种类型:en(英文)、zh_CN(中文)、zh_TC(中文繁体),具体使用哪种类型在application.propreties配置文件中约定,配置项如下:
xxl.job.i18n=zh_CN
初始化资源的入口为XxlJobAdminConfig类的afterPropertiesSet()方法,此方法创建了XxlJobScheduler类,执行它的init方法,在它的init方法中有初始化i18n的方法initI18n(),来看下initI18n()源码:
//初始化i18n,用于不同语言的字符显示private void initI18n(){//对阻塞处理策略枚举类重新设置title值,连带着初始化了I18nfor (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {//根据选择的i18n类型,从i18n配置文件中根据key加载对应的文本item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));}}
此方法对阻塞处理策略枚举类重新设置title值,title的值需要从i18n字典中匹配,在读取I18n字典的时候,初始化了i18n。看下根据key从I18n中获取值的方法源码:
public static String getString(String key) {//加载i18n的字典文件,从此字典文件中根据key获取value值return loadI18nProp().getProperty(key);}private static Properties prop = null;//根据选择的i18n类型,加载对应的配置文件public static Properties loadI18nProp(){if (prop != null) {return prop;}try {// build i18n prop//获取配置的i18n类型String i18n = XxlJobAdminConfig.getAdminConfig().getI18n();//根据类型拼接出需要的字典文件名String i18nFile = MessageFormat.format("i18n/message_{0}.properties", i18n);// load prop//根据文件目录加载资源Resource resource = new ClassPathResource(i18nFile);EncodedResource encodedResource = new EncodedResource(resource,"UTF-8");//加载properties配置文件信息prop = PropertiesLoaderUtils.loadProperties(encodedResource);} catch (IOException e) {logger.error(e.getMessage(), e);}return prop;}
获取字典项Properties,若是第一次调用,则Properties为空,此时会从配置文件中获取到配置的i18n类型,根据类型拼接出需要的字典文件名,然后加载字典文件,字典文件是properties类型存放在resources/i18n目录下:
然后根据key从Properties中获取到value值,第一次加载后,后面直接从prop中取值。
3.初始化快慢调度线程池
为了优化调度效率,定义了快慢调度线程池,快慢线程池的区别在于最大线程数、阻塞队列的大小;快线程池的最大线程数默认为200,小于200的按200处理,阻塞队列为1000;慢线程池的最大线程数默认为100,小于100的按100处理,阻塞队列为2000。执行任务调度时该选哪种线程池来执行的依据:在1分钟内,此任务有10次超过500毫米才调度完成,使用慢线程池处理,否则使用快线程池处理。
初始化此资源的入口为JobTriggerPoolHelper.toStart()方法,看下此toStart()源码:
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();//初始化调度线程池public static void toStart() {//调用JobTriggerPoolHelper的start方法helper.start();}
JobTriggerPoolHelper类中创建了自身对象helper,调用自身的start方法,看下start方法源码:
private ThreadPoolExecutor fastTriggerPool = null;private ThreadPoolExecutor slowTriggerPool = null;public void start(){//初始化快的调度线程池fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}});//初始化慢的调度线程池,与快的不同是阻塞队列的大小为2000、最大线程数slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}});}
此方法初始化了快慢线程池,从配置文件中获取到快慢线程池的最大线程数,在application.properties配置文件中约定此值:
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
具体获取此值的时候做了最小的限制:
//注入配置变量@Value("${xxl.job.triggerpool.fast.max}")private int triggerPoolFastMax;@Value("${xxl.job.triggerpool.slow.max}")private int triggerPoolSlowMax;public int getTriggerPoolFastMax() {//小于200按200处理if (triggerPoolFastMax < 200) {return 200;}return triggerPoolFastMax;}public int getTriggerPoolSlowMax() {//小于100按100处理if (triggerPoolSlowMax < 100) {return 100;}return triggerPoolSlowMax;}
4.初始化处理执行器注册或移除线程池+更新执行器最新在线的守护线程
初始化此资源的入口为:JobRegistryHelper.getInstance().start(),主要初始化处理执行器注册、移除的线程池,初始化更新自动注册执行器最新在线情况的守护线程,心跳触发机制(默认30秒,执行器注册的周期也是30秒),对大于心跳时间*3(90秒)没有最新注册的执行器进行删除。看下start方法的源码:
public void start(){// for registry or remove//创建处理执行器注册或者删除的线程池registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});// for monitor//创建更新执行器最新在线的守护线程registryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {//不销毁就一直执行while (!toStop) {try {// auto registry group//只处理自动注册的执行器组List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);//查询到有记录if (groupList!=null && !groupList.isEmpty()) {// remove dead address (admin/executor)//从执行器注册的表里面查询大于心跳时间(默认90秒)没有过注册的记录,进行删除,表示这些执行器可能已经下线了List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {//删除已经下线的执行器记录XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)//使用集合记录当前执行器在线情况,key:执行器AppName,value:这个执行器分组下的执行器集合HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();//查询在心跳时间(默认90秒)内有过注册的执行器List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {//处理是执行器类型的数据if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {String appname = item.getRegistryKey();List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}//key:执行器AppName,value:这个执行器分组下的执行器集合,多个执行器根据配置的appName进行分组appAddressMap.put(appname, registryList);}}}// fresh group address//刷新自动注册执行器分组里面当前在线的执行器列表for (XxlJobGroup group: groupList) {//根据key从当前在线执行器集合里面获取到某个执行器分组的在线集合List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;//执行器分组在线集合不为空,则重新设置下此执行器分组最新的在线情况;若是为空,则表示此执行器分组下已经没有在线的执行器了,则给执行器在线分组设置为nullif (registryList!=null && !registryList.isEmpty()) {//排序Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();//使用逗号进行当前执行器分组下在线执行器的数组组织for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}group.setAddressList(addressListStr);group.setUpdateTime(new Date());//更新执行器分组下当前在线的执行器数据XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {//默认休眠30秒,与执行器心跳注册的时间保持一致TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}});//设置为守护线程registryMonitorThread.setDaemon(true);registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");//启动线程registryMonitorThread.start();}
registryOrRemoveThreadPool线程池是为了给执行器注册或者删除注册时使用,registryMonitorThread守护线程会去查询注册方式为自动注册的任务组,任务组数据存放在xxl_job_group表中,address_type字段标识注册方式(0自动注册),address_list字段标识当前在线的执行器集合(使用逗号连接),app_name字段用于给注册的执行器分组,自动注册的执行器需要带有所属的分组值,根据分组值来确定此执行器属于哪个任务组。查询到自动注册的任务组后,从执行器自动注册表xxl_job_registry中获取到最新的执行器在线情况(执行器默认30秒注册一次),对于大于90秒没有过注册的执行器,把它从xxl_job_registry表中删除;再从xxl_job_registry表中查询最新的执行器在线情况(90秒内有过注册),根据分组值组织在线情况,最终更新xxl_job_group表的address_list字段值。
就是说registryMonitorThread守护线程会以30秒的休眠周期一直循环检查过期未注册的执行器,并把它的注册记录删除,然后重新更新任务组在线的执行器集合,达到及时显示执行器上线、下线的检测。
5.初始化监控任务调度失败或执行失败的守护线程
初始化的入口为:JobFailMonitorHelper.getInstance().start(),初始化监控任务调度失败或者执行器执行失败日志的线程,心跳触发机制(默认10秒),对失败的任务,有配置报警邮件则发送报警邮件;有配置重试次数大于0的,则进行任务的重新调度。来看start方法源码:
public void start(){//创建监控守护线程monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// monitor//不停止一致运行while (!toStop) {try {//获取调度失败或者执行器执行失败的日志记录,alarm_status为0,告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);if (failLogIds!=null && !failLogIds.isEmpty()) {for (long failLogId: failLogIds) {// lock log//更新xxl_job_log表对应日志记录的alarm_status由0修改为-1int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);if (lockRet < 1) {//已经执行过更新continue;}//加载日志记录XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);//加载日志对应的任务信息XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());// 1、fail retry monitor//任务若是配置了失败重试次数大于0,则进行重试调用if (log.getExecutorFailRetryCount() > 0) {//重试调用执行任务,调度方式为重试,重试次数为配置的次数减1,JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);//调度日志追加上重试调用日志信息String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";log.setTriggerMsg(log.getTriggerMsg() + retryMsg);//更新调度日志信息XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);}// 2、fail alarm monitorint newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败if (info != null) {//发送报警邮件boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);newAlarmStatus = alarmResult?2:3;} else {newAlarmStatus = 1;}//更新日志记录的报警邮件是否发送成功情况XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {//休眠10,心跳周期为10秒TimeUnit.SECONDS.sleep(10);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");}});//设置为守护线程monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobFailMonitorHelper");//启动线程monitorThread.start();}
monitorThread守护线程会以10秒为休眠周期循环检查调用失败的日志文件,然后对失败记录进行响应处理。
6.初始化处理执行器回调线程池+监控任务执行结果丢失的守护线程
初始化的入口为:JobCompleteHelper.getInstance().start(),初始化处理执行器回调的线程池,初始化监控执行器任务结果丢失的线程,心跳机制触发(默认60秒),监控任务已经调度成功,但是执行器一直没有反馈处理情况,任务状态一直是“运行中”(handle_code = 0),且调度开始时间到现在已经过去10分钟、且对应的执行器已经没有在心跳注册的记录(已下线),把这样的记录标记为执行失败。看下start方法源码:
public void start(){// for callback//创建处理执行器回调的线程池callbackThreadPool = new ThreadPoolExecutor(2,20,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");}});// for monitor//创建守护线程monitorThread = new Thread(new Runnable() {@Overridepublic void run() {// wait for JobTriggerPoolHelper-inittry {//休眠50毫秒,等待JobTriggerPoolHelper初始完成TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}// monitor//不销毁一直监听while (!toStop) {try {// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本次调度主动标记失败;Date losedTime = DateUtil.addMinutes(new Date(), -10);//查询出已经调度成功,但是执行器一直没有反馈处理成功,任务状态一直是“运行中”(handle_code = 0),且调度开始时间到现在已经过去10分钟、且对应的执行器已经没有心跳注册的记录List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);//把这样的记录标记为失败if (losedJobIds!=null && losedJobIds.size()>0) {for (Long logId: losedJobIds) {XxlJobLog jobLog = new XxlJobLog();jobLog.setId(logId);jobLog.setHandleTime(new Date());jobLog.setHandleCode(ReturnT.FAIL_CODE);jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );//完成此任务,并更新日志的状态值,有子任务再调用子任务XxlJobCompleter.updateHandleInfoAndFinish(jobLog);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);}}try {//休眠周期是60秒,心跳机制TimeUnit.SECONDS.sleep(60);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");}});//设置为守护线程monitorThread.setDaemon(true);monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");//启动线程monitorThread.start();}
callbackThreadPool线程池主要是给执行器执行任务结束后,给调度中心的反馈处理;monitorThread守护线程,以60秒为休眠周期循序检测任务状态为运行中,且调度时间已经大于10分钟,并且执行器已经没有注册(不在线)的记录,把它标记为执行失败,否则此日志会一直处于运行中。
7.初始化计算每天调度情况统计、清理过期日志记录的守护线程
此初始化的入口为:JobLogReportHelper.getInstance().start(),初始化处理日志报表的守护线程,心跳机制触发(默认1分钟),处理当前时间往前推两天,这三天时间内的调度结果报表值,把结果值按天存放到xxl_job_log_report表中;配置的保存日志最大天数大于0则进行清理处理,当前时间减去上次清理时间大于1天(毫秒数)则进行过期日志记录的删除。看下start源码:
public void start(){//创建日志报表的守护线程logrThread = new Thread(new Runnable() {@Overridepublic void run() {// last clean log time//上次清理日志时间long lastCleanLogTime = 0;//不销毁一直执行while (!toStop) {// 1、log-report refresh: refresh log report in 3 daystry {//处理当前时间往前推两天,这三天时间内的调度结果值for (int i = 0; i < 3; i++) {// todayCalendar itemDay = Calendar.getInstance();itemDay.add(Calendar.DAY_OF_MONTH, -i);itemDay.set(Calendar.HOUR_OF_DAY, 0);itemDay.set(Calendar.MINUTE, 0);itemDay.set(Calendar.SECOND, 0);itemDay.set(Calendar.MILLISECOND, 0);//当前时间减去i天的00:00:00时刻Date todayFrom = itemDay.getTime();itemDay.set(Calendar.HOUR_OF_DAY, 23);itemDay.set(Calendar.MINUTE, 59);itemDay.set(Calendar.SECOND, 59);itemDay.set(Calendar.MILLISECOND, 999);//当前时间减去i天的23:59:59时刻Date todayTo = itemDay.getTime();// refresh log-report every minuteXxlJobLogReport xxlJobLogReport = new XxlJobLogReport();xxlJobLogReport.setTriggerDay(todayFrom);xxlJobLogReport.setRunningCount(0);xxlJobLogReport.setSucCount(0);xxlJobLogReport.setFailCount(0);//根据起止日期,从xxl_job_log日志表中查询这个时间段内总的执行次数、运行中次数、调度成功次数Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);if (triggerCountMap!=null && triggerCountMap.size()>0) {//总的执行次数int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;//运行中次数int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;//调度成功次数int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;//失败次数int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;xxlJobLogReport.setRunningCount(triggerDayCountRunning);xxlJobLogReport.setSucCount(triggerDayCountSuc);xxlJobLogReport.setFailCount(triggerDayCountFail);}// do refresh//把某一天的调度日志报表记录更新进去int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);if (ret < 1) { //若是之前没有添加过,则进行插入XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);}}// 2、log-clean: switch open & once each day//配置的保存日志最大天数大于0则处理,当前时间减去上次清理时间大于1天(毫秒数)则进行日志的清除if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {// expire-time//清理日志的时间Calendar expiredDay = Calendar.getInstance();//当前时间减去配置的天数expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());expiredDay.set(Calendar.HOUR_OF_DAY, 0);expiredDay.set(Calendar.MINUTE, 0);expiredDay.set(Calendar.SECOND, 0);expiredDay.set(Calendar.MILLISECOND, 0);//得到清理的时间Date clearBeforeTime = expiredDay.getTime();// clean expired log//循环处理所有的大于最大存放日期的日志List<Long> logIds = null;do {//查询调度日期在清理截止日期之前的日志记录logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);if (logIds!=null && logIds.size()>0) {//删除日志记录XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);}} while (logIds!=null && logIds.size()>0);// update clean time//重新设置上次清理时间lastCleanLogTime = System.currentTimeMillis();}try {//休眠1分钟,心跳注册机制TimeUnit.MINUTES.sleep(1);} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");}});//设置为守护线程logrThread.setDaemon(true);logrThread.setName("xxl-job, admin JobLogReportHelper");//启动线程logrThread.start();}
logrThread守护线程以60秒为休眠周期循序统计当前时间往前推2天,这3天内按天统计任务总的执行成功、运行中、执行失败次数,把结果存放到xxl_job_log_report表中,供web首页显示,web首页展示截图如下:
还对日志记录进行过期清理,虽然守护线程的循环周期是60秒,但是对日志的清理方法一天只会执行一次,每次执行都会重新设置上次清除时间lastCleanLogTime,每次都判断当前时间与上次执行清理时间是否大于1天,大于才进行日志的清理。在application.properties中配置日志的保存天数:
xxl.job.logretentiondays=30
若是不想清除日志,可以配置值小于0,例如-1,此处配置的日志天数需要和执行器配置的天数保存一致,否则可能调度中心xxl_job_log日志表中还有记录,但是执行器目录下已经删除了此任务对应的执行日志文件,这样就会导致访问不到执行日志详情。
8.初始化预读和执行任务的守护线程
初始化的入口为:JobScheduleHelper.getInstance().start(),初始化定时守护线程,每次固定休眠4到5秒,为了防止在集群环境中,任务被重复调度,所以预读任务的时候使用数据库写锁的方式处理;预读下次执行时间在当前时间+5秒之内的任务,对于下次执行时间到当前时间已经差着5秒以上的任务(过期未执行),过期调度策略为立即执行一次,则进行任务调度;对于下次执行时间在当前时间减去5秒内的任务进行调度;对于下次执行时间在当前时间往后+5秒内的任务放到map中,map的key为任务的执行秒数,value为这一秒需要执行的任务id集合。看下start方法源码:
public void start(){// schedule threadscheduleThread = new Thread(new Runnable() {@Overridepublic void run() {try {//休眠5000 - System.currentTimeMillis()%1000毫秒,最大值的情况为5000-0,最小值的情况为5000-999//随机休眠4到5秒的范围TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)//预读数量:按每个任务50ms计算,qps为20,快线程池+慢线程池最大线程数之和,再乘以20,即为1秒可以处理的最大任务量,默认是6000int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {// Scan Job//起始时间long start = System.currentTimeMillis();//数据库连接Connection conn = null;//连接是否自动提交Boolean connAutoCommit = null;//预处理PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {//获取数据库连接conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();//关闭自动提交conn.setAutoCommit(false);//执行sql语句:对xxl_job_lock添加写锁,为了防止在集群环境中,任务被重复调度,所以使用写锁的方式处理preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();//查询预执行的任务,且下次执行时间小于当前时间往后+5秒,最多查询可以处理的preReadCount数量List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、push time-ringfor (XxlJobInfo jobInfo: scheduleList) {// time-ring jump//任务下次执行时间到当前时间已经差着5秒以上,说明已经过了调度时间了if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// 1、misfire match//获取到此任务配置的过期调度策略MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);//若是立即执行一次,则调用执行方法if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 trigger//调用任务执行方法,执行类型为调度过期补偿JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );}// 2、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {//下次执行时间在当前时间减去5秒之内// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 1、trigger//调用任务执行方法,执行类型为Cron触发JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );// 2、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read again//经过上面重新设置了下次执行时间,新设置的下次执行时间还在当前时间加上5秒之内if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring second//计算结果值范围:0到59之间int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ring//把任务id放到一个map集合中pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {//下次执行时间在当前时间往后延5秒之内// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring second//计算结果值范围:0到59之间int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ring//把任务id放到一个map集合中pushTimeRing(ringSecond, jobInfo.getId());// 3、fresh next//重新设置任务的下次执行时间和上次执行时间refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、update trigger info//更新任务下次执行时间和上次执行时间for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {//没有进行预读处理preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// commitif (conn != null) {try {//提交数据库,释放写锁conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {//还原数据库连接的自动提交设置conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {//关闭连接conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatement//关闭预处理if (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}//计算花费的时间long cost = System.currentTimeMillis()-start;// Wait seconds, align second//花费时间小于1秒则让程序休眠,大于1秒则不休眠if (cost < 1000) { // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;//线程休眠,休眠时间计算:若是有预读,则使用1000减去0到999(0到1秒之间);没有预读,则使用5000减去0到999(4到5秒之间)//查询任务的定时周期为5秒,若是没有预读,则休眠时间在(0到1之间)+默认的(4到5秒)之间;有预读,则预读处理的时候已经把任务下次执行时间在当前时间+5秒之内任务加到环map中了,所以这里再休眠4到5秒,要不然就是白跑一趟TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});//设置为守护线程scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");//启动线程scheduleThread.start();
}
scheduleThread守护线程以4到5秒为休眠周期循环加载下次执行时间在当前时间+5秒内的任务,为了防止在集群环境中,任务被重复加载调度,所以预读任务的时候使用数据库写锁的方式处理,执行的sql语句:
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
按快慢线程池的大小计算出可以预读的数量,查询任务下次执行时间小于当前时间加5秒的记录;对于下次执行时间到当前时间已经差着5秒以上的任务(过期未执行),过期调度策略为立即执行一次,则进行任务的调度,并重新设置下次执行时间;对于下次执行时间在当前时间减去5秒之内,调用任务执行方法,并重新设置下次执行时间,新设置的下次执行时间还在当前时间加上5秒之内,则把此任务加到环形map中;下次执行时间在当前时间往后延5秒之内(未到执行时间),则把此任务加到map中,并重新设置下次执行时间。上面花费时间小于1秒则让程序休眠,大于1秒则不休眠;休眠时间计算:若是有预读,则使用1000减去0到999(0到1秒之间);没有预读,则使用5000减去0到999(4到5秒之间);查询任务的定时周期为5秒,若是有预读,则休眠时间在(0到1之间)+默认的(4到5秒)之间;没有预读,则说明接下来的5秒内没有要执行的任务,此处休眠4到5秒,预读处理的时候已经把任务下次执行时间在当前时间+5秒之内任务加到map中了。
9.初始化处理预读任务的守护线程
初始化的入口为:JobScheduleHelper.getInstance().start(),初始化处理预读任务的守护线程,每次休眠周期在0到1秒之间,执行放到map中的预执行任务,根据当前秒数为key,从map中取出任务进行调度。看下start方法源码:
public void start(){// ring thread//创建环形线程,用于处理上面定时线程预读任务(周期5秒左右)的时候,对于下次执行时间在当前时间+5秒内的任务,使用此线程来进行调度ringThread = new Thread(new Runnable() {@Overridepublic void run() {while (!ringThreadToStop) {// align secondtry {//随机休眠1到1000毫秒,在1秒范围内TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);} catch (InterruptedException e) {if (!ringThreadToStop) {logger.error(e.getMessage(), e);}}try {// second dataList<Integer> ringItemData = new ArrayList<>();//获取当前的秒数int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {//ringData存放的是预处理时,当前时间+5秒内需要执行的任务,使用此map对象存放的集合在ringThread线程中进行处理//ringData的key是5秒内预处理的任务的秒数List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}// ring triggerlogger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );//当前这一秒下有任务要处理if (ringItemData.size() > 0) {// do trigger//循环处理这一秒下的任务for (int jobId: ringItemData) {// do trigger//调用任务执行方法,执行类型为Cron触发JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);}// clearringItemData.clear();}} catch (Exception e) {if (!ringThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");}});//设置为守护线程ringThread.setDaemon(true);ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");//启动线程ringThread.start();}
ringThread守护线程以0到1秒之间的休眠周期,循环处理预读(5秒内)加到map中的任务,根据当前的秒数为key从map中取出这一秒需要执行的任务进行调度。
10.初始化资源汇总说明图
(1)初始化的线程池
(2)初始化的守护线程
二.主动发起请求
调度中心主动向执行器发起的请求可以从调度中心客户端ExecutorBizClient类找到,此类存在xxl-job的公共核心xxl-job-core工程中,目录结构为com.xxl.job.core.biz.client,从类里面可以看到包含beat、idleBeat、run、kill、log五个方法。看下ExecutorBizClient的源码:
/*** 调度中心-》调用执行器的客户端,供调度中心使用*/
public class ExecutorBizClient implements ExecutorBiz {public ExecutorBizClient() {}public ExecutorBizClient(String addressUrl, String accessToken) {this.addressUrl = addressUrl;this.accessToken = accessToken;// validif (!this.addressUrl.endsWith("/")) {this.addressUrl = this.addressUrl + "/";}}private String addressUrl ;private String accessToken;private int timeout = 3;//心跳检测执行器是否在线,用于故障转移方式时的调用测试@Overridepublic ReturnT<String> beat() {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);}//心跳检测执行器是否忙碌,用于忙碌转移方式时的调用测试@Overridepublic ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class);}//调用执行器运行@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);}//停止执行器的执行@Overridepublic ReturnT<String> kill(KillParam killParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);}//查询执行器产生的执行日志信息@Overridepublic ReturnT<LogResult> log(LogParam logParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);}
}
此客户端类提供了5个调用执行器的方法,当方法被调用时,会拼接远程执行器的接口地址,进行远程调用。当需要向某个执行器发起请求时,需要为此执行器创建一个客户端,并把此客户端存放到map中,key为执行器地址,value为客户端,当下次再需要向此执行器发起请求时,直接从map中获取客户端即可。创建客户端时,需要把执行器地址和token作为参数传递进来,这样在发起远程调用时直接拼接url和传递token就行。看下根据执行器地址获取客户端的源码:
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address)
此方法调用到XxlJobScheduler类,并使用ExecutorBiz父接口来接收客户端。看下getExecutorBiz方法源码:
//使用集合记录执行器地址和它的客户端,key:执行器地址,value:调用执行器的客户端private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();public static ExecutorBiz getExecutorBiz(String address) throws Exception {// validif (address==null || address.trim().length()==0) {return null;}// load-cacheaddress = address.trim();//已经创建过则直接使用ExecutorBiz executorBiz = executorBizRepository.get(address);if (executorBiz != null) {return executorBiz;}// set-cache//没有创建过则创建executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());//放到map集合中供下次使用executorBizRepository.put(address, executorBiz);return executorBiz;}
1.beat调用
beat调用是调度中心检测执行器是否在线,用于故障转移方式时的调用测试,在源码中的调用位置:
public class ExecutorRouteFailover extends ExecutorRouter {@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {StringBuffer beatResultSB = new StringBuffer();//遍历执行器地址for (String address : addressList) {// beatReturnT<String> beatResult = null;try {//根据调用地址获取它对应的执行器客户端ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//调用beat方法,发送请求beatResult = executorBiz.beat();} catch (Exception e) {logger.error(e.getMessage(), e);beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );}beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"").append(I18nUtil.getString("jobconf_beat") + ":").append("<br>address:").append(address).append("<br>code:").append(beatResult.getCode()).append("<br>msg:").append(beatResult.getMsg());// beat success//执行器还在线,则使用此执行器地址进行调用if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {beatResult.setMsg(beatResultSB.toString());//能调通的执行器地址beatResult.setContent(address);return beatResult;}}return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());}
}
当路由策略为故障转移时,在执行器集合中,选择能够调通,没有故障的执行器进行任务的调用。确定某个执行器是否有故障,只用调用一下它的beat接口,有反馈就是没有故障,没有反馈就是有故障。
2.idleBeat调用
idleBeat调用是检测执行器是否忙碌,用于忙碌转移方式时的调用测试,在源码中的调用位置:
public class ExecutorRouteBusyover extends ExecutorRouter {@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {StringBuffer idleBeatResultSB = new StringBuffer();//遍历所有执行器for (String address : addressList) {// beatReturnT<String> idleBeatResult = null;try {//根据地址获取调用执行器的客户端ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//执行调用idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));} catch (Exception e) {logger.error(e.getMessage(), e);idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );}idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"").append(I18nUtil.getString("jobconf_idleBeat") + ":").append("<br>address:").append(address).append("<br>code:").append(idleBeatResult.getCode()).append("<br>msg:").append(idleBeatResult.getMsg());// beat success//调用成功,表示此执行器当前不处于忙碌状态if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {idleBeatResult.setMsg(idleBeatResultSB.toString());idleBeatResult.setContent(address);return idleBeatResult;}}return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());}
}
当路由策略为忙碌转移时,在执行器集合中,选择当前没有处理此任务的执行器执行任务,若是当前执行器正在执行此任务(任务的上一次调度,执行器还没有完成,又来一次调度),则返回失败,直到找到没有处理此任务的执行器进行调用。注意:忙碌转移是针对此任务执行器是否还在执行,不是指执行器是否有在执行任务,例如执行器当前正在执行其他任务,也算不忙碌。
3.run调用
run调用是指调用执行器执行任务,xxl-job提供了6中类型的调度方式,看下调度方式枚举类TriggerTypeEnum源码:
//调度方式枚举类
public enum TriggerTypeEnum {//手动触发MANUAL(I18nUtil.getString("jobconf_trigger_type_manual")),//Cron触发CRON(I18nUtil.getString("jobconf_trigger_type_cron")),//失败重试触发RETRY(I18nUtil.getString("jobconf_trigger_type_retry")),//父任务触发PARENT(I18nUtil.getString("jobconf_trigger_type_parent")),//API触发API(I18nUtil.getString("jobconf_trigger_type_api")),//调度过期补偿MISFIRE(I18nUtil.getString("jobconf_trigger_type_misfire"));private TriggerTypeEnum(String title){this.title = title;}private String title;public String getTitle() {return title;}
}
MANUAL:手动触发一次调度,用在web页面点击执行一次按钮时触发;
CRON:到执行时间了自动进行任务调度触发,用在预读守护线程加载预读任务、处理预读任务的守护线程到时间时的触发;
RETRY:任务调度失败或执行失败后,由监控任务调度失败或执行失败的守护线程触发;
PARENT:父任务触发,当调度中心收到执行器执行成功的反馈、处理任务结果丢失记录、停止执行器执行的反馈这三种情况下,更新完执行结果后,若是此任务有配置子任务,则触发子任务的执行;
API:API触发,目前xxl-job中没有使用到此种类型;
MISFIRE:调度过期补偿,用在预读守护线程加载下次执行时间在当前时间+5秒内任务时,对于下次执行时间已经在当前时间-5秒之前的任务(超时未执行),触发一次补偿调度。
源码中的调用我们就从手动触发执行一次开始看起,当在web界面中点击执行一次某个任务,页面截图如下:
调用到的控制类是JobInfoController,看下接口方法源码:
//触发调度@RequestMapping("/trigger")@ResponseBodypublic ReturnT<String> triggerJob(int id, String executorParam, String addressList) {if (executorParam == null) {executorParam = "";}//调度类型为手动触发JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);return ReturnT.SUCCESS;}
处理任务调度的类是JobTriggerPoolHelper,看下他的trigger源码:
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();/*** 执行任务调度* @param jobId 任务id* @param triggerType 调度类型* @param failRetryCount 失败重试次数,大于等于0才生效* >=0: use this param* <0: use param from job info config* @param executorShardingParam //执行器分片信息* @param executorParam //任务参数* @param addressList //执行器地址*/public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);}
JobTriggerPoolHelper类先创建了自身对象,再调用addTrigger添加调度任务方法,看下addTrigger源码:
//使用volatile修饰变量是为了在多线程下,每个线程能及时拿到最新的minTim值private volatile long minTim = System.currentTimeMillis()/60000; // ms > min 计算出来的是分钟//jobTimeoutCountMap是以一分钟为口径进行统计的,一分钟内某个任务调度超过500毫秒的次数,根据此次数来选择使用快(小于10次)、慢(大于10次)线程池执行此任务private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool//选择使用快的还是慢的线程池执行此任务ThreadPoolExecutor triggerPool_ = fastTriggerPool;//使用jobTimeoutCountMap来存放任务超时次数集合,key:任务id,value:次数,它统计的口径是一分钟内的调度数据AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);//在1分钟内,此任务有10次以上超过500毫米才调度完成,使用慢线程池处理if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min//使用慢线程池处理triggerPool_ = slowTriggerPool;}// trigger//线程池执行任务triggerPool_.execute(new Runnable() {@Overridepublic void run() {//调度开始时间long start = System.currentTimeMillis();try {// do trigger//开始进行调度XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-map//调度结束后的时间-分钟long minTim_now = System.currentTimeMillis()/60000;//调度结束后的分钟数不等于设置的分钟数if (minTim != minTim_now) {//重新设置分钟数,作为下一个统计口径的时间点minTim = minTim_now;//清空map集合,已经过了minTim这一分钟的统计口径,jobTimeoutCountMap是以一分钟为口径进行统计的jobTimeoutCountMap.clear();}// incr timeout-count-map//计算一共花费了多少时间long cost = System.currentTimeMillis()-start;//调度时间大于500毫秒if (cost > 500) { // ob-timeout threshold 500ms//putIfAbsent:向map中添加记录,若是存在此key的记录,则返回value,若是不存在则插入,插入的时候返回的值为nullAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));//已经存在此key的值,timeoutCount才不等于nullif (timeoutCount != null) {//使用AtomicInteger线程安全的方式把次数加1(cas自旋的方式,先比对在加1)timeoutCount.incrementAndGet();}}}}});}
处理此任务有快慢两个线程池可供选择,默认选择快线程池执行,选择慢线程池的条件:在1分钟内,此任务有10次以上超过500毫米才调度完成,使用慢线程池处理。这里使用volatile来修饰minTim,为了在多线程下,每个线程能及时拿到最新的minTim值;使用jobTimeoutCountMap来存放某个任务在一分钟内调度超过500毫秒的次数,也是使用volatile来修饰。当任务执行完成,判断当前的分钟数是否还等于minTim,若是等于,说明还在minTim这分钟内;若是不等于(大于),说明已经过了minTim这分钟的统计维度,需要把jobTimeoutCountMap清空,并把当前分钟数赋值给minTim;调度时间大于500毫秒,使用putIfAbsent方法向map中添加记录,若是存在此key的记录,则返回value,不进行插入,若是不存在则插入,插入的时候返回的值为null(put方法与putIfAbsent方法都会把旧的值返回,不同之处是当key存在,put方法会进行覆盖,而putIfAbsent不会进行覆盖)。已经存在此key的值,把原有的AtomicInteger值使用线程安全的方式次数加1,不存在则插入的AtomicInteger值就是1。
最终调用XxlJobTrigger.trigger()执行调度方法,看下trigger()方法源码:
public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data//加载任务数据XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}//有新设置的执行参数,则进行参数的覆盖if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}//调度失败重试次数int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();//执行器分组信息XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList//有设置新的执行器地址,则进行覆盖if (addressList!=null && addressList.trim().length()>0) {group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param//分片参数int[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}//路由策略是分片广播if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {//分片广播,则需要向所有的注册器都进行调用for (int i = 0; i < group.getRegistryList().size(); i++) {//处理调度processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {//非分片广播if (shardingParam == null) {shardingParam = new int[]{0, 1};}//处理调度processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}
trigger方法只是做一下调用的前置处理,根据任务id查询出任务,若是有新的执行参数,则覆盖xxl_job_info表中配置的参数;查询出执行器分组信息,获取到有哪些执行器可以调用;参数涉及到分片信息,所以默认创建只有一个分片的数组,索引为0;若是此任务配置的路由策略是分片广播,则所有的执行器都要执行任务,根据有多少个执行器来确定分为多少片,调用执行器的时候,传递当前执行器是第几个分片,总共有多少个分片(执行器在处理任务对应的具体方法时,处理的逻辑为:先查询这次任务涉及到的总记录数,需要按某个字段进行排序,然后用总记录数除以总的执行器数,得到每个执行器处理的平均执行数,最后一个执行器的执行数量为总记录数-平均执行数乘以(执行器数量-1),然后可以根据当前执行器所属的分片数,来查询到此执行器需要处理的记录范围,例如mysql的limit start,end语句,start为分片索引*平均执行数,end为执行数量)。广播分片执行器处理举例说明:
//获取当前分片序号int shardIndex = XxlJobHelper.getShardIndex();//获取总分片数int shardTotal = XxlJobHelper.getShardTotal();//总记录条数--查询数据库int targetTotal = xxService.getTargetTotal();//查询记录的起始位置int start = shardIndex;//查询记录的offsetint end = 1;//总记录大于分片数量if(targetTotal > shardTotal){//计算每个分片平均处理的数量int avgTotal = targetTotal/shardTotal;//数据查询起始start = shardIndex*avgTotal;//数据的offsetend = avgTotal;//最后一个执行器if(shardIndex == shardTotal-1) {//总数量-前面几个执行器执行的数量end = targetTotal-(avgTotal*(shardTotal-1);} }//使用start和end去查询需要处理的数据//拼接mysql语句:limit start end//对查询到的记录进行处理...
任务的处理调用processTrigger方法,看下processTrigger源码:
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param//阻塞处理策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy//路由策略ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy//当路由策略为分片广播,组织分片参数String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id//新建一条日志信息,添加上执行时间XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-param//构造调度任务的参数TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address//获取到执行器的地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {//路由为分片广播的方式,根据分片的索引index获取到执行器的地址if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {//根据配置的路由策略,从注册执行器列表,匹配出此次调度的执行器地址,路由策略包含随机、故障转移、忙碌转移等routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {//执行器地址为空,异常routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executorReturnT<String> triggerResult = null;//执行器地址不为空,在进行调度if (address != null) {//执行调度triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger info//构造调度执行器的调度-日志信息StringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) {triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-info//设置日志其他相关字段,供调度失败再次调度时候使用jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();//设置调度结果状态值jobLog.setTriggerCode(triggerResult.getCode());//设置调度信息jobLog.setTriggerMsg(triggerMsgSb.toString());//更新日志记录XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());}
processTrigger方法新建了一条执行日志记录,插入到xxl_job_log表中;构造调度任务的参数实体TriggerParam,作为调用执行器的传递参数封装;获取执行此次任务的执行器地址,若是路由策略为分片广播的方式,根据分片的索引index从执行器集合中获取执行器地址,若是其他路由方式,则使用对应的策略获取到执行器地址,例如随机、故障转移、忙碌转移等,故障转移就是上面介绍的beat调用,忙碌转移就是上面介绍的idleBeat调用;然后拿着构造好的参数类TriggerParam、匹配到的执行器地址address,执行任务的调用runExecutor;等到执行器返回调用结果后,构造调度执行器的详细日志信息,把调用的结果状态值和日志信息更新到一开始插入的日志表中,此日志记录着调用需要的所有参数,这样在进行失败重调的时候参数直接从日志记录中取。注意这里执行器只是返回是否调度成功,不返回具体是否执行成功,执行情况是等待执行器主动调用调度中心进行反馈。
任务的具体调度是runExecutor方法,看下runExecutor源码:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {//根据执行器地址获取调度执行器的客户端ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//执行调度方法runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;}
runExecutor方法根据执行器地址获取到调用客户端,然后执行此客户端的run方法,即调用到ExecutorBizClient类的run方法,此run方法发起远程http调用,到此一次完整的调用流程走完。run方法的源码:
//调用执行器运行@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);}
4.kill调用
kill调用是调度中心对执行器正在处理的任务进行停止处理,当任务调度成功后,还没有收到执行器的反馈,调度中心可以调用kill来停止执行器的执行。在web 中的操作界面如下:
点击终止任务对应的接口为/joblog/logKill,看下logKill接口的源码:
@RequestMapping("/logKill")@ResponseBodypublic ReturnT<String> logKill(int id){// base checkXxlJobLog log = xxlJobLogDao.load(id);XxlJobInfo jobInfo = xxlJobInfoDao.loadById(log.getJobId());//任务不存在if (jobInfo==null) {return new ReturnT<String>(500, I18nUtil.getString("jobinfo_glue_jobid_unvalid"));}//调用执行器没有成功if (ReturnT.SUCCESS_CODE != log.getTriggerCode()) {return new ReturnT<String>(500, I18nUtil.getString("joblog_kill_log_limit"));}// request of killReturnT<String> runResult = null;try {//根据执行器地址获取对应的客户端ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(log.getExecutorAddress());//调用方法runResult = executorBiz.kill(new KillParam(jobInfo.getId()));} catch (Exception e) {logger.error(e.getMessage(), e);runResult = new ReturnT<String>(500, e.getMessage());}//停止任务执行成功if (ReturnT.SUCCESS_CODE == runResult.getCode()) {//把执行器处理状态设置为失败log.setHandleCode(ReturnT.FAIL_CODE);log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():""));log.setHandleTime(new Date());//更新日志信息(执行结果),完成任务XxlJobCompleter.updateHandleInfoAndFinish(log);return new ReturnT<String>(runResult.getMsg());} else {return new ReturnT<String>(500, runResult.getMsg());}}
调用执行器停止执行之前,先根据日志id获取到日志记录,校验当前任务是否还在,校验执行器是否调度成功,只有调度成功才能进行停止执行的调用;根据执行器地址获取对应的客户端,执行器地址在调度成功的时候,已经写到日志记录中,从日志记录中取出执行地址即可,调用停止方法;停止任务执行成功,把日志记录的执行器处理状态设置为失败,更新日志信息(执行结果),完成任务。调用kill方法即调用到ExecutorBizClient类的kill方法,此kill方法发起远程http调用。kill方法的源码:
//停止执行器的执行@Overridepublic ReturnT<String> kill(KillParam killParam) {//发起远程http调用return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);}
5.log调用
log调用为执行器web页面查看某个日志对应的执行器产生的执行日志文件,进行一次任务调度,调度中心侧会产生一条日志记录存放到xxl_job_log表中,执行器处理任务的时候,会产生自己的执行日志,执行器的处理日志存在于部署执行器的某个目录下。此log调用就是根据日志id和调度时间从执行器中加载执行日志文件,执行日志文件的存储规则为:默认的目录/调度时间(例2023-03-04)/日志id.log。
web界面中查看执行日志的入口截图:
显示执行日志的界面及发起请求日志的接口截图:
执行器中存放执行日志文件的截图:
加载执行日志文件的接口为logDetailCat接口,看下logDetailCat源码:
//查询具体执行明细,需要调用到此任务具体执行的那台机器去获取@RequestMapping("/logDetailCat")@ResponseBodypublic ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, long logId, int fromLineNum){try {ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(executorAddress);ReturnT<LogResult> logResult = executorBiz.log(new LogParam(triggerTime, logId, fromLineNum));// is endif (logResult.getContent()!=null && logResult.getContent().getFromLineNum() > logResult.getContent().getToLineNum()) {XxlJobLog jobLog = xxlJobLogDao.load(logId);//处理状态为200表示执行完成,500表示执行异常(也结束),0位未执行完成if (jobLog.getHandleCode() > 0) {//日志已经加载完成logResult.getContent().setEnd(true);}}return logResult;} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<LogResult>(ReturnT.FAIL_CODE, e.getMessage());}}
根据执行器地址获取操作执行器调用的客户端,向执行器发起请求,传递调度时间、日志id,这样执行器端就可以拼接出此日志对应的执行器文件地址,然后加载文件,并根据传入的起始行数进行加载;传入起始行数是因为查看执行日志的时候,执行器未必已经处理完成,当执行器未处理完成,则前端使用定时器去调用logDetailCat接口,并传递新的起始行;若是执行器已经处理完成,则返回给前端一个end为true的标识,前端不再调用logDetailCat接口。调用log方法即调用到ExecutorBizClient类的log方法,此log方法发起远程http调用。log方法的源码:
//查询执行器产生的执行日志信息@Overridepublic ReturnT<LogResult> log(LogParam logParam) {//发起http调用return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);}
三.接收请求处理
调度中心会接收执行器的请求,接收哪些请求可以从调度中心的api类JobApiController中看出,JobApiController类位于com.xxl.job.admin.controller包下,包含的接口处理为callback、registry、registryRemove三类。看下JobApiController类源码:
@Controller
@RequestMapping("/api")
public class JobApiController {@Resourceprivate AdminBiz adminBiz; //具体类型为AdminBizImpl/*** api** @param uri* @param data* @return*/@RequestMapping("/{uri}")@ResponseBody@PermissionLimit(limit=false)public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {// valid//只支持post方式if (!"POST".equalsIgnoreCase(request.getMethod())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}//调用调度中心若是配置了token,则需要从执行器的request中获取到token值,在执行器传递token值时使用XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN作为key传递,此处也按这个key取值if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mapping//根据接口的结尾匹配具体是哪个方法//callback方法:执行器回调调度中心的方法if ("callback".equals(uri)) {List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);} else if ("registry".equals(uri)) {//registry方法:执行器向调度中心进行在线注册的方法,默认30秒调用一次,心跳注册机制RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);} else if ("registryRemove".equals(uri)) {//registryRemove方法:执行器结束,在bean销毁的时候会调用销毁执行器在线记录的方法RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}}
}
JobApiController类使用接口后缀通配符的方式接收执行器的调用,只支持post方式,调用调度中心若是配置了token,则需要从执行器的request请求中获取到token值,在执行器传递token值时使用XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN作为key传递,此处也按这个key取值;根据接口的结尾匹配具体是哪个方法。
处理具体请求都是AdminBiz接口类,使用注入的方式进行引入,说明引入的是AdminBiz接口的具体实现类,并且此类需要注册为bean对象。实现AdminBiz接口的类是AdminBizClient和AdminBizImpl,只有AdminBizImpl类注册为bean对象(使用@Service修饰,@Service注解再使用@Component),所以此处注入的AdminBiz具体类是AdminBizImpl。
1.callback请求
callback请求是执行器把执行结果反馈给调度中心的方法,调度中心收到此反馈之后更新日志记录的执行状态、执行消息,结束一次任务调度,若是有子任务,则进行子任务的调度。看下callback请求的入口源码:
//callback方法:执行器回调调度中心的方法if ("callback".equals(uri)) {//接收参数,转成list集合List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);}
接收参数,转成HandleCallbackParam实体,调用callback方法。看下AdminBizImpl类的callback方法源码:
//响应执行器反馈的方法@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {//JobCompleteHelper调度中心独有处理类return JobCompleteHelper.getInstance().callback(callbackParamList);}
处理反馈的具体类是JobCompleteHelper,看下JobCompleteHelper类的callback源码:
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {//使用反馈线程池处理反馈记录callbackThreadPool.execute(new Runnable() {@Overridepublic void run() {//循环处理所有的执行结果for (HandleCallbackParam handleCallbackParam: callbackParamList) {ReturnT<String> callbackResult = callback(handleCallbackParam);logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);}}});return ReturnT.SUCCESS;}
使用反馈线程池处理反馈记录,循环处理所有的执行结果,调用类里的callback方法,看下callback源码:
//调度中心对执行器反馈的处理private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {// valid log item//检查日志信息XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());if (log == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");}if (log.getHandleCode() > 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc}// handle msg//在原有执行日志的基础上追加上反馈日志StringBuffer handleMsg = new StringBuffer();if (log.getHandleMsg()!=null) {handleMsg.append(log.getHandleMsg()).append("<br>");}if (handleCallbackParam.getHandleMsg() != null) {handleMsg.append(handleCallbackParam.getHandleMsg());}// success, save log//设置执行时间、执行结果状态值、执行日志log.setHandleTime(new Date());log.setHandleCode(handleCallbackParam.getHandleCode());log.setHandleMsg(handleMsg.toString());//完成此任务,并更新日志的状态值,有子任务再调用子任务XxlJobCompleter.updateHandleInfoAndFinish(log);return ReturnT.SUCCESS;}
此方法先检查日志信息,在原有执行日志的基础上追加上反馈日志,设置执行时间、执行结果状态值、执行日志,完成此任务,并更新日志的状态值,有子任务再调用子任务。看下完成任务更新日志的方法updateHandleInfoAndFinish源码:
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {// finish// 完成此任务,有子任务再调用子任务finishJob(xxlJobLog);// text最大64kb 避免长度过长if (xxlJobLog.getHandleMsg().length() > 15000) {xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );}// fresh handle//更新日志的执行器处理情况信息return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);}
完成此任务,有子任务再调用子任务;更新日志的执行器处理情况信息。看下完成任务,若是有子任务再调度子任务的finishJob源码:
private static void finishJob(XxlJobLog xxlJobLog){// 1、handle success, to trigger child jobString triggerChildMsg = null;//任务执行完成if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());//查询此任务下是否还有子任务,有子任务则执行子任务if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";//子任务id使用逗号拼接,此处使用逗号进行分割String[] childJobIds = xxlJobInfo.getChildJobId().split(",");//循环调用子任务执行for (int i = 0; i < childJobIds.length; i++) {//子任务id合法int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;if (childJobId > 0) {//执行子任务,调用类型为父类调用JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);ReturnT<String> triggerChildResult = ReturnT.SUCCESS;// add msgtriggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),(i+1),childJobIds.length,childJobIds[i],(triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),triggerChildResult.getMsg());} else {triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),(i+1),childJobIds.length,childJobIds[i]);}}}}if (triggerChildMsg != null) {xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg );}}
若是任务执行完成,查询此任务下是否还有子任务,有子任务则执行子任务,子任务id使用逗号拼接,此处使用逗号进行分割,子任务id合法,执行子任务,调用类型为父类调用。web页面中设置子任务的截图:
2.registry请求
registry请求为执行器启动或者心跳机制(默认30秒)向调度中心注册在线情况的行为,这样调度中心在进行调度的时候才能够知道哪些执行器是在线的,只有在线的执行器才能响应调度。看下registry请求的入口源码:
else if ("registry".equals(uri)) {//registry方法:执行器向调度中心进行在线注册的方法,默认30秒调用一次,心跳注册机制RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);}
接收参数,转成RegistryParam实体,调用registry方法。看下AdminBizImpl类的registry方法源码:
@Overridepublic ReturnT<String> registry(RegistryParam registryParam) {//JobRegistryHelper调度中心独有处理类return JobRegistryHelper.getInstance().registry(registryParam);}
处理反馈的具体类是JobRegistryHelper,看下JobRegistryHelper类的registry源码:
//响应执行器注册的方法public ReturnT<String> registry(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute//使用注册或删除注册的线程池执行registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {//先调用更新的方法int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret < 1) {//没有记录,则进行插入XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh//此方法是一个空方法,刷新执行器的最新在线情况已经由registryMonitorThread守护线程执行freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;}
使用注册或删除注册的线程池执行此次任务,执行器心跳注册信息存放在xxl_job_registry表中,注册的时候若是此执行器对应的记录已经存在,则更新它的最新注册时间,若是不存在,则进行插入。执行器属于哪个任务组是根据它的registry_key字段值来确定的,registryMonitorThread守护线程默认30秒执行一次清理xxl_job_registry表,把大于90秒没有过注册的执行器记录删除,然后把xxl_job_registry最新的执行器记录按registry_key分组,拼接执行器地址集合,根据registry_key值等于xxl_job_group表app_name字段为条件,把最新执行器地址更新到address_list字段中,address_list是对某个分组下当前在线执行器地址的合集,使用逗号拼接。web页面显示任务组当前在线的执行器集合截图:
3.registryRemove请求
registryRemove请求为执行器下线的时候,告知调度中心自己下线了,需要从注册表中移除此执行器,否则调度的时候没法给出响应。看下registryRemove请求的入口源码:
else if ("registryRemove".equals(uri)) {//registryRemove方法:执行器结束,在bean销毁的时候会调用销毁执行器在线记录的方法RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);}
接收参数,转成RegistryParam实体,调用registryRemove方法。看下AdminBizImpl类的registryRemove方法源码:
@Overridepublic ReturnT<String> registryRemove(RegistryParam registryParam) {//JobRegistryHelper调度中心独有处理类return JobRegistryHelper.getInstance().registryRemove(registryParam);}
处理移除的具体类是JobRegistryHelper,看下JobRegistryHelper类的registryRemove源码:
//响应执行器删除注册的方法public ReturnT<String> registryRemove(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute//使用注册或删除注册的线程池执行registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {//从执行器注册表中删除记录int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret > 0) {//此方法是一个空方法,刷新执行器的最新在线情况已经由registryMonitorThread守护线程执行freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;}
使用注册或删除注册的线程池执行此次处理,把执行器的注册记录从xxl_job_registry删除,在下次registryMonitorThread守护线程处理的时候,会重新组织执行器在线集合,到时候此执行器会被剔除。
四.程序结束销毁处理
程序启动的时候初始化了4个线程池、6个守护线程,当程序结束的时候,需要销毁这些资源。资源销毁的入口类为XxlJobAdminConfig。
1.销毁入口类
XxlJobAdminConfig类是销毁的入口类是因为它实现了DisposableBean接口,重写了destroy方法。当bean被销毁的时候,会执行destroy方法,可以从这里作为销毁处理的入口。看下XxlJobAdminConfig类销毁相关的源码:
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {// 实现DisposableBean接口,重写它的bean销毁方法@Overridepublic void destroy() throws Exception {xxlJobScheduler.destroy();}
}
XxlJobAdminConfig被@Component注解修饰,在程序启动的时候,会加载到spring容器中,此时XxlJobAdminConfig就是一个bean对象,实现了DisposableBean接口,即bean销毁的接口,就是当程序停止的时候,会销毁bean,这样在销毁XxlJobAdminConfig的时候,可以从这里进行资源的清理。
2.资源销毁处理
销毁资源调用到XxlJobScheduler类的destroy(),看下destroy方法源码:
//销毁调度中心资源public void destroy() throws Exception {// stop-schedule//停止预读线程、环形处理任务线程JobScheduleHelper.getInstance().toStop();// admin log report stop//停止日志报表守护线程JobLogReportHelper.getInstance().toStop();// admin lose-monitor stop//销毁处理执行器反馈的线程池、停止没法完成任务监听的守护线程JobCompleteHelper.getInstance().toStop();// admin fail-monitor stop//停止监听失败任务再进行重试调度、发报警邮件的守护线程JobFailMonitorHelper.getInstance().toStop();// admin registry stop//销毁处理执行器注册或者删除的线程池、停止监听执行器是否在线的守护线程JobRegistryHelper.getInstance().toStop();// admin trigger pool stop//销毁处理任务调度的快、慢线程池JobTriggerPoolHelper.toStop();}
此destroy销毁方法就是为了销毁初始化init时创建的线程池、守护线程,下面逐个介绍下销毁过程。
(1)JobScheduleHelper.getInstance().toStop()
停止预读守护线程、环形处理任务守护线程,看下源码:
public void toStop(){// 1、stop schedule//停止守护线程的while条件scheduleThreadToStop = true;try {//休眠1秒TimeUnit.SECONDS.sleep(1); // wait} catch (InterruptedException e) {logger.error(e.getMessage(), e);}//中断线程if (scheduleThread.getState() != Thread.State.TERMINATED){// interrupt and waitscheduleThread.interrupt();try {scheduleThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// if has ring data//是否还有未处理完的环形预处理任务boolean hasRingData = false;if (!ringData.isEmpty()) {for (int second : ringData.keySet()) {List<Integer> tmpData = ringData.get(second);if (tmpData!=null && tmpData.size()>0) {hasRingData = true;break;}}}//有未处理完的预读任务if (hasRingData) {try {//休眠8秒,让预读处理任务处理完成TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop ring (wait job-in-memory stop)//停止环形任务线程的while条件ringThreadToStop = true;try {//休眠一秒TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {logger.error(e.getMessage(), e);}//中断环形预处理线程if (ringThread.getState() != Thread.State.TERMINATED){// interrupt and waitringThread.interrupt();try {ringThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");}
停止守护线程的while条件,scheduleThreadToStop是被volatile修饰的,被volatile修饰的字段有总线嗅探感知机制,当scheduleThreadToStop的值在某个线程中被改变时,会把改变的结果值及时写到主线程,然后其他引用了此变量的线程会感知到变化,并把自己副本的此变量值失效,重新读取最新的值。还有未处理完的预处理任务,则让程序休眠8秒,8秒已经足够处理预读任务了,因为预读的任务是5秒内的。
(2)JobLogReportHelper.getInstance().toStop()
停止日志报表守护线程,看下源码:
public void toStop(){//跳出while语句toStop = true;// interrupt and wait//中断线程logrThread.interrupt();try {//等待线程处理完成logrThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}
(3)JobCompleteHelper.getInstance().toStop()
销毁处理执行器回调的线程池、停止没法完成任务监听的守护线程,看下源码:
public void toStop(){//跳出while语句toStop = true;// stop registryOrRemoveThreadPool//销毁线程池callbackThreadPool.shutdownNow();// stop monitorThread (interrupt and wait)//中断线程monitorThread.interrupt();try {monitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}
(4)JobFailMonitorHelper.getInstance().toStop()
停止监听失败任务再进行重试调度、发报警邮件的守护线程,看下源码:
public void toStop(){//跳出while语句toStop = true;// interrupt and wait//中断线程monitorThread.interrupt();try {monitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}
(5)JobRegistryHelper.getInstance().toStop()
销毁处理执行器注册或者删除的线程池、停止监听执行器是否在线的守护线程,看下源码:
public void toStop(){//跳出while语句toStop = true;// stop registryOrRemoveThreadPool//销毁线程池registryOrRemoveThreadPool.shutdownNow();// stop monitir (interrupt and wait)//中断线程registryMonitorThread.interrupt();try {registryMonitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}
(6)JobTriggerPoolHelper.toStop()
销毁处理任务调度的快、慢线程池,看下源码:
public static void toStop() {helper.stop();}public void stop() {//triggerPool.shutdown();//销毁线程池fastTriggerPool.shutdownNow();slowTriggerPool.shutdownNow();logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");}
执行器
执行器是对任务的具体执行,是任务逻辑处理的具体实现,提供响应调度中心调度,也有主动向调度中心发起请求。执行器一般就是开发业务代码的系统,某些模块需要使用到定时器处理功能,项目引入xxl-job-core依赖,即可作为执行器开发使用。
一.程序启动初始化
程序启动后会做很多的资源初始化,初始化netty来监听某个端口,调度中心调用此netty的端口地址,即可与执行器建立连接。资源初始化的入口类为XxlJobSpringExecutor类,我们从这个类来看初始化过程。
1.初始化入口类
之所以说入口类为XxlJobSpringExecutor是因为我们在执行器侧配置xxl-job的配置文件时,使用XxlJobSpringExecutor实体来接收配置信息,并把XxlJobSpringExecutor注册为Bean对象,有了Bean对象,我们对资源的初始化即可从这里入手。来看下执行器的配置类源码:
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);//调度中心地址@Value("${xxl.job.admin.addresses}")private String adminAddresses;//token值@Value("${xxl.job.accessToken}")private String accessToken;//所属的执行器分组@Value("${xxl.job.executor.appname}")private String appname;//执行器地址@Value("${xxl.job.executor.address}")private String address;//ip@Value("${xxl.job.executor.ip}")private String ip;//netty监听的端口@Value("${xxl.job.executor.port}")private int port;//执行日志存放的目录@Value("${xxl.job.executor.logpath}")private String logPath;//执行日志最多存放天数@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;//注册xxlJobExecutor的bean@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}
此配置类接收了application.properties配置文件中的配置信息,创建了一个XxlJobSpringExecutor实体类,把接收到的配置信息都赋值到此实体类中,并把此XxlJobSpringExecutor实体注册成Bean对象。来看下XxlJobSpringExecutor实体源码:
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);// 实现了SmartInitializingSingleton接口(只适用于单列bean),在bean实例初始化完成后,会调用afterSingletonsInstantiated方法@Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method)//初始化任务方法,处理所有Bean中使用@XxlJob注解标识的方法initJobHandlerMethodRepository(applicationContext);// refresh GlueFactory//重新设置GlueFactory的类型为SpringGlueFactoryGlueFactory.refreshInstance(1);// super starttry {//调用到XxlJobExecutor类的start方法,对一些资源进行初始化super.start();} catch (Exception e) {throw new RuntimeException(e);}}// 实现DisposableBean接口,重写它的bean销毁方法@Overridepublic void destroy() {super.destroy();}private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from method//从程序上下文中获取到所有的bean名称集合String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);//遍历bean集合for (String beanDefinitionName : beanDefinitionNames) {//根据bean名称从程序上下文获取到此bean对象Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {//对Bean对象进行方法过滤,查询到方法被XxlJob注解修饰,是则放到annotatedMethods集合中annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {//判断方法被XxlJob注解修饰才返回return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}//当前遍历的bean没有被XxlJob注解修饰,则调过处理if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}//循环处理当前Bean下被XxlJob修饰的方法for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {//执行的方法Method executeMethod = methodXxlJobEntry.getKey();//XxlJob注解类XxlJob xxlJob = methodXxlJobEntry.getValue();// regist//注册此任务处理器registJobHandler(xxlJob, bean, executeMethod);}}}// ---------------------- applicationContext ----------------------private static ApplicationContext applicationContext;//实现ApplicationContextAware接口,获取上下文,得到加载到spring容器中的所有bean对象@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {XxlJobSpringExecutor.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() {return applicationContext;}
}
XxlJobSpringExecutor类继承了XxlJobExecutor类,接收配置信息的字段都是定义在XxlJobExecutor类中的;XxlJobSpringExecutor类实现了ApplicationContextAware接口,重写它的setApplicationContext方法即可得到spring上下文;实现了SmartInitializingSingleton接口,重写它的afterSingletonsInstantiated方法,在bean实例初始化完成后,会调用afterSingletonsInstantiated方法,这个方法就是初始化的真正入口。
2.初始化处理任务的方法
调度中心添加任务的时候,需要指定此任务由执行器的哪个处理任务方法来执行,因为一个执行器里面可以定义多个处理任务的方法。调度中心web界面配置处理任务的方法截图:
执行器使用@XxlJob注解来修饰每一个处理任务的方法,@XxlJob注解提供三个可选配置,value值用于匹配调度中心创建任务时的JobHandler值,init是配置处理任务之前的初始方法,destroy是配置处理任务之后的销毁工作。看下@XxlJob源码:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlJob {/*** jobhandler name*/String value();/*** init handler, invoked when JobThread init*/String init() default "";/*** destroy handler, invoked when JobThread destroy*/String destroy() default "";
}
要想在程序启动时,获取到配置了哪些处理任务的方法,即被@XxlJob修饰的方法,需要把配置任务的类设置成Bean对象,即使用@Component修饰,这样可以根据Bean对象来找方法内包含@XxlJob注解的方法,把这些方法保存到一个集合,当收到调度中心发来的执行任务命令时,可以从这个集合中找到它对应的方法,然后执行此方法。看下配置具体处理任务的源码:
@Component
public class SampleXxlJob {private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);/*** 1、简单任务示例(Bean模式)*/@XxlJob("demoJobHandler")public void demoJobHandler() throws Exception {XxlJobHelper.log("XXL-JOB, Hello World.");for (int i = 0; i < 5; i++) {XxlJobHelper.log("beat at:" + i);TimeUnit.SECONDS.sleep(2);}// default success}/*** 2、分片广播任务*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);// 业务逻辑for (int i = 0; i < shardTotal; i++) {if (i == shardIndex) {XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);} else {XxlJobHelper.log("第 {} 片, 忽略", i);}}}/*** 生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;*/@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")public void demoJobHandler2() throws Exception {XxlJobHelper.log("XXL-JOB, Hello World.");}public void init(){logger.info("init");}public void destroy(){logger.info("destroy");}
}
处理被@XxlJob修饰方法的入口为:XxlJobSpringExecutor类实现了SmartInitializingSingleton接口,重写它的afterSingletonsInstantiated方法,当Bean对象实例化初始化后会执行此方法,此方法内包含解析@XxlJob的方法initJobHandlerMethodRepository,看下此方法的源码:
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from method//从程序上下文中获取到所有的bean名称集合String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);//遍历bean集合for (String beanDefinitionName : beanDefinitionNames) {//根据bean名称从程序上下文获取到此bean对象Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {//对Bean对象进行方法过滤,查询到方法被XxlJob注解修饰,是则放到annotatedMethods集合中annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {//判断方法被XxlJob注解修饰才返回return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}//当前遍历的bean没有被XxlJob注解修饰,则调过处理if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}//循环处理当前Bean下被XxlJob修饰的方法for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {//执行的方法Method executeMethod = methodXxlJobEntry.getKey();//XxlJob注解类XxlJob xxlJob = methodXxlJobEntry.getValue();// regist//注册此任务处理器registJobHandler(xxlJob, bean, executeMethod);}}}
此方法从程序上下文中获取到所有的bean名称集合,遍历bean集合,根据bean名称从程序上下文获取到此bean对象,对Bean对象进行方法过滤,查询到方法被XxlJob注解修饰的记录,放到annotatedMethods集合中,循环处理当前Bean下被XxlJob修饰的方法,注册此任务处理方法。注册任务处理方法为registJobHandler,看下registJobHandler源码:
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){if (xxlJob == null) {return;}//获取注解@XxlJob("demoJobHandler")配置的值String name = xxlJob.value();//make and simplify the variables since they'll be called several times later//获取此Bean对象的类Class<?> clazz = bean.getClass();//获取方法名称String methodName = executeMethod.getName();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");}//判断是否已经有名称为name值的@XxlJobif (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}//方法关闭安全检查executeMethod.setAccessible(true);// init and destroyMethod initMethod = null;Method destroyMethod = null;//注解XxlJob是否有配置init属性if (xxlJob.init().trim().length() > 0) {try {//通过反射机制获取到init方法initMethod = clazz.getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}//注解XxlJob是否有配置destroy属性if (xxlJob.destroy().trim().length() > 0) {try {//通过反射机制获取到destroy方法destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");}}// registry jobhandler// 把此被XxlJob注解修饰的方法注册到任务处理器中,new MethodJobHandler创建一个任务处理器方法registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}
此方法获取注解@XxlJob配置的value值,此值与调度中心的JobHandler对应,一个执行器内此value是唯一的,检查是否配置了init和destory属性,若是配置了,则使用Bean的反射机制获取到具体的方法。对于解析到的这些值,使用MethodJobHandler实体来存放他们,然后把此处理器添加到一个map集合中,key为@XxlJob配置的value值,value为创建的MethodJobHandler实体。看下具体注册的registJobHandler源码:
//job处理器集合,key:@XxlJob注解的value值,value:此任务执行的对象,包含Bean对象,执行的方法、初始方法、销毁方法private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);//把任务添加处理器集合中,后续当需要处理某个@XxlJob定义的任务时,直接从jobHandlerRepository集合按key取出,直接调用它的执行方法即可return jobHandlerRepository.put(name, jobHandler);}
使用ConcurrentMap来存储这些处理方法,当执行器被调度的时候,根据任务配置的JobHandler为key从此map中获取到具体的处理类。
3.初始化执行日志目录
在afterSingletonsInstantiated()方法调用了初始其他资源的方法super.start(),因为初始化入口类XxlJobSpringExecutor继承了XxlJobExecutor,所以此start方法调用到XxlJobExecutor类的方法。看下start方法源码:
public void start() throws Exception {// init logpath//初始化执行日志目录XxlJobFileAppender.initLogPath(logPath);// init invoker, admin-client//初始化操作调度中心的客户端initAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThread//初始化清除日志文件的守护线程,清除周期为1天,按配置的保留文件天数进行过期文件的清除JobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThread//初始化调度反馈线程,若反馈阻塞队列有值,则进行反馈,并把反馈结果写入日志文件中;若是反馈失败,则把记录写入到反馈失败日志中;初始化一个重试失败反馈的线程进行失败心跳重试反馈TriggerCallbackThread.getInstance().start();// init executor-server//初始化netty服务,监听端口的调用情况,做出响应处理;把当前执行器注册到调度中心中,初始化一个注册线程,并指定时间进行心跳调用注册方法initEmbedServer(address, ip, port, appname, accessToken);}
start方法包含了很多其他的初始化方法,需要的参数logPath、adminAddresses等是在创建XxlJobSpringExecutor类时已经赋上值,这些值的来源就是application.properties中配置的值。看下初始化执行日志目录的initLogPath方法源码:
private static String logBasePath = "/data/applogs/xxl-job/jobhandler";private static String glueSrcPath = logBasePath.concat("/gluesource");//初始化执行日志目录public static void initLogPath(String logPath){// initif (logPath!=null && logPath.trim().length()>0) {logBasePath = logPath;}// mk base dir//创建目录File logPathDir = new File(logBasePath);//不存在则进行创建if (!logPathDir.exists()) {logPathDir.mkdirs();}logBasePath = logPathDir.getPath();// mk glue dir//创建目录,File glueBaseDir = new File(logPathDir, "gluesource");//不存在则进行创建if (!glueBaseDir.exists()) {glueBaseDir.mkdirs();}glueSrcPath = glueBaseDir.getPath();}public static String getLogPath() {return logBasePath;}public static String getGlueSrcPath() {return glueSrcPath;}
使用logBasePath记录执行日志的存储目录,glueSrcPath记录像shell模式这样的,需要把运行脚本组织为bash.sh这样的文件的目录。若是用户配置了logPath目录,则进行logBasePath的覆盖,没有配置使用默认的目录。
执行日志存储截图:
运行文件存储截图:
4.初始化操作调度中心的客户端
执行器需要与调度中心进行交互,调度中心可能是集群部署的,所以需要使用集合存放所有的调度中心客户端,执行器调用调度中心需要知道地址、token值,所以客户端类存放了调度中心的地址和token,token值需要与调度中心配置的保存一致,否则进行调度的时候,验证不通过。这里把操作调度中心的客户端都初始化好,后面需要调用的时候直接取客户端发起请求即可。看下初始化调度中心的方法initAdminBizList源码:
//存放所有的调用调度中心的客户端private static List<AdminBiz> adminBizList;//初始化连接调度中心的客户端private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {//调度中心的地址使用逗号进行分割if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {//创建调用调度中心的客户端AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}
adminAddresses是application.properties配置文件中定义的调度中心地址,当调度中心为集群时,使用逗号连接,此处使用逗号就行分割,创建所有的调度中心客户端类AdminBizClient,并把它存放到list集合中。
5.初始化清除日志文件的守护线程
执行日志文件支持设置保存天数,此处的天数需要与调度中心设置的日志记录保存天数一致,否则调度中心查看某个日志记录的执行日志时会查不到。这里说下日志记录是调度中心存放在表xxl_job_log中的记录,执行日志是执行器执行任务产生的任务文件,存放在执行器部署服务器的目录下。看下JobLogFileCleanThread类下初始化的start方法源码:
public void start(final long logRetentionDays){// limit min value//日志存留天数需要大于3才有效果if (logRetentionDays < 3 ) {return;}//创建一个线程localThread = new Thread(new Runnable() {@Overridepublic void run() {//没有停止线程while (!toStop) {try {// clean log dir, over logRetentionDays//获取磁盘下的日志文件集合File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!=null && childDirs.length>0) {// today//获取今天的时间Calendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate = todayCal.getTime();for (File childFile: childDirs) {// valid//判断是否为目录,日志文件是按日期存放的,例:2023-02-25/1.logif (!childFile.isDirectory()) {continue;}//文件需要包含-if (childFile.getName().indexOf("-") == -1) {continue;}// file create dateDate logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");//把以日期格式命名的文件名转成日期格式logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}//当前时间-文件创建时间的差值(毫秒)大于logRetentionDays指定的日志保留天数,logRetentionDays * (24 * 60 * 60 * 1000)是把天数转成毫秒if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {//递归删除此文件夹及它下面的文件FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {//休眠一天时间TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.");}});//设置为守护线程,localThread.setDaemon(true);localThread.setName("xxl-job, executor JobLogFileCleanThread");//启动守护线程localThread.start();}
配置的日志保存天数要大于3才生效,创建一个休眠周期为1天的守护线程循环进行判断清理,每次清理时获取磁盘下的日志文件集合,日志文件的存放格式为:2023-02-25/1.log,在具体日志的外层加了一个日志的日期文件夹,这里只处理文件夹,判断文件夹名为日期格式(包含-),然后把文件夹名称转成日期,使用当前日期减去文件夹转成的日期,若是差值大于需要保存的天数,则递归删除此文件夹及它下面的文件。
6.初始化向调度中心反馈执行结果的守护线程
调度中心调用执行器的执行方法后,执行器没有立即进行任务的执行,先给调用中心返回调度成功,把此调度任务添加到队列中,当任务被执行线程取出来执行结束后,把执行结果放到反馈队列中,此守护线程就是把反馈队列中的反馈信息反馈到调度中心。看下TriggerCallbackThread类初始化反馈执行结果守护线程的start源码:
public void start() {// valid//检查是否有调用调度中心的客户端if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callback//创建反馈回调线程triggerCallbackThread = new Thread(new Runnable() {@Overridepublic void run() {// normal callback//只要不停止,就一直循环获取while(!toStop){try {//使用take方法出队,take和put方法不互斥,读写分离,分别使用takeLock/putLock进行加锁HandleCallbackParam callback = getInstance().callBackQueue.take();//回调参数类不为空,则处理回调if (callback != null) {// callback list param//定义一个集合接收callBackQueue队列中的所有回调类List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();//drainTo方法为把callBackQueue队列中的所有值转移到新的callbackParamList集合中,经过此方法调用,此时callBackQueue为空,callbackParamList接收到队列里面的所有元素int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);//一开始出队列的的对象也要加入到集合中callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {//处理回调doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callback//当停止反馈线程后,把当前callBackQueue反馈队列里面还没有反馈完的记录进行反馈try {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");}});//设置为守护线程triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");//启动线程triggerCallbackThread.start();
}
没有调用调度中心客户端则不进行线程创建,反馈守护线程没有休眠周期,一直循环从反馈队列callBackQueue中取值,当有反馈记录,直接执行反馈,当停止反馈线程后,把当前callBackQueue反馈队列里面还没有反馈完的记录进行反馈。
7.初始化重试反馈失败记录的守护线程
执行器向调度中心反馈执行结果的时候,可能网络问题或者调度中心重启了,导致反馈失败,反馈失败的记录执行器会放到反馈失败目录文件下,存放反馈失败文件地址定义源码:
//回调失败日志目录private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);//回调失败日志文件名private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");
在执行器配置的日志目录下加上callbacklog目录,日志文件名为.log的格式。看下TriggerCallbackThread类初始化此守护线程的源码:
public void start() {// valid//检查是否有调用调度中心的客户端if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// retry//重试回调上面回调线程triggerCallbackThread调用失败的记录,按休眠时间进行循环triggerRetryCallbackThread = new Thread(new Runnable() {@Overridepublic void run() {while(!toStop){try {//重试反馈一开始进行反馈并失败的记录retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {//默认休眠30秒TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");}});//设置为守护线程triggerRetryCallbackThread.setDaemon(true);//启动线程triggerRetryCallbackThread.start();}
没有调用调度中心客户端则不进行线程创建,重试失败反馈守护默认休眠30秒,循环调用重试失败反馈方法retryFailCallbackFile,看下它的源码:
//重试反馈一开始进行反馈并失败的记录private void retryFailCallbackFile(){// valid//检查存放失败反馈的文件目录是否为空File callbackLogPath = new File(failCallbackFilePath);if (!callbackLogPath.exists()) {return;}//callbackLogPath是一个目录,若是一个文件,则删除此文件if (callbackLogPath.isFile()) {callbackLogPath.delete();}//callbackLogPath是一个目录、并且此目录下有文件才放行if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {return;}// load and clear file, retry//遍历处理回调错误日志for (File callbaclLogFile: callbackLogPath.listFiles()) {//把文件转成byte数组byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);// avoid empty file//若是空文件则删除if(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){callbaclLogFile.delete();continue;}//把byte数组转成list集合,一开始就是把list集合转成byte数组存放到文件中的,现在就是反向转一下List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);//删除文件callbaclLogFile.delete();//调用反馈的方法doCallback(callbackParamList);}}
检查存放失败反馈的文件目录是否为空,不为空则遍历处理反馈失败日志,若是空文件则删除,从日志文件中读出byte数组,把byte数组转成list集合,一开始就是把list集合转成byte数组存放到文件中的,现在就是反向转一下,拿到记录后,调用反馈方法。
8.初始化守护线程并创建netty服务监听端口调用+处理调用的线程池
执行器使用netty服务来接收调度中心的调用,netty是非常优秀的异步、基于事件驱动的网络应用框架。netty收到调用,使用线程池来处理调用的具体实现。来看下初始化的入口initEmbedServer方法源码:
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip port//监听端口,有配置则使用配置的端口,没有配置,则查找一个没有被占用的端口port = port>0?port: NetUtil.findAvailablePort(9999);//执行器的ip,有配置则使用配置,没有配置,则获取本地ip地址ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate address//若是本地机器的地址没有配置,则使用上面获取到的本地ip、本地端口组织address;有配置则使用配置的地址if (address==null || address.trim().length()==0) {//得到ip:端口的连接信息String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null//组织address地址信息address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessToken//没有加token信息,则输出警告日志信息if (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// start//创建一个基于netty的监听服务器,监听port端口embedServer = new EmbedServer();//启动此监听服务器,创建一个守护线程,创建一个netty服务,监听port端口,创建一个自定义处理器来处理netty服务被调用时的响应处理类;//使用线程池来处理netty的服务调用,根据服务请求的uri来具体处理调用请求,处理结束后,向调用方响应处理结果//把当前执行器注册到调度中心中embedServer.start(address, port, appname, accessToken);}
执行器自己的ip和netty的端口支持在application.properties配置文件中配置,若是没有配置ip,则支持获取自身的ip地址,支持ipv4和ipv6的网络;netty监听的端口若是没有配置,则从9999到65535之间使用线性探测法找到一个没有被占用的端口。注意netty的端口和执行器项目的端口不是一回事。创建一个EmbedServer类,调用它的start方法,看下start方法源码:
public void start(final String address, final int port, final String appname, final String accessToken) {//创建执行器处理具体调用的类executorBiz = new ExecutorBizImpl();//创建一个守护线程thread = new Thread(new Runnable() {@Overridepublic void run() {// param//bossGroup线程组用于监听客户端的连接EventLoopGroup bossGroup = new NioEventLoopGroup();//workerGroup线程组用于处理连接,读写事件EventLoopGroup workerGroup = new NioEventLoopGroup();//创建线程池处理netty服务的调用ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start server//创建nettyServerBootstrap bootstrap = new ServerBootstrap();//设置netty属性bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //使用非阻塞的服务端信道类型.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception { //处理连接、读写事件的处理类channel.pipeline()//使用addLast向netty的channel信道中注册handler.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle 读空闲时长、写空闲时长、读写空闲时长、单位.addLast(new HttpServerCodec())//服务器的编解码器遵从http协议,HttpServerCodec类已经包含了HttpRequestDecoder(解码器), HttpResponseEncoder(编码器).addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL ;netty提供的http消息聚合器,通过它可以把HttpMessage和HttpContent聚合成一个完整的FullHttpRequest或FullHttpResponse.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));//自定义处理器,当监听的端口被调用时,使用自定义处理器进行具体的实现}}).childOption(ChannelOption.SO_KEEPALIVE, true);//启用心跳保活机制,Tcp会监控连接是否有效,当连接处于空闲状态,超过了2个小时,本地的tcp会发送一个数据包给远程的Socket,如果远程没有响应,则Tcp会持续尝试11分钟,直到响应为止,若是12分钟还是没有响应,则tcp会尝试关闭此Socket连接// bind//绑定监听的信道端口ChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registry//把当前执行器注册到调度中心中startRegistry(appname, address);// wait util stop//防止代码运行结束调用finally中定义的关闭netty的方法,一直阻塞着,防止进程结束future.channel().closeFuture().sync();} catch (InterruptedException e) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);} finally {// stoptry {//关闭netty的线程组workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});//设置为守护线程,用户线程结束-》守护线程结束-》jvm结束thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave//启动线程thread.start();}
start方法里面创建executorBiz,当有调度的时候,由此类来进行具体的任务实现;创建一个守护线程,线程的run方法里面创建了bizThreadPool线程池,此线程池用来处理调度任务;创建netty服务,并绑定netty服务监听的端口号,创建EmbedHttpServerHandler类来处理netty被调用的处理,绑定HttpObjectAggregator使用FullHttpRequest来接收参数。看下netty核心处理类EmbedHttpServerHandler的源码:
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz executorBiz; //处理netty调用的实现类private String accessToken; //token值private ThreadPoolExecutor bizThreadPool;//线程池public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {this.executorBiz = executorBiz;this.accessToken = accessToken;this.bizThreadPool = bizThreadPool;}//继承了SimpleChannelInboundHandler,则重写他的channelRead0方法,当netty监听的端口被调用时,会调用到自定义处理类的channelRead0方法@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);//获取请求的参数信息String requestData = msg.content().toString(CharsetUtil.UTF_8);//获取请求的结尾地址String uri = msg.uri();//请求方式HttpMethod httpMethod = msg.method();//复用tcp连接boolean keepAlive = HttpUtil.isKeepAlive(msg);//从请求头中根据key获取token信息String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invoke//使用线程池执行此任务bizThreadPool.execute(new Runnable() {@Overridepublic void run() {// do invoke//执行请求处理Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);// to json//执行结果转成json格式字符串String responseJson = GsonTool.toJson(responseObj);// write response//把执行结果向调用端响应writeResponse(ctx, keepAlive, responseJson);}});}//执行请求处理private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// valid//只支持post方式if (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}//结尾地址为空if (uri == null || uri.trim().length() == 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}//比对请求方传递的token值是否正确if (accessToken != null&& accessToken.trim().length() > 0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {//根据请求的结尾地址,调用对应的方法进行处理switch (uri) {case "/beat"://调度中心进行心跳检测return executorBiz.beat();case "/idleBeat"://调度中心检测执行器是否忙碌IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case "/run"://调度中心调度执行器执行任务TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill"://调度中心调度执行器停止任务处理KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log"://调度中心查询执行日志信息LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}/*** write response*/private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write response//响应的结果值FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson)//设置响应头部格式response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}//使用信道的上下文向请求方写入、刷洗响应信息ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().close(); // beat 3N, close if idlelogger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");} else {super.userEventTriggered(ctx, evt);}}}
EmbedHttpServerHandler类是netty自定义处理类,因为netty的信道绑定了new HttpObjectAggregator,所以使用FullHttpRequest来接收参数;继承了SimpleChannelInboundHandler,则重写他的channelRead0方法,当netty监听的端口被调用时,会调用到自定义处理类的channelRead0方法;使用bizThreadPool线程池处理此请求,使用FullHttpRequest来获取请求的参数、uri、token等信息,对token值进行校验,匹配uri地址,对每个uri应该做的响应由executorBiz来处理,把响应结果使用信道的上下文向请求方写入、刷洗响应信息。
9.初始化注册执行器在线情况的守护线程
在初始化netty结束后,有一行调用执行器注册到调度中心的代码:startRegistry(appname, address),此方法会创建一个守护线程,看下startRegistry源码:
public void startRegistry(final String appname, final String address) {// start registry//把当前执行器注册到调度中心中ExecutorRegistryThread.getInstance().start(appname, address);}
执行注册的具体类是ExecutorRegistryThread,看下它的start源码:
public void start(final String appname, final String address){// valid//校验执行器名称不能为空if (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}//校验调用调度中心的客户端不能为空if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}//创建注册线程registryThread = new Thread(new Runnable() {@Overridepublic void run() {// registry//当停止的之后,才跳出while循环while (!toStop) {try {//构造注册请求参数RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端,向所有的调度中心注册上此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用注册方法ReturnT<String> registryResult = adminBiz.registry(registryParam);//执行器调用调度中心的注册方法成功if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {//默认休眠30秒,继续向调度中心中注册当前执行器在线的信息,心跳的方式TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry remove//删除执行器注册信息,当程序停止或者调用了stop方法之后,会跳出上面的while循环try {//构造删除注册请求参数RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端,向所有的调度中心删除此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用删除执行器的方法ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");}});//设置为守护线程registryThread.setDaemon(true);registryThread.setName("xxl-job, executor ExecutorRegistryThread");//启动线程registryThread.start();}
registryThread注册守护线程默认休眠30秒,循环向调度中心执行注册方法,告诉调度中心自己还在线。当线程停止即toStop为true时,向调度中心发起移除注册的请求,即告诉调度中心自己下线了。
10.初始化资源汇总说明图
二.主动发起请求
执行器需要与调度中心进行交互,执行器主动发起请求包含的方法可以从它的客户端类AdminBizClient看出,此类存在xxl-job的公共核心xxl-job-core工程中,目录结构为com.xxl.job.core.biz.client,从类里面可以看到包含callback、registry、registryRemove三个方法。看下AdminBizClient的源码:
/*** admin api test* 执行器-》调用调度中心的客户端,供执行器使用* @author xuxueli 2017-07-28 22:14:52*/
public class AdminBizClient implements AdminBiz {public AdminBizClient() {}public AdminBizClient(String addressUrl, String accessToken) {this.addressUrl = addressUrl;this.accessToken = accessToken;// validif (!this.addressUrl.endsWith("/")) {this.addressUrl = this.addressUrl + "/";}}private String addressUrl ;private String accessToken;private int timeout = 3;//此方法为执行器-》调度中心的反馈方法@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);}//此方法为执行器-》调度中心的注册方法,把当前执行器注册到调度中心中@Overridepublic ReturnT<String> registry(RegistryParam registryParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);}//此方法为执行器-》调度中心的删除执行器方法,把当前执行器从调度中心注册列表中删除@Overridepublic ReturnT<String> registryRemove(RegistryParam registryParam) {//发起http远程调用return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);}}
此客户端类提供了3个调用调度中心的方法,当方法被调用时,会拼接远程调度中心的接口地址,进行远程调用。有多少个调度中心就有多少个调用客户端类,在程序初始化的时候,这些调度中心客户端类已经初始化好,放到list集合中,当需要调用的时候,直接遍历这些客户端集合,对每个客户端执行调用方法即可。获取调用客户端集合并遍历调用的样例源码:
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用删除执行器的方法ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}//获取调用客户端public static List<AdminBiz> getAdminBizList(){return adminBizList;}
1.callback调用
callback调用是执行器把任务的执行结果值反馈给调度中心,在源码中的调用位置为:反馈守护线程查询到反馈队列里面有值或者反馈失败重试守护线程检测到有反馈失败的记录时,发起的反馈调用。看下反馈的调用入口源码:
while(!toStop){try {//使用take方法出队,take和put方法不互斥,读写分离,分别使用takeLock/putLock进行加锁HandleCallbackParam callback = getInstance().callBackQueue.take();//回调参数类不为空,则处理回调if (callback != null) {// callback list param//定义一个集合接收callBackQueue队列中的所有回调类List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();//drainTo方法为把callBackQueue队列中的所有值转移到新的callbackParamList集合中,经过此方法调用,此时callBackQueue为空,callbackParamList接收到队列里面的所有元素int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);//一开始出队列的的对象也要加入到集合中callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {//处理反馈doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}
守护线程从callBackQueue反馈队列里面取值,当有需要反馈的记录,则取出callBackQueue队列里面的所有值,调用反馈方法,处理反馈的方法为doCallback,看下它的源码:
private void doCallback(List<HandleCallbackParam> callbackParamList){boolean callbackRet = false;// callback, will retry if error//遍历调用调度中心的客户端for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//每个客户端都进行调用ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);//回调成功if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");callbackRet = true;//有一个调度中心执行成功了,则退出反馈调用,只用调用到一个就行break;} else {//回调失败callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);}} catch (Exception e) {//回调错误callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());}}//回调有失败的情况if (!callbackRet) {//把这些失败回调都追加到回调失败日志中appendFailCallbackFile(callbackParamList);}}
此方法遍历所有调用调度中心的客户端,执行反馈信息调用,若是反馈失败,则把这些反馈记录写到反馈失败磁盘目录下,把反馈的日志信息写入到执行器日志文件中。这里分析一下写入到执行日志的方法callbackLog源码:
private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){for (HandleCallbackParam callbackParam: callbackParamList) {//根据日期、日志id创建日志文件的存放目录(使用日期格式:xxxx-xx-xx),得到日志文件名logId.logString logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());//使用InheritableThreadLocal记录日志文件名的线程内部变量XxlJobContext.setXxlJobContext(new XxlJobContext(-1,null,logFileName,-1,-1));XxlJobHelper.log(logContent);}}
每一条反馈记录都对应着一次任务调度,使用任务的调度时间+日志id即可组织出执行日志的结尾目录和文件名。创建一个XxlJobContext实体接收文件名,此XxlJobContext类提供一个setXxlJobContext的方法,看下此方法的源码:
//使用InheritableThreadLocal来作为线程内部变量,与ThreadLocal相比InheritableThreadLocal可以在子线程中调用到父线程的线程内部变量private static InheritableThreadLocal<XxlJobContext> contextHolder = new InheritableThreadLocal<XxlJobContext>(); // support for child thread of job handler)public static void setXxlJobContext(XxlJobContext xxlJobContext){contextHolder.set(xxlJobContext);}public static XxlJobContext getXxlJobContext(){return contextHolder.get();}
设置进来的XxlJobContext对象使用InheritableThreadLocal修饰,把此变量设置为线程变量,这样在此线程处理的后面,直接调用get方法,即可获取到这个线程前面流程设置的XxlJobContext对象,是线程隔离的。看下它写入日志的方法XxlJobHelper.log(logContent)源码:
public static boolean log(String appendLogPattern, Object ... appendLogArguments) {//按格式进行占位符号的替代FormattingTuple ft = MessageFormatter.arrayFormat(appendLogPattern, appendLogArguments);//获取到日志信息String appendLog = ft.getMessage();/*appendLog = appendLogPattern;if (appendLogArguments!=null && appendLogArguments.length>0) {appendLog = MessageFormat.format(appendLogPattern, appendLogArguments);}*///获取调用者的堆栈信息,可以获取到调用者的类名callInfo.getClassName()、方法名callInfo.getMethodName()StackTraceElement callInfo = new Throwable().getStackTrace()[1];//处理日志详情return logDetail(callInfo, appendLog);}
按格式进行占位符号的替代,获取到日志信息,获取调用者的堆栈信息,可以获取到调用者的类名callInfo.getClassName()、方法名callInfo.getMethodName(),在追加日志的时候需要带上调用的类名等信息。看下处理日志详情的方法logDetail源码:
private static boolean logDetail(StackTraceElement callInfo, String appendLog) {//从InheritableThreadLocal中获取到内部线程变量值,获取到上面设置的日志文件信息XxlJobContext xxlJobContext = XxlJobContext.getXxlJobContext();if (xxlJobContext == null) {return false;}/*// "yyyy-MM-dd HH:mm:ss [ClassName]-[MethodName]-[LineNumber]-[ThreadName] log";StackTraceElement[] stackTraceElements = new Throwable().getStackTrace();StackTraceElement callInfo = stackTraceElements[1];*///组织日志信息StringBuffer stringBuffer = new StringBuffer();stringBuffer.append(DateUtil.formatDateTime(new Date())).append(" ").append("["+ callInfo.getClassName() + "#" + callInfo.getMethodName() +"]").append("-").append("["+ callInfo.getLineNumber() +"]").append("-").append("["+ Thread.currentThread().getName() +"]").append(" ").append(appendLog!=null?appendLog:"");String formatAppendLog = stringBuffer.toString();// appendlog//获取日志文件名称String logFileName = xxlJobContext.getJobLogFileName();if (logFileName!=null && logFileName.trim().length()>0) {//把日志信息追加到某个日志文件名下XxlJobFileAppender.appendLog(logFileName, formatAppendLog);return true;} else {logger.info(">>>>>>>>>>> {}", formatAppendLog);return false;}}
通过XxlJobContext.getXxlJobContext()即可获取之前设置进去的线程变量,从此变量里面获取到日志的文件名,组织日志信息,把日志信息追加到此日志文件下。看下追加日志的方法appendLog源码:
public static void appendLog(String logFileName, String appendLog) {// log fileif (logFileName==null || logFileName.trim().length()==0) {return;}File logFile = new File(logFileName);//日志文件xx.log不存在,则进行创建if (!logFile.exists()) {try {//创建文件logFile.createNewFile();} catch (IOException e) {logger.error(e.getMessage(), e);return;}}// logif (appendLog == null) {appendLog = "";}appendLog += "\r\n";// append file content//把日志信息追加到日志文件中FileOutputStream fos = null;try {fos = new FileOutputStream(logFile, true);fos.write(appendLog.getBytes("utf-8"));fos.flush();} catch (Exception e) {logger.error(e.getMessage(), e);} finally {if (fos != null) {try {fos.close();} catch (IOException e) {logger.error(e.getMessage(), e);}}}}
当反馈失败的时候,需要把失败信息写入到存放反馈失败的目录文件中,看下处理反馈失败的源码appendFailCallbackFile:
//把这些失败回调都追加到反馈失败日志中private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){// validif (callbackParamList==null || callbackParamList.size()==0) {return;}// append file//将对象转成byte数组byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);//创建反馈错误日志文件-以时间为名称File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));//若是此文件已经存在if (callbackLogFile.exists()) {/*for (int i = 0; i < 100; i++) {callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));if (!callbackLogFile.exists()) {break;}}*///使用时间+序号的方式获取到唯一的文件名int fileIndex = 0;while(true) {callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(fileIndex++)) ));if (!callbackLogFile.exists()) {break;}}}//把错误反馈日志文件写入到错误日志中FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);}
把反馈失败的集合转成byte数组,创建反馈失败日志文件-以时间为名称,若是同一时间下有重复,则加上序号处理,把反馈失败日志信息写入到失败日志文件中。这样在重试反馈失败记录的守护线程下次执行的时候,就能加载到此失败记录,并进行重试反馈。
2.registry调用
registry调用为执行器调用调度中心更新执行器最新在线时间,调度中心收到请求后会更新xxl_job_registry表的update_time字段,这样在调度中心进行定时清理离线执行器时,不会把此执行器删除。源码中的调用位置为:
while (!toStop) {try {//构造注册请求参数RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端,向所有的调度中心注册上此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用注册方法ReturnT<String> registryResult = adminBiz.registry(registryParam);//执行器调用调度中心的注册方法成功if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {//默认休眠30秒,继续向调度中心中注册当前执行器在线的信息,心跳的方式TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}
执行器默认休眠30秒循环调用注册方法。
3.registryRemove调用
当执行器下线后,需要通知调度中心从xxl_job_registry表删除此执行器的注册记录,这样在守护线程下次检查某个任务组的在线执行器时,能够及时的把此执行器剔除。源码中的使用位置:
//删除执行器注册信息,当程序停止或者调用了stop方法之后,会跳出上面的while循环try {//构造注册请求参数RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);//遍历所有的调用调度中心的客户端,向所有的调度中心删除此执行器for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {//使用具体的实现类AdminBizClient调用删除执行器的方法ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");}
当注册执行器的守护线程被停止时,就会跳出while循环,然后执行移除执行器注册的方法。
三.接收请求处理
执行器接收调用中心的请求是用netty来监听的,具体接收哪些请求可以从EmbedServer类的内部类EmbedHttpServerHandler中查看,此类存在xxl-job的公共核心xxl-job-core工程中,目录结构为com.xxl.job.core.server,具体的请求可以从内部类EmbedHttpServerHandler的process方法看出包含beat、idleBeat、run、kill、log五个方法。看下process方法源码:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// valid//只支持post方式if (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}//结尾地址为空if (uri == null || uri.trim().length() == 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}//比对请求方传递的token值是否正确if (accessToken != null&& accessToken.trim().length() > 0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {//根据请求的结尾地址,调用对应的方法进行处理switch (uri) {case "/beat"://调度中心进行心跳检测return executorBiz.beat();case "/idleBeat"://调度中心检测执行器是否忙碌IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case "/run"://调度中心调度执行器执行任务TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill"://调度中心调度执行器停止任务处理KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log"://调度中心查询执行日志信息LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}
方法只支持post方式,具体处理类executorBiz的具体实现是ExecutorBizImpl类,在程序启动初始化时已经创建。
1.beat请求
beat请求是调度中心确认执行器是否在线的接口,若是能正常调通,则表示执行器在线,若是调不通则表示执行器已经离线,是调度中心在使用故障转移路由模式时会调用。看下beat源码:
@Overridepublic ReturnT<String> beat() {return ReturnT.SUCCESS;}
直接返回成功,能调通就是成功。
2.idleBeat请求
idleBeat请求是调度中心确实执行器是否忙碌的接口,当执行器还在处理此任务的上一次调度,那这次调度就不选择此执行器处理,这是调度中心使用忙碌转移路由模式时会调用。看下idleBeat源码:
//响应调度中心确认执行器是否忙碌@Overridepublic ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {// isRunningOrHasQueueboolean isRunningOrHasQueue = false;//根据任务id获取处理此任务的线程类JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());//线程类存在,且正在运行或者还有未处理完的任务队列if (jobThread != null && jobThread.isRunningOrHasQueue()) {//标记为trueisRunningOrHasQueue = true;}//为true,表示此执行器现在正在处理这个任务的上一次调度if (isRunningOrHasQueue) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");}return ReturnT.SUCCESS;}
判断是否有线程正在执行此任务,若是有,则返回忙碌,若是没有,则返回成功。根据任务id获取对应线程类和是否忙碌的介绍放在run请求中。
3.run请求
run请求是执行器响应调度中心运行任务的接口,对具体的任务进行执行。看下run源码:
//响应调度中心执行任务@Overridepublic ReturnT<String> run(TriggerParam triggerParam) {// load old:jobHandler + jobThread//根据任务id获取任务线程类,从jobThreadRepository中获取,key:任务id,value:任务线程类JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());//从任务线程类获取绑定的任务处理器IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;String removeOldReason = null;// valid:jobHandler + jobThread//获取任务的运行模式GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());//bean模式if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandler//获取执行器任务handler:使用@XxlJob修饰的值,从集合jobHandlerRepository中获取,key:@XxlJob注解的value值,value:此任务执行的对象,包含Bean对象,执行的方法、初始方法、销毁方法//程序启动的时候,所有被@XxlJob修饰的处理类都添加到jobHandlerRepository集合中了IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThread//上一次此任务id绑定的任务处理器不等于此次执行的任务处理器if (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";//线程设置为nulljobThread = null;//线程绑定的处理器也设置为nulljobHandler = null;}// valid handler//给任务处理器重新赋值if (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof GlueJobHandler&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {try {IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread != null &&!(jobThread.getHandler() instanceof ScriptJobHandler&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason = "change job source or glue type, and terminate the old job thread.";jobThread = null;jobHandler = null;}// valid handlerif (jobHandler == null) {jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");}// executor block strategy//任务id对应的线程不为空if (jobThread != null) {//获取阻塞处理策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);//丢弃后续调度if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when running//线程正在运行或队列里面还有任务,则丢弃此次任务调度if (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {//覆盖之前调度// kill running jobThread//线程正在运行或队列里面还有任务,则覆盖之前调度if (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();//任务线程设置为nulljobThread = null;}} else {// just queue trigger}}// replace thread (new or exists invalid)//经过上面的校验处理,此任务id对应的任务线程类还是为空if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queue//把任务放到调度队列里面ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);//返回调度结果return pushResult;}
当执行器收到调度任务请求时,会看能不能复用线程处理类,新建的线程处理类会放到map集合中,key:任务id,value:任务线程类,看下根据任务id获取任务线程类的方法loadJobThread源码:
//存放任务、任务线程类集合,key:任务id,value:任务线程类private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();//根据任务id加载此任务的处理线程类public static JobThread loadJobThread(int jobId){return jobThreadRepository.get(jobId);}
使用ConcurrentMap来存放创建的线程处理类JobThread,使用之前先看能否复用,已经创建过并且还存在则复用。JobThread类绑定了它的处理类handler,当能通过任务id获取到JobThread类,则可以获取到handler类,handler类是程序启动时解析@XxlJob注解初始化好的处理任务方法的封装。看下JobThread类的部分源码:
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId; //任务idprivate IJobHandler handler;//处理器private LinkedBlockingQueue<TriggerParam> triggerQueue; //存放执行任务的阻塞队列private Set<Long> triggerLogIdSet; //去重调度日志private volatile boolean toStop = false;private String stopReason;private boolean running = false; // if running jobprivate int idleTimes = 0; // 停止线程的中断标识
}
JobThread继承了Thread,可以使用到线程的特性,进行方法的运行;绑定了任务id、处理器,使用阻塞队列存放待处理的任务。
获取任务的运行模式,例如bean模式是使用@XxlJob来进行具体任务的实现,shell模式是可执行文件的方式来实现,根据运行模式来创建IJobHandler。IJobHandler是一个抽象父类,它的子类包含3个,截图如下:
我们这里就分析bean这种模式,其他模式类似,看下处理bean这种模式的源码:
if (GlueTypeEnum.BEAN == glueTypeEnum) {// new jobhandler//获取执行器任务handler:使用@XxlJob修饰的值,从集合jobHandlerRepository中获取,key:@XxlJob注解的value值,value:此任务执行的对象,包含Bean对象,执行的方法、初始方法、销毁方法//程序启动的时候,所有的被@XxlJob修饰的处理类都添加到jobHandlerRepository集合中了IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThread//上一次此任务id绑定的任务处理器不等于此次执行的任务处理器if (jobThread!=null && jobHandler != newJobHandler) {// change handler, need kill old threadremoveOldReason = "change jobhandler or glue type, and terminate the old job thread.";//线程设置为nulljobThread = null;//线程绑定的处理器也设置为nulljobHandler = null;}// valid handler//给任务处理器重新赋值if (jobHandler == null) {jobHandler = newJobHandler;if (jobHandler == null) {return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");}}}
根据任务的jobHandler值,从程序启动就初始化好的被@XxlJob修饰的处理方法中,匹配到此jobHandler对应的handler,看下获取handler的源码:
//job处理器集合,key:@XxlJob注解的value值,value:此任务执行的对象,包含Bean对象,执行的方法、初始方法、销毁方法 private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);//把任务添加处理器集合中,后续当需要处理某个@XxlJob定义的任务时,直接从jobHandlerRepository集合用key取出,直接调用它的执行方法即可return jobHandlerRepository.put(name, jobHandler);}
从jobHandlerRepository集合中根据key获取handler。
当能复用JobThread,但是上传任务绑定的执行handler不等于这次的handler,就是说上次此任务设置的jobHandler为test1这次设置的为test2,对于这样的情况,需要重新创建一个新的JobThread类,使jobThread等于null,后面会判断jobThread为null则进行JobThread创建。
当能复用JobThread,说明可能上次的调度还没有处理完成,此时需要根据配置的阻塞处理策略来进行处理。当策略为丢弃后续调度,且任务线程正在运行或者任务队列里面还有未处理的任务,则不执行这次调度,丢弃此次调度,优先保证上次调度执行完成;当策略为覆盖之前调度,且任务线程正在运行或者任务队列里面还有未处理的任务,则使jobThread为null,在重新创建JobThread的时候,会对任务id之间绑定的JobThread进行中断,这样就能达到覆盖之前调度。源码中的体现:
if (jobThread != null) {//获取阻塞处理策略ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);//丢弃后续调度if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// discard when running//线程正在运行或队列里面还有任务,则丢弃此次任务调度if (jobThread.isRunningOrHasQueue()) {return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {//覆盖之前调度// kill running jobThread//线程正在运行或队列里面还有任务,则覆盖之前调度if (jobThread.isRunningOrHasQueue()) {removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();//任务线程设置为nulljobThread = null;}} else {// just queue trigger}}
经过上面的校验,若是没法复用JobThread,需要新创建一个,来看创建JobThread的源码:
if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}
创建方法在XxlJobExecutor类的registJobThread,看下它的源码:
//存放任务、任务线程集合,key:任务id,value:任务线程类private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();//注册一个任务线程public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);//启动线程,开始运行JobThread重写的run方法newJobThread.start();logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});//ConcurrentMap的put方法,当key重复的时候,会返回旧的值,但是会把新的值进行覆盖;putIfAbsent是key重复,则返回旧的值,但是不进行覆盖JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!//当新建的任务线程已经存在,则把原来的线程中断if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}
创建了一个JobThread类,并绑定了它的任务id和handler,JobThread类继承了Thread,调用start方法,即会执行JobThread类里的run方法。把创建的JobThread存放到map中,使用ConcurrentMap的put方法,当key重复的时候,会返回旧的值,但是会把新的值进行覆盖,当有旧值时,把旧的线程进行中断,这样就达到了阻塞处理策略为覆盖之前调度的需求。
处理好JobThread类后,往它的任务队列里面存此次的调度,源码为:
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
看下pushTriggerQueue源码方法:
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeat//调度日志id检验是否重复if (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}//日志id添加到集合中triggerLogIdSet.add(triggerParam.getLogId());//调度参数实体添加到调度队列中triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}
任务存到任务队列triggerQueue中,这次run请求处理完成, 可以给执行器反馈调度结果。
因为JobThread调用了start方法,会执行它的run方法,看下run方法源码:
//线程调用start()方法后,会执行run方法@Overridepublic void run() {// inittry {//先执行初始化方法handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// execute//不停止线程则一直执行while(!toStop){//任务运行状态设置为falserunning = false;//次数加1idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)//从阻塞队列里面移除队首元素,若是当前队列没有元素,则进行等待,等待时间为3秒triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);//获取到元素if (triggerParam!=null) {//标记任务为运行状态truerunning = true;//重置次数idleTimes = 0;//set集合中移除这个日志id,用于去重判断triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log"//创建执行任务的文件目录名String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job context//把执行任务的变量对象设置为线程内部变量,后面取参数等操作的时候可以从这这里取XxlJobContext.setXxlJobContext(xxlJobContext);// execute//添加日志,会从上面设置的线程内部变量xxlJobContext中取到文件名称,然后追加上日志XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());//有设置任务超时时间if (triggerParam.getExecutorTimeout() > 0) {// limit timeout//创建一个任务线程Thread futureThread = null;try {//任务需要有返回值,所以使用CallableFutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {//使用子线程处理任务的时候,需要再设置一下线程变量,否则拿不到上面设置的线程变量// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);//执行处理器的方法,若是需要接收参数,可以使用XxlJobHelper.getJobParam方法获取,这个方法也是从线程内部变量XxlJobContext中获取的变量handler.execute();return true;}});futureThread = new Thread(futureTask);futureThread.start();//在给定的时间内需要处理完成,处理不完成,抛出超时异常Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle result//任务处理超时,给线程内部变量XxlJobContext的handleCode字段设置为502XxlJobHelper.handleTimeout("job execute timeout ");} finally {//中断线程futureThread.interrupt();}} else {//没有设置任务超时时间,直接调用// just executehandler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {//xxlJobContext.setHandleCode为500,并把执行错信息追加到xxlJobContext.setHandleMsgXxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}//把日志信息追加到日志文件中,使用线程内部变量从XxlJobContext中获取到当前处理任务的日志目录,往日志目录中追加日志XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {//次数大于30次,并且任务队列里面没有待处理的任务,则把次任务线程中断、删除if (idleTimes > 30) {if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {//把日志信息追加到日志文件中,使用线程内部变量从XxlJobContext中获取到当前处理任务的日志目录,往日志目录中追加日志XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();//xxlJobContext.setHandleCode为500,并把执行错信息追加到xxlJobContext.setHandleMsgXxlJobHelper.handleFail(errorMsg);//把日志信息追加到日志文件中,使用线程内部变量从XxlJobContext中获取到当前处理任务的日志目录,往日志目录中追加日志XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {//调度参数不为空,说明进行过处理if(triggerParam != null) {// callback handler info//线程没有停止if (!toStop) {// commonm//向反馈队列中添加执行结果,反馈线程会向调度中心进行反馈TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killed//处理线程停止了,把反馈参数添加到反馈队列中TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queue//当处理线程停止,而任务队列里面还有未处理完的任务,则向调度中心反馈执行失败信息while(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killed//向反馈线程的队列中加入反馈参数TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {//执行销毁方法handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
若是任务的处理方法配置了init,则执行init方法;从阻塞队列里面移除队首元素,若是当前队列没有元素,则进行等待,等待时间为3秒,因为是先启动的线程再往阻塞队列存的任务。当拿到要处理的任务,标记任务为运行状态true,重置次数idleTimes为0,当idleTimes大于30次,表示有30次没有获取到要处理的任务,时间已经大于90秒(每次取任务最多等3秒,30次,则最大为90秒),则中断此线程类。获取任务的执行文件地址,创建XxlJobContext对象接收参数,还是使用它的线程内部变量方式,对后续日志追加时能拿到日志文件地址。
当任务设置了执行超时时间,则使用FutureTask来创建一个任务,再创建一个内部线程,把XxlJobContext添加到子线程中,这也是为何要使用InheritableThreadLocal来修饰XxlJobContext而不是ThreadLocal的原因,InheritableThreadLocal可以在子线程中调用到父线程设置的内部变量,而ThreadLocal只能在一个线程内共享内部变量。使用FutureTask.get方法,设置给定的时间内需要处理完成,处理不完成,抛出超时异常。若是没有设置超时则进行正常的调用即可,把执行的结果写入到执行日志文件中。
当JobThread执行完任务,任务队列中已经没有待处理的任务了,空跑30次以上,则进行JobThread的销毁。源码为:
//次数大于30次,并且任务队列里面没有待处理的任务,则把次任务线程中断、删除if (idleTimes > 30) {if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}
调用删除JobThread的方法,源码为:
//移除某个任务的处理线程,并中断此线程的执行public static JobThread removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread = jobThreadRepository.remove(jobId);if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();return oldJobThread;}return null;}
从map集合中删除此记录,并中断线程的运行。
当任务处理完成,需要往反馈队列里面存放反馈记录,这个存放动作是在finally中,看下源码:
finally {//调度参数不为空,说明进行过处理if(triggerParam != null) {// callback handler info//线程没有停止if (!toStop) {// commonm//向反馈线程中添加执行结果,反馈线程会向调度中心进行反馈TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killed//处理线程停止了,把反馈参数添加到反馈队列中TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job running, killed]" ));}}
}
当此处从任务队列中获取到值,即triggerParam不等于空,当线程没有停止,向反馈队列中添加执行结果,反馈线程会向调度中心进行反馈,若是线程已经停止了,则向反馈队列里面添加处理失败的标识。
若是线程被中断了,例如覆盖之前调度这样的阻塞策略,则会跳出while循环,而任务队列里面还有未处理完的任务,则把这些任务放到反馈队列中,并标记任务执行失败,看下处理的源码:
//当处理线程停止,而任务队列里面还有未处理完的任务,则向调度中心反馈执行失败信息while(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killed//向反馈线程的队列中加入回调参数TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_CODE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}
若是任务的处理方法配置了destroy,则执行destroy方法。此时再来看判断JobThread类是否在运行或者还有未处理完的任务方法isRunningOrHasQueue源码:
public boolean isRunningOrHasQueue() {//线程正在运行或者调度队列里面还有未处理完的任务return running || triggerQueue.size()>0;}
当任务运行时running会设置为true,triggerQueue为阻塞队列的任务集合。
这里分析下handler的execute方法为何能够执行到具体的方法,先看MethodJobHandler类部分源码:
public class MethodJobHandler extends IJobHandler {private final Object target; //Bean对象-包含XxlJob注解的对象private final Method method; //执行的方法private Method initMethod; //初始化方法private Method destroyMethod; //销毁方法public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {this.target = target;this.method = method;this.initMethod = initMethod;this.destroyMethod = destroyMethod;}//执行处理的方法,被@XxlJob修饰的方法@Overridepublic void execute() throws Exception {//方法中有定义参数,则执行的时候带有参数Class<?>[] paramTypes = method.getParameterTypes();if (paramTypes.length > 0) {method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types} else {method.invoke(target);}}
}
target字段存放的是Bean对象,即有@XxlJob修饰方法的整个类,并且这个类是注册为Bean对象的。method、initMethod、destroyMethod都是Method类型,是通过target这个bean使用反射生成的,看下部分生成源码:
//获取此Bean对象的classClass<?> clazz = bean.getClass();Method initMethod = null;//注解XxlJob是否有配置init属性if (xxlJob.init().trim().length() > 0) {try {//通过反射机制获取到init方法initMethod = clazz.getDeclaredMethod(xxlJob.init());//方法关闭安全检查initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");}}
然后使用method.invoke反射的方法执行方法。
4.kill请求
kill请求是调度中心调用执行器停止任务的处理接口,用于对调度中心已经调度成功,执行器还没有执行反馈的任务进行停止处理。调用的源码位置:
//响应调度中心停止执行器执行某个任务@Overridepublic ReturnT<String> kill(KillParam killParam) {// kill handlerThread, and create new one//根据任务id获取线程JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId());if (jobThread != null) {//执行删除线程的方法XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job.");return ReturnT.SUCCESS;}return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed.");}
根据任务id获取到处理此任务的JobThread类,然后调用停止此JobThread线程类的方法,并把它从map集合中删除。
5.log请求
log请求是调度中心查看执行器执行日志的接口,调度中心的日志表xxl_job_log记录着处理此次任务的执行器地址,当需要查看执行日志时,会调用此执行器进行响应。调用的源码:
//响应调度中心获取某个任务的执行日志@Overridepublic ReturnT<LogResult> log(LogParam logParam) {// log filename: logPath/yyyy-MM-dd/9999.logString logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());//根据行数读取日志LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());return new ReturnT<LogResult>(logResult);}
根据任务的调度时间和日志id组织日志文件目录和文件名,按起始行读取日志信息。
public static LogResult readLog(String logFileName, int fromLineNum){// valid log fileif (logFileName==null || logFileName.trim().length()==0) {return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);}//根据日志目录创建文件File logFile = new File(logFileName);if (!logFile.exists()) {return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);}// read fileStringBuffer logContentBuffer = new StringBuffer();int toLineNum = 0;LineNumberReader reader = null;try {//读取文件reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));String line = null;while ((line = reader.readLine())!=null) {toLineNum = reader.getLineNumber(); // [from, to], start as 1//读取的行大于起始行才作为结果if (toLineNum >= fromLineNum) {//逐行拼接日志记录logContentBuffer.append(line).append("\n");}}} catch (IOException e) {logger.error(e.getMessage(), e);} finally {if (reader != null) {try {reader.close();} catch (IOException e) {logger.error(e.getMessage(), e);}}}// result//构造结果实体LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);return logResult;}
四.程序结束销毁处理
程序启动的时候初始化5个守护线程、1个netty服务、1个map集合、1个list集合、1个线程池;处理任务时创建的线程,当程序结束的时候,需要销毁这些资源。资源销毁的入口类为XxlJobSpringExecutor。
1.销毁入口类
XxlJobSpringExecutor类是销毁的入口类是因为它实现了DisposableBean接口,重写了destroy方法。当bean被销毁的时候,会执行destroy方法,可以从这里作为销毁处理的入口。看下XxlJobSpringExecutor类销毁相关的源码:
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {// 实现DisposableBean接口,重写它的bean销毁方法@Overridepublic void destroy() {super.destroy();}
}//注册xxlJobExecutor的bean@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
XxlJobSpringExecutor类创建后使用@Bean修饰,注册为bean对象,它还实现了DisposableBean接口,重写destroy销毁方法,最终调用到的是父类XxlJobExecutor的destroy方法。
2.资源销毁处理
销毁资源的处理类为XxlJobExecutor的destroy,看下它的源码:
//当bean销毁时调用此方法public void destroy(){// destroy executor-server//销毁netty服务,停止注册线程、向调度中心调用删除此执行器stopEmbedServer();// destroy jobThreadRepository//销毁任务线程if (jobThreadRepository.size() > 0) {for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");// wait for job thread push result to callback queueif (oldJobThread != null) {try {oldJobThread.join();} catch (InterruptedException e) {logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);}}}jobThreadRepository.clear();}//销毁记录着被@XxlJob修饰的方法集合jobHandlerRepository.clear();// destroy JobLogFileCleanThread//销毁周期为一天的清除文件的线程JobLogFileCleanThread.getInstance().toStop();// destroy TriggerCallbackThread//销毁执行器执行反馈的线程、执行器执行失败反馈的线程TriggerCallbackThread.getInstance().toStop();}
销毁资源的方法都在destroy中,现在逐个介绍下销毁过程。
(1)stopEmbedServer()
销毁netty服务,停止注册守护线程、向调度中心调用删除此执行器。看下源码:
//销毁netty服务private void stopEmbedServer() {// stop provider factoryif (embedServer != null) {try {//销毁netty服务embedServer.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}}
销毁netty服务,看它的stop方法源码:
//销毁netty服务public void stop() throws Exception {// destroy server thread//启动时候创建的线程还存活则进行中断if (thread != null && thread.isAlive()) {//中断线程thread.interrupt();}// stop registry//停止注册stopRegistry();logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");}
初始化netty服务时创建的守护线程,若是还存活则进行中断,当此守护线程中断后,通过此线程创建的netty服务也随之销毁,在finally方法中关闭netty资源,源码如下:
finally {// stoptry {//关闭netty的线程组workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}
}
看下停止注册服务的方法stopRegistry源码:
//停止注册public void stopRegistry() {// stop registry//调用执行器注册线程类的停止方法ExecutorRegistryThread.getInstance().toStop();}
停止方法最终由ExecutorRegistryThread的toStop执行,看下源码:
//执行器注册线程类的停止方法public void toStop() {//停止标识为true,则上面使用心跳注册机制的while会跳出循环,然后执行移除此执行器注册信息toStop = true;// interrupt and wait//中断注册线程if (registryThread != null) {registryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}
当把注册守护线程循环条件toStop设置为true后,会跳出while循环,停止自动注册,然后会执行移除此执行器注册信息的接口,最后中断注册守护线程。
(2)jobThreadRepository清理
jobThreadRepository是一个存放处理任务线程类JobThread的map集合,每个JobThread都是一个线程类,都需要中断线程,然后清空map集合。
(3)jobHandlerRepository.clear()
jobHandlerRepository是一个存放任务处理器类IJobHandler的map集合,每个IJobHandler都是一个可执行的处理类,需要清空此map。
(4)JobLogFileCleanThread.getInstance().toStop()
销毁休眠周期为一天的清除文件守护线程,看下源码:
public void toStop() {//停止标识为true,则上面while的条件不满足,跳出循环toStop = true;if (localThread == null) {return;}// interrupt and wait//中断清除日志文件的守护线程localThread.interrupt();try {localThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}
(5)TriggerCallbackThread.getInstance().toStop()
销毁执行反馈的守护线程、执行失败反馈的守护线程,看下源码:
public void toStop(){//标识为true,则上面的while条件不符合,跳出循环,toStop = true;// stop callback, interrupt and wait//销毁回调线程if (triggerCallbackThread != null) { // support empty admin addresstriggerCallbackThread.interrupt();try {triggerCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop retry, interrupt and wait//销毁重试调度反馈线程if (triggerRetryCallbackThread != null) {triggerRetryCallbackThread.interrupt();try {triggerRetryCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}
当toStop为true时,会跳出循环处理反馈消息的逻辑,若反馈队列里面还有未反馈的记录,则进行最后的反馈,中断线程使用了triggerCallbackThread.join(),就是得等线程运行结束。停止线程之前运行的最后反馈源码:
//当停止反馈线程后,把当前callBackQueue反馈队列里面还没有反馈完的记录进行反馈try {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}
Redisson优化分布式锁问题
xxl-job为了防止集群部署调度中心时任务被重复加载,使用mysql的写锁机制进行控制,每次预加载任务时,都创建一个mysql连接,并对表xxl_job_lock加锁,加锁成功才进行任务的预读处理,这样就能保证集群环境下每次只会有一台机器加锁成功。预读任务加载完毕后,还需要释放锁,关闭mysql连接,非常浪费资源、加大数据库的压力。对于分布式加锁的问题,此处使用主流的Redisson分布式锁进行优化,优化的步骤如下:
(1)引入依赖
按着项目定义的规范,版本号都是定义在父工程中,所以在父项目的pom.xml中定义Redisson的版本号:
<redisson.version>3.16.4</redisson.version>
在调度中心xxl-job-admin的pom.xml中引入需要的Redisson、redis依赖,引入redis是为了获取到redis的连接信息:
<!-- redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- redisson --><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>${redisson.version}</version></dependency>
(2)配置redis连接信息
在xxl-job-admin项目的application.properties配置文件中添加redis的连接信息:
### redis
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.timeout=3000
spring.redis.lettuce.pool.max-active=20
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.min-idle=0
(3)创建Redisson客户端
在xxl-job-admin项目的com.xxl.job.admin.core包下创建一个包redis,新建一个RedissonConfig配置类,定义Redisson客户端。此处使用的是redis单节点的方式,若是其它例如集群、哨兵模式,请参考官网进行创建:
@Configuration
@EnableConfigurationProperties(value = RedisProperties.class)
public class RedissonConfig {//创建redisson客户端,此时默认使用单节点@Beanpublic RedissonClient redissonClient(RedisProperties redisProperties){Config config = new Config();config.useSingleServer().setAddress("redis://"+redisProperties.getHost()+":"+redisProperties.getPort());config.useSingleServer().setDatabase(redisProperties.getDatabase());config.useSingleServer().setPassword(redisProperties.getPassword());config.useSingleServer().setTimeout((int)redisProperties.getTimeout().getSeconds()*1000);RedissonClient redisson = Redisson.create(config);return redisson;}}
(4)原有代码改造
需要把Redisson客户端注入到处理预读任务的JobScheduleHelper类中,由于Redisson客户端是一个Bean对象,而JobScheduleHelper是一个普通类,所以需要在创建JobScheduleHelper时传递参数的方式实现。调用JobScheduleHelper类最前置的类是XxlJobAdminConfig类,此类也是一个Bean对象,是初始化资源的入口类,我们把Redisson客户端注入到此类中,并在执行init初始化方法时,把Redisson客户端传递下去。
XxlJobAdminConfig.java修改的地方如下:
//注入redisson客户端@ResourceRedissonClient redissonClient;public void afterPropertiesSet() throws Exception {//初始化调度中心资源--添加参数xxlJobScheduler.init(redissonClient);}
XxlJobScheduler类的init方法接收参数,并在创建JobScheduleHelper类时把Redisson客户端作为参数传递进去,XxlJobScheduler.java修改的地方如下:
//初始化调度中心资源--接收参数public void init(RedissonClient redissonClient) throws Exception {//把redissonClient作为参数传递过去JobScheduleHelper.getInstance(redissonClient).start();}//销毁调度中心资源public void destroy() throws Exception {//停止预读线程、环形处理任务线程JobScheduleHelper.getInstance(null).toStop();}
JobScheduleHelper类之前使用饿汉式的方式创建,此时需要改为接收参数式的懒汉式创建,JobScheduleHelper.java修改的地方如下:
//redisson客户端private RedissonClient redissonClient;private static JobScheduleHelper instance = null;//接收参数式的懒汉式创建对象public static JobScheduleHelper getInstance(RedissonClient redissonClient){if(instance == null) {synchronized (JobScheduleHelper.class){if(instance == null) {instance = new JobScheduleHelper(redissonClient);}}}return instance;}//创建对象时,注入redisson客户端public JobScheduleHelper(RedissonClient redissonClient){this.redissonClient = redissonClient;}
(5)替代加锁机制
把创建mysql连接的代码修改为获取redisson锁,加锁成功才进行预读流程处理,把关闭mysql连接的代码修改为关闭redisson锁。
while (!scheduleThreadToStop) {//起始时间long start = System.currentTimeMillis();//获取redisson锁RLock lock = redissonClient.getLock("preReadJob");try {//尝试加锁boolean res = lock.tryLock(30, TimeUnit.SECONDS);if(res) {//获取到锁,进行处理long nowTime = System.currentTimeMillis();//处理预读流程...}} catch (Exception e) {} finally {//释放锁if (lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}
相关文章:
xxl-job调度中心、执行器源码详解
文章目录简介调度中心一.程序启动初始化1.初始化入口类2.初始化I18n3.初始化快慢调度线程池4.初始化处理执行器注册或移除线程池更新执行器最新在线的守护线程5.初始化监控任务调度失败或执行失败的守护线程6.初始化处理执行器回调线程池监控任务执行结果丢失的守护线程7.初始化…...
cpp c++summary笔记 复杂类型 “right-left” rule
复杂类型 “right-left” rule 先向右走在向左走,循环往复,右侧的终止为看到右括号,右中括号,左侧为左括号,指针(或其他int等)。 符号读作*指向AA的指针(总在左侧)[]容纳AA的数组(总在左侧)()返…...
bash编程(马哥)
bash基础特性: 命令行展开:~,{} 命令别名:alias,unalias 命令历史:history 命令和路径补全:$PATH glob通配符:*,?,[],[^], 快捷键&am…...
搭建Gerrit环境Ubuntu
搭建Gerrit环境 1.安装apache sudo apt-get install apache2 注意:To run Gerrit behind an Apache server using mod_proxy, enable the necessary Apache2 modules: 执行:sudo a2enmod proxy_http 执行:sudo a2enmod ssl 使新的配置生效,需要执行如下命令:serv…...
朋友去华为面试,轻松拿到26K的Offer,羡慕了......
最近有朋友去华为面试,面试前后进行了20天左右,包含4轮电话面试、1轮笔试、1轮主管视频面试、1轮hr视频面试。 据他所说,80%的人都会栽在第一轮面试,要不是他面试前做足准备,估计都坚持不完后面几轮面试。 其实&…...
springboot项目如何配置启动端口
文章目录0 写在前面1 配置文件(.yaml)--推荐2 配置文件(.properties)3 IDEA配置--不推荐4 写在最后0 写在前面 项目启动需要一个独立的端口,所以在此记录一下。 根据配置文件的后缀书写格式略有不同。 1 配置文件(.yaml)–推荐 若是.yaml后缀的配置文件࿰…...
IOS - 抓包通杀篇
IOS中大多数情况,开发者都会使用OC提供的api函数,CFNetworkCopySystemProxySettings来进行代理检测; CFNetworkCopySystemProxySettings 检测函数直接会检测这些ip和端口等: 采用直接附加页面进程: frida -UF -l 通…...
盒子模型的简介
盒子的组成 一个盒子由外到内可以分成四个部分:margin(外边距)、border(边框)、padding(内边距)、content(内容)。会发现margin、border、padding是css属性,因…...
Kubernetes 101,第二部分,pod
在上一篇文章中,我们了解了Kubernetes 的基础知识以及对其主要架构的介绍。 介绍完毕后,就该探索如何在 Kubernetes 中运行应用程序了。 容器包装器 在 Kubernetes 中,我们无法直接创建单个容器。相反,为了更好,我们可以将容器包装成一个单元,其中包括: 规范:多个容器可…...
protobuf序列化解码原理
Protobuf的编码方式 Varints是一种紧凑表示数字的办法。他用一个或者多个字节表示一个数字,值越小的数字节节数越少。相对与传统的用4字节表示int32类型的数字,Varints对于小于128的数值都可以用一个字节表示,大于128的数值会用更多的字节来表…...
OpenCV——line、circle、rectangle、ellipse、polylines函数的使用和绘制文本putText函数以及绘制中文的方法。
学习OpenCV的过程中,画图是不可避免的,本篇文章旨在介绍OpenCV中与画图相关的基础函数。 1、画线条——line()函数 介绍: cv2.line(image, start_point, end_point, color, thickness)参数: image: 图像start_point:…...
性能平台数据提速之路
作者 | 性能中台团队 导读 性能平台负责MEG所有研发数据的管理、接入、传输、应用等各个环节。数据的提速对于公司报表建设、决策分析、转化策略效果都有至关重要的影响。重点介绍数据生产端与消费端提速落地实践,如何高性价比满足大数据生产端提速?如何…...
Dns域名解析服务器
前言 域名解析服务器的介绍 域名服务器的类型划分 DNS域名解析的过程 为什么需要DNS解析域名为IP地址? 通俗理解Dns DNS劫持 DNS污染 Dns面试经验 前言 DNS是一个应用层协议,用来获取域名对应的IP地址 域名解析服务器的介绍 DNS(Dom…...
关于 JavaScript 中的 Promises
在 JavaScript 中,Promise 是一个对象,它表示一个可能还不可用,但会在未来解决的值。Promises 用于处理异步操作,例如发出网络请求或访问数据库,其中结果不是立即可用的。如果你准备好了,我想开始我们的冒险…...
PMP考前冲刺题——错题集
3、 [多选] 采购部门需要向全球不同的供应商采购项目所需的各种商品,所有采购订单均己发送给供应商并已按要求处理。项目经理后来收到客户提出的变更请求。由于项目经理未及时通知采购部门,运抵的所有物品都是按原来的需求所提供。 项目经理本应做什么来…...
【C++】30h速成C++从入门到精通(多态)
多态的概念多态:通俗来说就是多种心态,具体点就是去完成某个行为,当不同的对象去完成时会产生出不同的状态。多态的定义及实现多态的构成条件多态是在不同继承关系的类对象,去调用同意函数,产生了不同的行为࿰…...
从proc文件系统中获取gateway的IP地址
在linux的命令行下获取当前网络环境的gateway的IP并不是一件难事,常用的命令有ip route或者route -n,其实route -n也是通过读取proc文件系统下的文件来从内核获取路由表的,但ip route是通过netlink来获取的路由表;本文将讨论如何编写程序从proc文件系统中获取路由表,并从路…...
【LeetCode】剑指 Offer(17)
目录 题目:剑指 Offer 34. 二叉树中和为某一值的路径 - 力扣(Leetcode) 题目的接口: 解题思路: 代码: 过啦!!! 写在最后: 题目:剑指 Offer …...
MySQL索引类型
MySQL 是最流行的关系型数据库管理系统,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS (Relational Database Management System,关系数据库管理系统) 应用软件之一。 索…...
你了解HashMap吗?
一、前言:面试过的人都知道,HashMap是Java程序员在面试中最最最经常被问到的一个点,可以说,不了解HashMap都不好意思说自己是做Java开发的。基本上你去面试十家公司,有七八家都会问到你HashMap。那么今天,就…...
我一个女孩子居然做了十年硬件……
2011年,一个三本大学的电子信息专业的大三女学生跟2个通信专业的大二男生组成了一组代表学校参加2011年“瑞萨杯”全国大学生电子设计大赛,很意外的获得了湖北赛区省三等奖,虽然很意外,但还是挺高兴的,毕竟第一次为喜欢…...
【Linux】编译器gcc g++和调试器gdb的使用
文章目录1.编译器gcc/g1.1C语言程序的翻译过程1.预处理2.编译3.汇编4. 链接1.2 链接方式与函数库1.动态链接与静态链接2.动态库与静态库1.3 gcc与g的使用2.调试器gdb2.1debug和release2.2gdb的安装2.3gdb的使用2.4gdb的常用指令3.总结1.编译器gcc/g 1.1C语言程序的翻译过程 1…...
高效能自动化港口数字化码头智慧港航,中国人工智能企业CIMCAI世界港航人工智能领军者,成熟港口码头人工智能产品中国人工智能企业
打造高效能自动化港口数字化码头智慧港航,中国人工智能企业CIMCAI中集飞瞳世界港航人工智能领军者,成熟港口码头人工智能产品全球顶尖AI科技CIMCAI成熟AI产品全球前三船公司及港口落地,包括全球港口/堆场智能闸口验箱,全球港口岸边…...
HTTP协议(一)
HTTP协议(一) 什么是HTTP协议 客户端连上web服务器后,如果想要获得web服务器中的某个web资源,需要遵守一定的通讯格式,HTTP协议用于定义客户端与web服务器之间通讯的格式;基于TCP连接的传输协议ÿ…...
计算神经网络参数量Params、计算量FLOPs(亲测有效的3种方法)
1.stat(cpu统计) pip install torchstat from torchstat import statstat(model, (3, 32, 32)) #统计模型的参数量和FLOPs,(3,32,32)是输入图像的size 结果: 问题:当网络中有自定义参数时&am…...
sizeof与一维数组和二维数组
🍕博客主页:️自信不孤单 🍬文章专栏:C语言 🍚代码仓库:破浪晓梦 🍭欢迎关注:欢迎大家点赞收藏关注 sizeof与一维数组和二维数组 文章目录sizeof与一维数组和二维数组前言1. sizeof与…...
Spark UI
Spark UIExecutorsEnvironmentStorageSQLExchangeSortAggregateJobsStagesStage DAGEvent TimelineTask MetricsSummary MetricsTasks展示 Spark UI ,需要设置配置项并启动 History Server # SPARK_HOME表示Spark安装目录 ${SPAK_HOME}/sbin/start-history-server…...
windows应用(vc++2022)MFC基础到实战(2)
目录向导和资源编辑器使用 MFC 应用程序向导创建 MFC 应用程序使用类视图管理类和 Windows 消息使用资源编辑器创建和编辑资源生成 MFC 应用程序的操作1.创建一个主干应用程序。2.了解即使在不添加你自己的任何一行代码的情况下,框架和 MFC 应用程序向导也能提供的内…...
记一次反射型XSS
记一次反射型XSS1.反射型XSS1.1.前言1.2.测试过程1.3.实战演示1.3.1.输入框1.3.2.插入代码1.3.3.跳转链接2.总结1.反射型XSS 1.1.前言 关于这个反射型XSS,利用的方式除了钓鱼,可能更多的就是自娱自乐,那都说是自娱自乐了,并且对系…...
BUUCTF-[羊城杯 2020]Bytecode
题目下载:下载 这道题是一个关于python字节码的。 补充一下相关知识:https://shliang.blog.csdn.net/article/details/119676978dis --- Python 字节码反汇编器 — Python 3.7.13 文档 手工还原参考:[原创]死磕python字节码-手工还原python源码-软件逆…...
开通公司网站怎么开通/网络广告的特点
201701 20170124 arcgis server,忘记Manager用户名和密码的解决办法: 1、找到arcgis server的安装目录,目录指向\ArcGIS\Server\tools\passwordreset文件夹。文件夹下有一个cmd文件,名字叫做PasswordReset,主要用到两个命令&#…...
专业长春网站建设工作室/黄冈网站推广策略
linux启动oracle命令 su - oraclesqlplus /nologconnect /as sysdbastartup lsnrctl start...
怎么做web网站/产品宣传
一、传入参数的传递 parameterType指定参数类型 基本类型参数(int、string.......) pojo类型:user对象 map类型 包装类型 1、map类型的传递 需求:查询用户性别为男,姓张的用户 <mapper namespace"com.itcast…...
网站建设 慕课/短视频营销推广策略
编码一个SMS一般需要如下的信息:TP_Data_Coding_SchemeTP_UD编码方式TP_Destination_Address对方号码TP_Message_Reference参考号码TP_Status_Report_Request状态报告TP_User_Data用户信息TP_Validity_Priod有效期ServiceCenterNumber短信中心号码所以在编码器中存在…...
开发公司组织架构及岗位职责/宁波seo咨询
一、对MySQL的锁的了解当数据库有并发事务的时候,可能会产生数据的不一致,这时候需要一些机制来保证访问的次序,锁机制就是这样的一个机制。就像酒店的房间,如果大家随意进出,就会出现多人抢夺同一个房间的情况&#x…...
免费学校网站建设/怎么免费创建网站
需要解决问题:调研openstf/stf(https://github.com/openstf/stf),搭建docker(https://www.docker.com/)环境。 拆解为: docker基本使用stf 如何安装逐个来看: 1. docker基本使用 理解…...