# xxl-job分布式任务调度框架
- 整体思路借鉴quartz,由调度器,触发器,作业三大件组成;
- 调度器+触发器单独提取成了一个项目,成为调度中心;
- 作业抽象为由执行器统一执行,单独提取成了一个服务,支持原生与 `springBoot` 两种方式,本质上是基于netty的轻量级server,默认端口为9999;
# 集群的实现
- 调度中心集群,由 `db` 实现全局锁调度;
- 执行器集群,由 `db` 实现注册中心的功能;
# 数据定义
- 作业
jobInfo
字段 | 属性 |
---|---|
job_group | 作业分组 |
schedule_type | 调度类型 |
executor_handler | 执行器任务handler |
trigger_status | 调度状态 |
trigger_last_time | 上次调度时间 |
trigger_next_time | 下次调度时间 |
- 作业分组
jobGroup
字段 | 属性 |
---|---|
app_name | 执行器应用名 |
address_type | 0=自动注册,1=手动录入 |
address_list | 执行器地址列表,多地址逗号分隔 |
- 作业账户
jobUser
--略
- 作业日志报告
jobLogReport
--略
- 作业日志
jobLog
--略
- glue作业日志
jobLogGlue
--略
- 作业注册中心
jobRegistry
字段 | 属性 |
---|---|
registry_group | 注册中心分组 |
registry_key | 注册中心键名 |
registry_value | 注册中心键值 |
- 调度锁
jobLock
字段 | 属性 |
---|---|
lock_name | 锁名称 |
# 源码说明
- 源码分为两大块,分别是调度中心,执行器;
- 调度中心位于模块:
xxl-job-admin
- 执行器项目位于模块:
xxl-job-core
# 调度中心源码解读
# 调度器启动流程
/*xxx: Spring component of the admin center; it creates the job scheduler and ties its lifecycle to this bean */
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
@Override
public void afterPropertiesSet() throws Exception {
xxlJobScheduler = new XxlJobScheduler();
/*xxx: initialize the job scheduler once this bean's properties have been set */
xxlJobScheduler.init();
}
@Override
public void destroy() throws Exception {
/*xxx: stop the scheduler when the bean is destroyed */
xxlJobScheduler.destroy();
}
}
/*xxx: Job scheduler: starts and stops the background helpers of the admin center */
public class XxlJobScheduler {
/*xxx: initialize the scheduler */
public void init() throws Exception {
/*xxx: start the job trigger pools */
JobTriggerPoolHelper.toStart();
/*xxx: registry monitor of the admin center, runs every 30 seconds */
JobRegistryHelper.getInstance().start();
/*xxx: job failure alarm, runs every 10 seconds */
JobFailMonitorHelper.getInstance().start();
/*xxx: lost-job monitor (a job log that stays in "running" for more than 10 minutes while the executor's heartbeat registration is offline is marked as failed), runs every minute */
JobCompleteHelper.getInstance().start();
/*xxx: log report monitor, runs every minute */
JobLogReportHelper.getInstance().start();
/*xxx: start scheduling */
JobScheduleHelper.getInstance().start();
}
/*xxx: destroy the scheduler; helpers are stopped in the reverse order of init() */
public void destroy() throws Exception {
JobScheduleHelper.getInstance().toStop();
JobLogReportHelper.getInstance().toStop();
JobCompleteHelper.getInstance().toStop();
JobFailMonitorHelper.getInstance().toStop();
JobRegistryHelper.getInstance().toStop();
JobTriggerPoolHelper.toStop();
}
}
# 任务触发池的启动流程
/*xxx: Owns the two thread pools (fast and slow) that fire job triggers */
public class JobTriggerPoolHelper {
/*xxx: start the trigger pools; each pool is created with a core size of 10 threads */
public void start(){
/*xxx: fast trigger pool: max size from getTriggerPoolFastMax(), 60s keep-alive, bounded queue of 1000 */
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
/*xxx: slow trigger pool: max size from getTriggerPoolSlowMax(), 60s keep-alive, bounded queue of 2000 */
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
/*xxx: stop both pools immediately via shutdownNow() */
public void stop() {
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
}
}
# 任务触发池的触发机制
- 异步推送触发任务
public class JobTriggerPoolHelper {
/*xxx: add a trigger: the actual trigger call is submitted to a pool, so the caller does not wait for it */
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
/*xxx: choose the thread pool; the fast pool is the default */
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
/*xxx: a job whose timeout counter is above 10 is moved to the slow pool */
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
/*xxx: fire the trigger on the chosen pool */
triggerPool_.execute(new Runnable() {
@Override
public void run() {
/*xxx: the method that actually triggers the job; the job is identified by jobId and triggerType */
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
});
}
}
/*xxx: Trigger: loads a job and dispatches it to a remote executor */
public class XxlJobTrigger {
    /**
     * Triggers one job.
     *
     * @param jobId                 id of the job to load and trigger
     * @param triggerType           what caused this trigger
     * @param failRetryCount        retry count requested by the caller
     * @param executorShardingParam sharding parameter, may be null
     * @param executorParam         executor parameter, may be null
     * @param addressList           explicit executor address list, may be null
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {
        /*xxx: load the job; this has to happen inside the method because jobId is a parameter */
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        //xxx: resolving group, finalFailRetryCount and the shard index i is omitted in this excerpt
        /*xxx: process the trigger */
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
    }

    /*xxx: runs a single trigger against one executor and records the outcome in the job log */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
        //xxx: building triggerParam / jobLog and choosing the executor address is omitted in this excerpt
        /*xxx: trigger the remote executor */
        ReturnT<String> triggerResult = runExecutor(triggerParam, address);
        //xxx: update the trigger log
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    }

    /**
     * Sends the trigger to the executor at the given address.
     *
     * @return the executor's answer, or a FAIL_CODE result when the client cannot be obtained or the call throws
     */
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        try {
            /*xxx: client-side proxy of the executor */
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            /*xxx: let the executor run the job */
            return executorBiz.run(triggerParam);
        } catch (Exception e) {
            // getExecutorBiz declares a checked exception; report it as a failed trigger instead of dropping it
            return new ReturnT<String>(ReturnT.FAIL_CODE, String.valueOf(e));
        }
    }
}
/*xxx: Job scheduler (cache of executor clients) */
public class XxlJobScheduler {
    /*xxx: executor clients keyed by executor address */
    private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();

    /**
     * Returns the client-side proxy for the executor at {@code address}.
     * The proxy is created on first use and then served from the cache.
     *
     * @param address executor address used as the cache key
     * @return the cached or newly created client
     * @throws Exception declared by the original signature
     */
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        /*xxx: serve from the cache when this address was seen before */
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {
            return executorBiz;
        }
        /*xxx: first use: instantiate the client and cache it */
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }
}
# 作业执行器的结构(核心)
# 执行器的体系结构
- 顶级接口
/*xxx: Executor API; implemented by the remote proxy (ExecutorBizClient) and by the executor itself (ExecutorBizImpl) */
public interface ExecutorBiz {
ReturnT<String> beat();
/*xxx: run the job described by triggerParam */
ReturnT<String> run(TriggerParam triggerParam);
ReturnT<String> kill(KillParam killParam);
ReturnT<LogResult> log(LogParam logParam);
}
- 代理端
/*xxx: Proxy side of ExecutorBiz: every call is forwarded as a POST to the executor's address */
public class ExecutorBizClient implements ExecutorBiz {
public ExecutorBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
/*xxx: make sure the base url ends with "/" so the endpoint name can be appended directly */
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
@Override
public ReturnT<String> beat() {
return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);
}
@Override
/*xxx: executor proxy */
public ReturnT<String> run(TriggerParam triggerParam) {
/*xxx: post the request to the executor and return its answer */
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
@Override
public ReturnT<String> kill(KillParam killParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
}
@Override
public ReturnT<LogResult> log(LogParam logParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
}
}
- 业务端
/*xxx: Business side of ExecutorBiz; method bodies are omitted in this excerpt */
public class ExecutorBizImpl implements ExecutorBiz {
@Override
/*xxx: the method the executor actually runs for a trigger */
public ReturnT<String> run(TriggerParam triggerParam) {
//xxx: details omitted...
}
@Override
public ReturnT<String> kill(KillParam killParam) {
//xxx: details omitted...
}
}
# 执行器的执行过程(调度中心部分-核心)
- 详见执行器章节
public class ExecutorBizImpl implements ExecutorBiz {
    /**
     * Handles one trigger on the executor side: applies the block strategy to an
     * existing job thread and queues the trigger on the (possibly new) job thread.
     *
     * @param triggerParam trigger sent by the admin center
     * @return the result of queueing the trigger, or a FAIL_CODE result when the trigger is discarded
     */
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        /*xxx: load the job thread bound to this job id */
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        //xxx: resolving jobHandler and removeOldReason is omitted in this excerpt
        if (jobThread != null) {
            /*xxx: block strategy of the executor (declared once; the excerpt previously declared it twice) */
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                /*xxx: discard this trigger while the thread is running or has queued triggers */
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                /*xxx: cover the earlier trigger: drop the reference so a new job thread is registered below */
                if (jobThread.isRunningOrHasQueue()) {
                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
}
# 执行器注册中心的启动机制
/*xxx: Registry monitor of the admin center */
public class JobRegistryHelper {
    /**
     * Creates the registry/remove thread pool and the registry monitor thread.
     * Starting the thread and the remaining maintenance steps are omitted in this excerpt.
     */
    public void start(){
        /*xxx: pool for explicit registry / registry-remove requests.
          ThreadPoolExecutor rejects null for threadFactory and handler (NullPointerException),
          so the default thread factory is used and overflow tasks run on the submitting thread. */
        registryOrRemoveThreadPool = new ThreadPoolExecutor(
                2,
                10,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        /*xxx: registry monitor thread */
        registryMonitorThread = new Thread(()->{
            while (!toStop) {
                // auto registry group
                /*xxx: read the job groups with address type 0 from the database */
                List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                /*xxx: remove registry entries that have been silent longer than DEAD_TIMEOUT */
                List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                if (ids!=null && ids.size()>0) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                }
                //xxx: other steps omitted...
                /*xxx: heartbeat interval of the admin center, 30 seconds */
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }
        });
    }
}
# 执行器注册中心的注册(核心)
# 注册中心结构设计
/*xxx: Admin-center API; implemented by the remote proxy (AdminBizClient) and by the admin center itself (AdminBizImpl) */
public interface AdminBiz {
/*xxx: callback */
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
/*xxx: register */
public ReturnT<String> registry(RegistryParam registryParam);
/*xxx: remove a registration */
public ReturnT<String> registryRemove(RegistryParam registryParam);
}
//xxx: Client-side proxy of the admin center: every call is forwarded as a POST to the admin address
public class AdminBizClient implements AdminBiz {
/*xxx: admin-center client */
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid: make sure the base url ends with "/" so "api/..." can be appended directly
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
@Override
/*xxx: register with the admin center */
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
}
/*xxx: Admin-center side of AdminBiz; each call is delegated to the matching helper singleton */
@Service
public class AdminBizImpl implements AdminBiz {
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return JobCompleteHelper.getInstance().callback(callbackParamList);
}
@Override
/*xxx: register in the registry */
public ReturnT<String> registry(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registry(registryParam);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registryRemove(registryParam);
}
}
执行器启动时,会向注册中心注册。注册的动作是通过调度中心完成的,因为执行器无法直接操作数据库
# 作业失败监控的启动流程
/*xxx: Monitors failed job logs: retries them when allowed and raises an alarm */
public class JobFailMonitorHelper {
public void start(){
/*xxx: monitor thread */
monitorThread = new Thread(()->{
while (!toStop) {
/*xxx: fetch the ids of failed job logs (the argument 1000 is presumably a batch size, TODO confirm) */
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
for (long failLogId: failLogIds) {
/*xxx: retry on failure; loading of log and info is omitted in this excerpt */
if (log.getExecutorFailRetryCount() > 0) {
//xxx: trigger, omitted
}
//xxx: failure alarm
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
}
//xxx: runs every 10 seconds
TimeUnit.SECONDS.sleep(10);
}
});
}
}
# 作业结果监控的启动机制
/*xxx: Handles job results: a callback pool plus a monitor thread for lost results */
public class JobCompleteHelper {
    /**
     * Creates the callback thread pool and the lost-result monitor thread.
     * Starting the thread and loading of jobLog are omitted in this excerpt.
     */
    public void start(){
        /*xxx: callback thread pool.
          ThreadPoolExecutor rejects null for threadFactory and handler (NullPointerException),
          so the default thread factory is used and overflow tasks run on the submitting thread. */
        callbackThreadPool = new ThreadPoolExecutor(
                2,
                20,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        /*xxx: monitor thread */
        monitorThread = new Thread(()->{
            //xxx: wait for the trigger pool to be initialized
            TimeUnit.MILLISECONDS.sleep(50);
            while (!toStop) {
                // lost-result handling: a log that stays "running" for more than 10 minutes while the
                // executor's heartbeat registration is offline is marked as failed locally
                /*xxx: cut-off time, 10 minutes before now */
                Date losedTime = DateUtil.addMinutes(new Date(), -10);
                List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
                for (Long logId: losedJobIds) {
                    XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                }
                //xxx: runs every 60 seconds
                TimeUnit.SECONDS.sleep(60);
            }
        });
    }
}
# 作业日志监控的启动机制
/*xxx: Maintains the job log report */
public class JobLogReportHelper {
public void start(){
/*xxx: log report thread */
logrThread = new Thread(()->{
while (!toStop) {
/*xxx: refresh the log statistics report of the last 3 days; building xxlJobLogReport is omitted in this excerpt */
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
/*xxx: clean expired logs, omitted */
/*xxx: runs every minute */
TimeUnit.MINUTES.sleep(1);
}
});
}
}
# 调度的启动机制(核心)
# 根据时间间隔的触发
public class JobScheduleHelper {
    /**
     * Starts scheduling by time window: jobs due within the pre-read window are read
     * from the database, triggered or pushed to the time ring, and then persisted.
     * Computation of nowTime, ringSecond, cost and preReadSuc is omitted in this excerpt.
     */
    public void start(){
        /*xxx: create the schedule thread */
        scheduleThread = new Thread(()->{
            /*xxx: delayed start; presumably this gives the other components time to initialize */
            TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            /*xxx: loop until the stop flag is set */
            while (!scheduleThreadToStop) {
                /*xxx: take the schedule lock with a SELECT ... FOR UPDATE on the lock row */
                preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                /*xxx: read the jobs that will fire within the pre-read window, at most preReadCount of them */
                List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);

                //xxx: step 1 - decide what to do with each job
                for (XxlJobInfo jobInfo: scheduleList) {
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                        /*xxx: the trigger time is overdue by more than the window: only refresh the next trigger time here */
                        refreshNextValidTime(jobInfo, new Date());
                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        /*xxx: the trigger time is within the window: trigger now, trigger type is CRON */
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                        /*xxx: refresh the next trigger time */
                        refreshNextValidTime(jobInfo, new Date());
                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                            /*xxx: the next trigger is also inside the window: put it on the time ring, a dedicated thread fires it */
                            pushTimeRing(ringSecond, jobInfo.getId());
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                        }
                    }
                }

                //xxx: step 2 - persist every job once, after the decision loop has finished
                //     (nesting this loop inside step 1 would write each job once per job in the list)
                for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                }

                /*xxx: a round that took less than one second is throttled; wait longer when nothing was read */
                if (cost < 1000) {
                    try {
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:5000) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        // left empty as in the original excerpt; the loop re-checks scheduleThreadToStop
                    }
                }
            }
        });
    }
}
# 每秒的触发
public class JobScheduleHelper {
    /**
     * Starts the per-second trigger: fires the jobs stored on the time ring for the current second.
     * Computation of nowSecond and creation of ringItemData are omitted in this excerpt.
     */
    public void start(){
        ringThread = new Thread(()->{
            //xxx: align to the next full second before the first tick
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
            while (!ringThreadToStop) {
                // look one slot back as well, in case the previous tick ran long and skipped a slot
                for (int i = 0; i < 2; i++) {
                    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                    if (tmpData != null) {
                        ringItemData.addAll(tmpData);
                    }
                }
                for (int jobId: ringItemData) {
                    /*xxx: trigger the job */
                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                }
                ringItemData.clear();
                /*xxx: one tick per second: sleep until the next full second.
                  This has to be inside the loop, otherwise the thread spins without pause. */
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
            }
        });
    }
}
# 调度中心核心线程说明
任务触发器 由两个触发池构成,常驻至少20个活跃线程用于触发任务
注册中心 由一个心跳线程,一个线程池(内含两个核心线程) 构成; 心跳线程用于自动移除注册中心的失效数据; 线程池用于主动注册或者主动移除注册;
作业结果处理 由一个心跳线程,一个线程池(内含两个核心线程)构成; 心跳线程用于自动将结果丢失的作业标记为失败; 线程池用于以回调的方式主动更新作业状态;
调度中心的调度,有两个线程进行执行,一个是根据时间范围进行调度,还有一个是每秒进行调度;
# 执行器源码解读
- 执行器依赖于调度中心,很多的关键操作都需要委托调度中心执行
# 执行器初始化流程
/*xxx: Job executor */
public class XxlJobExecutor {
/*xxx: start the executor */
public void start() throws Exception {
/*xxx: initialize the executor log path, default is /data/applogs/xxl-job/jobhandler */
XxlJobFileAppender.initLogPath(logPath);
/*xxx: initialize the admin-center clients from the configured addresses */
initAdminBizList(adminAddresses, accessToken);
/*xxx: start the log clean thread */
JobLogFileCleanThread.getInstance().start(logRetentionDays);
/*xxx: start the trigger callback thread */
TriggerCallbackThread.getInstance().start();
/*xxx: start the embedded server that listens for requests */
initEmbedServer(address, ip, port, appname, accessToken);
}
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
//xxx: omitted...
}
}
# 初始化调度中心
/*xxx: Job executor */
public class XxlJobExecutor {
    /*xxx: clients of the configured admin centers; stays null when no address is configured */
    private static List<AdminBiz> adminBizList;

    /**
     * Builds one AdminBizClient per configured admin address.
     *
     * @param adminAddresses comma-separated admin addresses; null or blank leaves the list untouched
     * @param accessToken    token passed to every client
     * @throws Exception declared by the original signature
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        // no address configured: keep adminBizList null, TriggerCallbackThread.start() checks for that
        if (adminAddresses == null || adminAddresses.trim().length() == 0) {
            return;
        }
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {
                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                /*xxx: add to the list */
                adminBizList.add(adminBiz);
            }
        }
    }
}
//xxx: Client-side proxy of the admin center
public class AdminBizClient implements AdminBiz {
/*xxx: admin-center client */
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid: make sure the base url ends with "/" so "api/..." can be appended directly
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
/*xxx: timeout passed to every postBody call (unit is not visible here, presumably seconds, TODO confirm) */
private int timeout = 3;
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
@Override
/*xxx: register with the admin center */
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
}
# 日志清理任务
/*xxx: Cleans executor log directories that are older than the retention period */
public class JobLogFileCleanThread {
    /**
     * Creates the clean thread.
     * Computation of todayDate and logFileCreateDate is omitted in this excerpt.
     *
     * @param logRetentionDays number of days to keep logs; values below 3 disable cleaning
     */
    public void start(final long logRetentionDays){
        /*xxx: a retention below 3 days disables cleaning */
        if (logRetentionDays < 3 ) {
            return;
        }
        localThread = new Thread(()->{
            while (!toStop) {
                File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                // listFiles() returns null when the path is not a directory or cannot be read
                if (childDirs != null) {
                    for (File childFile: childDirs) {
                        /*xxx: delete logs that are older than the retention period */
                        if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                            FileUtil.deleteRecursively(childFile);
                        }
                    }
                }
                /*xxx: runs once a day */
                TimeUnit.DAYS.sleep(1);
            }
        });
    }
}
# 触发器回调的初始化
/*xxx: Reports job results back to the admin center and retries reports that failed */
public class TriggerCallbackThread {
    /**
     * Creates the callback thread and the retry thread.
     * Nothing is created when no admin center is configured.
     */
    public void start() {
        /*xxx: callbacks need an admin-center address */
        if (XxlJobExecutor.getAdminBizList() == null) {
            return;
        }
        /*xxx: callback thread */
        triggerCallbackThread = new Thread(()->{
            while(!toStop){
                /*xxx: blocks until a callback is available */
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                getInstance().callBackQueue.drainTo(callbackParamList);
                // take() has already removed this element from the queue; add it or it is never reported
                callbackParamList.add(callback);
                /*xxx: do the callback */
                doCallback(callbackParamList);
            }
            /*xxx: on exit, report whatever is still queued */
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            getInstance().callBackQueue.drainTo(callbackParamList);
            if (!callbackParamList.isEmpty()) {
                doCallback(callbackParamList);
            }
        });
        //xxx: failed callbacks are retried until one succeeds
        triggerRetryCallbackThread = new Thread(()->{
            while(!toStop){
                //xxx: retry failed callbacks
                retryFailCallbackFile();
                //xxx: runs every 30 seconds
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }
        });
    }

    /*xxx: sends the callbacks to the first admin center that accepts them; stores them in a fail file when none does */
    private void doCallback(List<HandleCallbackParam> callbackParamList){
        boolean callbackRet = false;
        /*xxx: try the admin centers one by one */
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            //xxx: after a successful callback the remaining admin centers are not called
            if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackRet = true;
                break;
            }
        }
        /*xxx: no admin center accepted the callbacks: write them to the fail file for a later retry */
        if (!callbackRet) {
            appendFailCallbackFile(callbackParamList);
        }
    }

    /*xxx: re-sends the callbacks stored in the fail files */
    private void retryFailCallbackFile(){
        /*xxx: directory that holds the fail files */
        File callbackLogPath = new File(failCallbackFilePath);
        File[] callbackLogFiles = callbackLogPath.listFiles();
        // listFiles() returns null when the directory does not exist yet
        if (callbackLogFiles == null) {
            return;
        }
        for (File callbaclLogFile: callbackLogFiles) {
            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
            // remove the file before retrying; doCallback writes a new fail file if the retry fails again
            callbaclLogFile.delete();
            doCallback(callbackParamList);
        }
    }
}
# 执行器启动(重要)
/*xxx: Embedded server that receives job requests and handles them; built on netty */
public class EmbedServer {
    /**
     * Starts the executor server.
     * Creation of bootstrap, bossGroup and workerGroup and starting the thread are omitted in this excerpt.
     *
     * @param address     address registered with the admin center
     * @param port        port the server binds to
     * @param appname     executor application name used for registration
     * @param accessToken token handed to the request handler
     */
    public void start(final String address, final int port, final String appname, final String accessToken) {
        /*xxx: create the executor implementation */
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(()->{
            /*xxx: business thread pool.
              ThreadPoolExecutor rejects null for threadFactory and handler (NullPointerException),
              so the defaults are used: an exhausted pool rejects the task with an exception
              instead of running it on the netty thread that submitted it. */
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000));
            //xxx: start the server through netty, details omitted
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
                                    .addLast(new HttpServerCodec())
                                    .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
                                    /*xxx: request handling */
                                    .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(port).sync();
            //xxx: register this executor once the port is bound
            startRegistry(appname, address);
            /*xxx: keep this thread alive until the channel is closed */
            future.channel().closeFuture().sync();
        });
    }

    /*xxx: register this executor with the admin center */
    public void startRegistry(final String appname, final String address) {
        // start registry
        /*xxx: the registry thread registers the executor (not a scheduler) with the admin center */
        ExecutorRegistryThread.getInstance().start(appname, address);
    }
}
/*xxx: Request handler of the embedded server */
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    /**
     * Handles one HTTP request: the business call and the response are both done on the business pool.
     * Parsing httpMethod, uri, requestData, accessTokenReq and keepAlive from msg is omitted in this excerpt.
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        /*xxx: run the request on the business thread pool */
        bizThreadPool.execute(()->{
            /*xxx: call the actual business method */
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            String responseJson = GsonTool.toJson(responseObj);
            /*xxx: write the response here, inside the task: the result only exists after process() has returned */
            writeResponse(ctx, keepAlive, responseJson);
        });
    }

    /*xxx: dispatches the request to the executor by uri */
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        //xxx: other checks omitted...
        if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            /*xxx: let the executor run the job */
            return executorBiz.run(triggerParam);
        }
        //xxx: other uris omitted...
    }
}
# 注册中心注册执行器(重要)
/*xxx: Thread that keeps this executor registered with the admin center */
public class ExecutorRegistryThread {
    /**
     * Creates the registry thread: it registers the executor every BEAT_TIMEOUT seconds
     * and removes the registration when the thread ends.
     * Building registryParam and starting the thread are omitted in this excerpt.
     */
    public void start(final String appname, final String address){
        /*xxx: registry thread */
        registryThread = new Thread(()->{
            while (!toStop) {
                /*xxx: try the admin centers one by one */
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    /*xxx: ask the admin center to write the registration */
                    ReturnT<String> registryResult = adminBiz.registry(registryParam);
                    /*xxx: one successful registration is enough, the remaining admin centers are skipped */
                    if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                        break;
                    }
                }
                /*xxx: the executor re-registers every 30 seconds */
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }
            /*xxx: on exit, remove the executor's registration; adminBiz is scoped to a loop, so iterate again */
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    break;
                }
            }
        });
    }
}
# 执行器的任务执行过程(重要)
public class ExecutorBizImpl implements ExecutorBiz {
/*xxx: 执行器实际执行的方法*/
public ReturnT<String> run(TriggerParam triggerParam) {
/*xxx: 加载任务线程 */
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
/*xxx: 根据任务类型,分别处理*/
if (GlueTypeEnum.BEAN == glueTypeEnum) {
/*xxx: 加载作业处理器 */
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return ;
}
}
}
//xxx: 省略其它抽象...
if (jobThread != null) {
/*xxx: 获取 执行器的阻塞策略*/
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
/*xxx: 丢弃后续调度*/
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
return;
}else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
//xxx: 覆盖之前的调度
jobThread = null;
}
}
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
}
}
/*xxx: Job executor */
public class XxlJobExecutor {
    /*xxx: registry of job threads keyed by job id */
    private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

    /**
     * Creates and starts a job thread for the job and registers it, replacing any previous thread.
     *
     * @param jobId           job id the thread is bound to
     * @param handler         handler the thread executes
     * @param removeOldReason reason handed to the replaced thread when it is stopped
     * @return the newly started job thread
     */
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
        // stop the replaced thread, otherwise it keeps polling its queue until it idles out
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }
        return newJobThread;
    }
}
/*xxx: Job thread: one thread per job id, consumes that job's trigger queue */
public class JobThread extends Thread{
    /*xxx: job handler */
    private IJobHandler handler;
    /*xxx: queue of pending triggers */
    private LinkedBlockingQueue<TriggerParam> triggerQueue;

    /**
     * Polls the trigger queue and executes the handler for every trigger.
     * A callback is pushed after each handled trigger; a thread that stays idle is removed.
     * Declarations of toStop, idleTimes and jobId are omitted in this excerpt.
     */
    @Override
    public void run() {
        while(!toStop){
            // counts consecutive polls; reset below as soon as a trigger arrives
            idleTimes++;
            TriggerParam triggerParam = null;
            try {
                /*xxx: fetch the next trigger, waiting at most 3 seconds */
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    idleTimes = 0;
                    if (triggerParam.getExecutorTimeout() > 0) {
                        /*xxx: with a timeout, the handler runs on a separate thread and this thread waits for it */
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(() -> {
                            /*xxx: call the business logic */
                            handler.execute();
                            return true;
                        });
                        Thread futureThread = new Thread(futureTask);
                        futureThread.start();
                        try {
                            /*xxx: block until the result arrives or the timeout expires */
                            futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } finally {
                            // interrupt the worker so a timed-out handler does not keep running unnoticed
                            futureThread.interrupt();
                        }
                    } else {
                        /*xxx: without a timeout, the handler runs directly on this job thread */
                        handler.execute();
                    }
                } else {
                    //xxx: more than 30 consecutive empty polls of 3 seconds each (about 90 seconds idle): remove this thread
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } catch (Throwable e) {
                //xxx: recording the failure (including the timeout case) is omitted in this excerpt
            } finally {
                //xxx: push the callback for every handled trigger, whether it succeeded or failed
                if (triggerParam != null) {
                    TriggerCallbackThread.pushCallBack();
                }
            }
        }
    }
}
# 执行器核心线程行为说明
- 执行线程为阻塞执行,有超时时间限制,超时后线程会销毁,且执行线程与具体的jobId绑定
- 执行线程执行任务,根据是否超时,可分为同步调用与异步调用,单线程调用无法指定超时,对于调度器来说,都是同步的;
- 一次触发,经历了多次异步动作:
request
->bizThreadPool
(一个线程池,最大200线程)->JobThread
(单业务线程)->futureThread
(如果有超时设置)
# 实践说明
- executor(通常是应用本身),如果配置了 appName 以及注册中心(即调度中心)地址,则会进行自动注册。但是注意注册的是executor,这个在页面上是没有直接的显示页面的
- 调度中心的处理单位是executor-group,也就是菜单执行器列表,无论是自动注册,还是手动注册,都需要手动的去维护。自动注册的区别在于可以实现动态管理(appName相同的应用自动划归为一个组),手动注册的方式可以静态的管理应用列表;
- 除此之外,job本身也不会自动注册到注册中心,都需要手动维护。
- 简言之,调度中心无法直观的感受到应用是否成功注册,任务是否成功装载,都需要手动配置,尽管实际上是生效了;