# xxl-job分布式任务调度框架

  • 整体思路借鉴quartz,由调度器,触发器,作业三大件组成;
  • 调度器+触发器单独提取成了一个项目,成为调度中心;
  • 作业抽象为由执行器统一执行,单独提取成了一个服务,支持原生与springBoot两种方式,本质上是基于netty的轻量级server,默认端口为9999;

# 集群的实现

  • 调度中心集群,由db实现全局锁调度;
  • 执行器集群,由db实现注册中心的功能;

# 数据定义

  • 作业jobInfo
字段 属性
job_group 作业分组
schedule_type 调度类型
executor_handler 执行器任务handler
trigger_status 调度状态
trigger_last_time 上次调度时间
trigger_next_time 下次调度时间
  • 作业分组jobGroup
字段 属性
app_name 执行器应用名
address_type 0=自动注册,1=手动录入
address_list 执行器地址列表,多地址逗号分隔
  • 作业账户jobUser--略
  • 作业日志报告jobLogReport--略
  • 作业日志jobLog--略
  • glue作业日志jobLogGlue--略
  • 作业注册中心jobRegistry
字段 属性
registry_group 注册中心分组
registry_key 注册中心键名
registry_value 注册中心键值
  • 调度锁jobLock
字段 属性
lock_name 锁名称

# 源码说明

  • 源码分为两大块,分别是调度中心,执行器;
  • 调度中心位于模块:xxl-job-admin
  • 执行器项目位于模块:xxl-job-core

# 调度中心源码解读

# 调度器启动流程

/*
 * Configuration bean of the scheduling center. It creates the XxlJobScheduler
 * in afterPropertiesSet() and shuts it down in destroy().
 */
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
 	
    @Override
    public void afterPropertiesSet() throws Exception {
     	xxlJobScheduler = new XxlJobScheduler();
        /* Initialize the job scheduler when the scheduling-center application starts. */
        xxlJobScheduler.init();   
    }
    
    @Override
    public void destroy() throws Exception {
        /* Stop the scheduler together with the bean. */
        xxlJobScheduler.destroy();
    }
}
/* Job scheduler: starts and stops the background helpers of the scheduling center. */
public class XxlJobScheduler  {
 	/* Initialize the scheduler by starting each helper in turn. */
    public void init() throws Exception {

        /* Start the job trigger thread pools. */
        JobTriggerPoolHelper.toStart();

        /* Registry monitor of the scheduling center; per the notes it runs every 30 seconds. */
        JobRegistryHelper.getInstance().start();

        /* Failed-job monitor (retry and alarm); per the notes it runs every 10 seconds. */
        JobFailMonitorHelper.getInstance().start();

        /* Lost-job monitor: a job that stays "running" for more than 10 minutes while its
           executor is no longer registered as online is marked as failed; runs every minute. */
        JobCompleteHelper.getInstance().start();

        /* Log-report monitor; runs every minute. */
        JobLogReportHelper.getInstance().start();

        /* Start scheduling. */
        JobScheduleHelper.getInstance().start();
    }
    
    /* Destroy the scheduler: the helpers are stopped in the reverse order of init(). */
    public void destroy() throws Exception {
     	JobScheduleHelper.getInstance().toStop();
        JobLogReportHelper.getInstance().toStop();
        JobCompleteHelper.getInstance().toStop();
        JobFailMonitorHelper.getInstance().toStop();
        JobRegistryHelper.getInstance().toStop();
        JobTriggerPoolHelper.toStop();
    }
}

# 任务触发池的启动流程

/* Owns the two thread pools ("fast" and "slow") on which job triggers are executed. */
public class JobTriggerPoolHelper {
    
    /* Start the trigger pools; each pool is created with a core size of 10 threads. */
    public void start(){
     	   /* Fast trigger pool: max size from getTriggerPoolFastMax(), queue capacity 1000. */
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        // Name each thread after the pool and the runnable's hash code.
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });
        
        /* Slow trigger pool: max size from getTriggerPoolSlowMax(), queue capacity 2000. */
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }
    
    /* Stop both pools immediately via shutdownNow(). */
    public void stop() {
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
    }
    
    
}	

# 任务触发池的触发机制

  • 异步推送触发任务
public class JobTriggerPoolHelper {
 	/* Add a trigger: submit the trigger of the given job to one of the trigger pools. */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {
        
     	/* Choose the pool: the fast pool by default, the slow pool once the job's
     	   timeout counter exceeds 10. */
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }
        
        
        /* Run the trigger on the chosen pool. */
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {
                /* The method that actually triggers the job, identified by jobId and triggerType. */
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            }
        });
    }
}
/* Trigger: loads a job and dispatches it to a remote executor. */
public class XxlJobTrigger {

    /**
     * Triggers one job.
     *
     * @param jobId                 id of the job to trigger
     * @param triggerType           what caused the trigger
     * @param failRetryCount        retry count requested by the caller
     * @param executorShardingParam sharding parameter, may be null
     * @param executorParam         executor parameter, may be null
     * @param addressList           explicit executor address list, may be null
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {
        /* Load the job. jobId only exists as a parameter of this method, so the
           lookup has to happen here rather than in a field initializer. */
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);

        // Resolution of group, finalFailRetryCount and the shard index i is omitted in this excerpt.

        /* Process the trigger. */
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
    }


    /* Sends the trigger to an executor and records the outcome in the trigger log. */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
        // Construction of triggerParam, address and jobLog is omitted in this excerpt.

        /* Trigger the remote executor. */
        ReturnT<String> triggerResult = runExecutor(triggerParam, address);

        // Update the trigger log.
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    }

    /**
     * Runs the trigger on the executor at the given address.
     *
     * @return the result reported by the executor client
     */
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        /* Executor client for this address. */
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        /* Let the executor client run the trigger. */
        ReturnT<String> runResult = executorBiz.run(triggerParam);
        return runResult;
    }
}
/* Job scheduler: executor-client cache. */
public class XxlJobScheduler  {

    /* Executor clients that have already been created, keyed by executor address. */
    private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();

    /**
     * Returns the executor client for the given address.
     * The client is created on first use and cached; later calls reuse it.
     *
     * @param address executor address
     * @return the cached or newly created client
     */
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        /* Reuse the cached client when one exists. */
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {
            return executorBiz;
        }

        /* First use: instantiate the client and cache it. */
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }
}

# 作业执行器的结构(核心)

# 执行器的体系结构

  • 顶级接口
/* Executor-side business API: heartbeat, run, kill and log operations. */
public interface ExecutorBiz {
    /* Heartbeat check. */
    ReturnT<String> beat();
    
    /* Run a job described by the trigger parameter. */
    ReturnT<String> run(TriggerParam triggerParam);
    
    /* Kill a job. */
    ReturnT<String> kill(KillParam killParam);
    
    /* Read job log content. */
    ReturnT<LogResult> log(LogParam logParam);
}	
  • 代理端
/* Proxy side of ExecutorBiz: every call is posted to the executor at addressUrl. */
public class ExecutorBizClient implements ExecutorBiz {
    
 	/* Stores the address and token; the address is normalized to end with "/". */
 	public ExecutorBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    } 
    
    @Override
    public ReturnT<String> beat() {
        return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);
    }
    
    @Override
    /* Executor proxy: forwards the run request. */
    public ReturnT<String> run(TriggerParam triggerParam) {
        /* Post the trigger to the executor's "run" endpoint and return its response. */
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }
    
    @Override
    public ReturnT<String> kill(KillParam killParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
    }

    @Override
    public ReturnT<LogResult> log(LogParam logParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
    }
}
  • 业务端
/* Business side of ExecutorBiz: the implementation that runs inside the executor. */
public class ExecutorBizImpl implements ExecutorBiz {
 	@Override
    /* The method through which the executor actually runs a job. */
    public ReturnT<String> run(TriggerParam triggerParam) {
        // Body omitted in this excerpt.
    }   
    
    @Override
    public ReturnT<String> kill(KillParam killParam) {
        // Body omitted in this excerpt.
    }
}

# 执行器的执行过程(调度中心部分-核心)

  • 详见执行器章节
public class ExecutorBizImpl implements ExecutorBiz {

    /**
     * Handles a run request inside the executor: applies the blocking strategy
     * to the job's existing thread, registers a new thread when needed and
     * queues the trigger on it.
     *
     * @param triggerParam trigger description sent by the scheduling center
     * @return a failure when the trigger is discarded, otherwise the result of queuing it
     */
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        /* Load the thread currently bound to this job id, if any. */
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());

        // Resolution of jobHandler and removeOldReason is omitted in this excerpt.
        if (jobThread != null) {
            /* Resolve the executor's blocking strategy (declared once). */
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);

            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                /* Discard the new trigger while the thread is busy or has queued work. */
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                /* Override earlier scheduling: drop the reference so a new thread is registered below. */
                if (jobThread.isRunningOrHasQueue()) {
                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        /* No usable thread: register a new one for this job. */
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        /* Queue the trigger on the job thread and report the outcome. */
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
}

# 执行器注册中心的启动机制

/* Registry monitor of the scheduling center. */
public class JobRegistryHelper {
    
 	/* Creates the registry/remove thread pool and the registry monitor thread. */
 	public void start(){
        
     	// NOTE(review): the two trailing nulls stand in for the ThreadFactory and
     	// RejectedExecutionHandler, which this excerpt omits; the JDK constructor
     	// rejects null for either argument.
     	registryOrRemoveThreadPool = new ThreadPoolExecutor(2,
				10,
				30L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(2000),null,null);
        
        /* Registry monitor thread. */
		registryMonitorThread = new Thread(()->{
            while (!toStop) {
						// auto registry group
						/* Read from the database the job groups whose address type is 0 (auto registration). */
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                        
                        /* Remove dead registrations (older than DEAD_TIMEOUT), covering scheduling centers and executors. */
						List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
						}
                        
                        // Remaining steps are omitted in this excerpt.
                        
                        /* Heartbeat interval of the scheduling center; per the notes BEAT_TIMEOUT is 30 seconds. */
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

            }
        });
        
    }
}

# 执行器注册中心的注册(核心)

# 注册中心结构设计

/* Scheduling-center API: callback, registry and registry removal. */
public interface AdminBiz {
 	/* Callback with job results. */
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
    
    /* Register. */
    public ReturnT<String> registry(RegistryParam registryParam);
    
    /* Remove a registration. */
    public ReturnT<String> registryRemove(RegistryParam registryParam);
}
// Client proxy for the scheduling center: each AdminBiz call is posted to addressUrl + "api/...".
public class AdminBizClient implements AdminBiz {
    /* Scheduling-center client; the address is normalized to end with "/". */
    public AdminBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }
    
    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    /* Register with the scheduling center. */
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }
}
/* Scheduling-center implementation of AdminBiz; every call is delegated to a helper singleton. */
@Service
public class AdminBizImpl implements AdminBiz {
 	@Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        /* Job-result callbacks are handled by JobCompleteHelper. */
        return JobCompleteHelper.getInstance().callback(callbackParamList);
    }

    @Override
    /* Register in the registry; handled by JobRegistryHelper. */
    public ReturnT<String> registry(RegistryParam registryParam) {
        return JobRegistryHelper.getInstance().registry(registryParam);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return JobRegistryHelper.getInstance().registryRemove(registryParam);
    }   
}

执行器启动时,会向注册中心注册。注册的动作是通过调度中心完成的,因为执行器无法直接操作数据库

# 作业失败监控的启动流程

/* Monitors failed job logs, retrying and raising alarms for them. */
public class JobFailMonitorHelper {
 	
    public void start(){
     	
        /* Monitor thread. */
		monitorThread = new Thread(()->{
            
            while (!toStop) {
                        /* Fetch up to 1000 ids of failed job logs. */
                        List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                        
                        // Loading of log and info for each id is omitted in this excerpt.
                        for (long failLogId: failLogIds) {
                         	/* Retry on failure while retries remain. */
							if (log.getExecutorFailRetryCount() > 0) {   
                            	// Re-trigger; omitted.
                            }
                            
                            // Failure alarm.
                            boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                        }

                        // Runs every 10 seconds.
                        TimeUnit.SECONDS.sleep(10);
            }
        });
    }
    
}

# 作业结果监控的启动机制

/* Handles job completion: a callback thread pool plus a monitor thread for lost results. */
public class JobCompleteHelper {
 	
    public void start(){
     	/* Callback thread pool. */
     	// NOTE(review): the two trailing nulls stand in for the ThreadFactory and
     	// RejectedExecutionHandler, which this excerpt omits; the JDK constructor
     	// rejects null for either argument.
		callbackThreadPool = new ThreadPoolExecutor(
				2,
				20,
				30L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(3000),null,null);
        
        
        /* Monitor thread. */
		monitorThread = new Thread(()->{
           // Wait for the trigger pools to initialize.
			TimeUnit.MILLISECONDS.sleep(50);
            
            while (!toStop) {
					
                // Lost-result handling: a scheduling record that stays "running" for more than 10 minutes
                // while its executor's heartbeat registration is offline is marked as failed locally.
                /* Lost job results. */
                Date losedTime = DateUtil.addMinutes(new Date(), -10);
                List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
                
                // Construction of jobLog for each id is omitted in this excerpt.
                for (Long logId: losedJobIds) {
                    XxlJobCompleter.updateHandleInfoAndFinish(jobLog);   
                }
                    
                
                // Runs every 60 seconds.
                TimeUnit.SECONDS.sleep(60);
            }
        });
    }
}

# 作业日志监控的启动机制

/* Maintains the job log report and cleans up expired logs. */
public class JobLogReportHelper {
    
 	public void start(){
        
     	/* Scheduling log-report thread. */
        logrThread = new Thread(()->{
            while (!toStop) {

                
                /* Refresh the log statistics report; per the notes it covers the last 3 days.
                   Construction of xxlJobLogReport is omitted in this excerpt. */
                int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                
                /* Cleanup of expired logs is omitted. */
                    
                
                /* Runs every minute. */
                TimeUnit.MINUTES.sleep(1);
                
            }
        });   
    }
}

# 调度的启动机制(核心)

# 根据时间间隔的触发

public class JobScheduleHelper {

    /* Start scheduling: creates the thread that schedules jobs by time window. */
    public void start(){

        /* Initialize the schedule thread. */
        scheduleThread = new Thread(()->{
            /* Delay the first run, aligned to the second; presumably this lets other
               infrastructure finish initializing. */
            TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );

            /* scheduleThreadToStop is the control flag of this thread. */
            while (!scheduleThreadToStop) {
                /* Take the scheduling lock (row lock on xxl_job_lock). */
                preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

                /* Read from the database the jobs due within PRE_READ_MS, at most preReadCount of them
                   (per the notes: 5 seconds ahead and up to 6000 jobs in one read). */
                List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);

                /* Step 1: decide per job how this round is handled; triggers cover a time window. */
                for (XxlJobInfo jobInfo: scheduleList) {
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                        /* The trigger time is older than the window; whether it still fires is
                           a matter of strategy (omitted here). Refresh the next trigger time. */
                        refreshNextValidTime(jobInfo, new Date());
                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        /* The trigger time falls inside the window: trigger now, with type CRON. */
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);

                        /* Refresh the next trigger time. */
                        refreshNextValidTime(jobInfo, new Date());

                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                            /* The next trigger is also within PRE_READ_MS: put it on the time ring,
                               which a dedicated thread fires. Computation of ringSecond is omitted. */
                            pushTimeRing(ringSecond, jobInfo.getId());
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                        }
                    }
                }

                /* Step 2: persist the updated trigger info once per job, after all jobs are processed. */
                for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                }

                /* A round shorter than one second is throttled by sleeping to the next second
                   boundary (or five seconds when preReadSuc is false). */
                if (cost < 1000) {
                    try {
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:5000) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        // Ignored in this excerpt; the loop condition is re-checked on the next iteration.
                    }
                }
            }
        });

    }
}

# 每秒的触发

public class JobScheduleHelper {

    /* Start scheduling: creates the thread that fires the time ring once per second. */
    public void start(){
        ringThread = new Thread(()->{
            // Align the first run to the next second boundary.
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );

            while (!ringThreadToStop) {
                // To avoid skipping a slot when processing takes too long, the previous slot is checked as well.
                // Computation of nowSecond and the ringItemData list is omitted in this excerpt.
                for (int i = 0; i < 2; i++) {
                    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                    if (tmpData != null) {
                        ringItemData.addAll(tmpData);
                    }
                }

                for (int jobId: ringItemData) {
                    /* Trigger the job. */
                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                }
                ringItemData.clear();

                /* The second is the base unit: sleep to the next second boundary inside the
                   loop, so each iteration handles one slot instead of spinning. */
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
            }
        });
    }
}

# 调度中心核心线程说明

  • 任务触发器 由两个触发池构成,常驻至少20个活跃线程用于触发任务

  • 注册中心 由一个心跳线程,一个线程池(内含两个核心线程) 构成; 心跳线程用于自动移除注册中心的失效数据; 线程池用于主动注册或者主动移除注册;

  • 作业结果处理 由一个心跳线程,一个线程池(内含两个核心线程)构成; 心跳线程用于自动将结果丢失的作业标记为失败; 线程池用于以回调的方式主动更新作业状态;

  • 调度中心的调度,有两个线程进行执行,一个是根据时间范围进行调度,还有一个是每秒进行调度;

# 执行器源码解读

  • 执行器依赖于调度中心,很多的关键操作都需要委托调度中心执行

# 执行器初始化流程

/* Job executor. */
public class XxlJobExecutor  {
 	
    /* Start the executor by initializing each component in turn. */
    public void start() throws Exception {
     	/* Initialize the executor log path; per the notes the default is /data/applogs/xxl-job/jobhandler. */
        XxlJobFileAppender.initLogPath(logPath);
        
        /* Initialize the scheduling-center (registry) clients. */
        initAdminBizList(adminAddresses, accessToken);
        
        /* Start the log cleanup thread. */
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
        
        /* Start the trigger callback thread. */
        TriggerCallbackThread.getInstance().start();
        
        /* Start the embedded server that listens for executor requests. */
        initEmbedServer(address, ip, port, appname, accessToken);
    }
    
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
     	// Omitted in this excerpt.
    }
    
}

# 初始化调度中心

/* Job executor: scheduling-center client list. */
public class XxlJobExecutor  {

    /* Clients for the configured scheduling-center addresses; stays null when none is configured. */
    private static List<AdminBiz> adminBizList;

    /**
     * Builds one AdminBizClient per configured scheduling-center address.
     *
     * @param adminAddresses comma-separated address list; null or blank leaves the list untouched
     * @param accessToken    token passed to every client
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        /* Nothing configured: keep adminBizList as it is instead of failing on null. */
        if (adminAddresses == null || adminAddresses.trim().length() == 0) {
            return;
        }

        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {

                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                /* Create the list lazily on the first valid address. */
                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                /* Add the client to the list. */
                adminBizList.add(adminBiz);
            }
        }
    }


}
// Client proxy for the scheduling center.
public class AdminBizClient implements AdminBiz {
 	/* Scheduling-center client; the address is normalized to end with "/". */
    public AdminBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }
    
    private String addressUrl ;
    private String accessToken;
    // Timeout passed to every postBody call; presumably in seconds — confirm against XxlJobRemotingUtil.
    private int timeout = 3;
    
    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    /* Register with the scheduling center. */
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }
    
}

# 日志清理任务

/* Periodically deletes executor log directories that are older than the retention period. */
public class JobLogFileCleanThread {

    /**
     * Creates the cleanup thread.
     *
     * @param logRetentionDays number of days to keep logs; values below 3 disable cleanup
     */
    public void start(final long logRetentionDays){

        /* A retention period shorter than 3 days means no cleanup at all. */
        if (logRetentionDays < 3 ) {
            return;
        }

        localThread = new Thread(()->{
            while (!toStop) {
                /* listFiles() returns null when the log path is missing or unreadable. */
                File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();

                if (childDirs != null) {
                    // Computation of todayDate and logFileCreateDate is omitted in this excerpt.
                    for (File childFile: childDirs) {
                        /* Delete logs older than the retention period
                           (per the notes the default keeps the last 30 days). */
                        if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                            FileUtil.deleteRecursively(childFile);
                        }
                    }
                }

                /* The thread runs once a day. */
                TimeUnit.DAYS.sleep(1);
            }
        });

    }

}

# 触发器回调的初始化

/* Reports job results back to the scheduling center and retries reports that failed. */
public class TriggerCallbackThread {

    /* Creates the callback thread and the retry thread. */
    public void start() {
        /* Callbacks require a configured scheduling-center address. */
        if (XxlJobExecutor.getAdminBizList() == null) {
            return;
        }

        /* Trigger callback thread. */
        triggerCallbackThread = new Thread(()->{
            while(!toStop){
                /* Blocks until at least one callback is available. */
                HandleCallbackParam callback = getInstance().callBackQueue.take();

                /* Batch everything else that is already queued, plus the element just
                   taken; without the add() that element would be lost. */
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                getInstance().callBackQueue.drainTo(callbackParamList);
                callbackParamList.add(callback);

                /* Execute the callback. */
                doCallback(callbackParamList);
            }

            /* On shutdown, flush whatever is still queued. */
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            getInstance().callBackQueue.drainTo(callbackParamList);
            if (!callbackParamList.isEmpty()) {
                doCallback(callbackParamList);
            }

        });

        // Failed callbacks are retried, and keep being retried until they succeed.
        triggerRetryCallbackThread = new Thread(()->{
            while(!toStop){
                // Retry failed callbacks.
                retryFailCallbackFile();

                // Runs every BEAT_TIMEOUT seconds (30 per the notes).
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }
        });
    }

    /* Tries each scheduling center until one accepts the batch; a batch that none accepts is written to the fail file. */
    private void doCallback(List<HandleCallbackParam> callbackParamList){
        boolean callbackRet = false;

        /* Try the scheduling centers one by one. */
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            /* After a successful callback the remaining scheduling centers are not called. */
            if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackRet = true;
                break;
            }
        }

        /* Record the batch only when every scheduling center failed. */
        if (!callbackRet) {
            appendFailCallbackFile(callbackParamList);
        }
    }

    /* Replays every batch stored under failCallbackFilePath. */
    private void retryFailCallbackFile(){

        /* Directory that holds the failed batches. */
        File callbackLogPath = new File(failCallbackFilePath);

        /* listFiles() returns null when the directory is missing or unreadable. */
        File[] callbackLogFiles = callbackLogPath.listFiles();
        if (callbackLogFiles == null) {
            return;
        }

        for (File callbaclLogFile: callbackLogFiles) {
            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);

            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);

            /* Remove the file before replaying; doCallback appends the batch again if it
               still fails, so a delivered batch is not replayed forever. */
            callbaclLogFile.delete();
            doCallback(callbackParamList);
        }
    }
}

# 执行器启动(重要)

/* Embedded server that receives job requests and handles them; built on netty. */
public class EmbedServer {
 	
    /* Start the executor server. */
    public void start(final String address, final int port, final String appname, final String accessToken) {
     	/* Create the executor business implementation. */
        executorBiz = new ExecutorBizImpl(); 
        
        thread = new Thread(()->{
            
            /* Business thread pool: 0 core threads, at most 200, queue capacity 2000. */
            // NOTE(review): the two trailing nulls stand in for the ThreadFactory and
            // RejectedExecutionHandler, which this excerpt omits; the JDK constructor
            // rejects null for either argument.
           ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),null,null);
            
            // Start the server through netty; creation of bootstrap and the event loop groups is omitted.
             bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                            /* Request handling. */
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);
            
            
            /* Bind the port and wait until the bind completes. */
            ChannelFuture future = bootstrap.bind(port).sync();
            
            // Register the executor with the registry.
            startRegistry(appname, address);
            
            /* Keep this thread alive until the channel is closed. */
            future.channel().closeFuture().sync();
            
        });
    }
    
    /* Register this service with the scheduling center. */
    public void startRegistry(final String appname, final String address) {
        // start registry
        /* Start the thread that registers this executor with the scheduling center. */
        ExecutorRegistryThread.getInstance().start(appname, address);
    }
    
}
/* Request handler: runs each HTTP request on the business thread pool and writes the response. */
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // Extraction of httpMethod, uri, requestData, accessTokenReq and keepAlive from msg is omitted in this excerpt.

        /* Hand the request to the thread pool. */
        bizThreadPool.execute(()->{
            /* Run the actual business method. */
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            /* Serialize the result and respond from inside the task, where the result exists. */
            String responseJson = GsonTool.toJson(responseObj);
            writeResponse(ctx, keepAlive, responseJson);
        });
    }

    /* Dispatches the request to the matching executor operation by uri. */
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

        // Validation and the other endpoints are omitted in this excerpt.
        if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            /* Run the job through the executor. */
            return executorBiz.run(triggerParam);
        }
        // Remaining endpoints are omitted in this excerpt.
    }
}

# 注册中心注册执行器(重要)

/* Thread that keeps this executor registered with the scheduling center. */
public class ExecutorRegistryThread {

    /**
     * Creates the registry thread.
     *
     * @param appname executor application name
     * @param address executor address
     */
    public void start(final String appname, final String address){
        // Construction of registryParam from appname and address is omitted in this excerpt.

        /* Registry thread. */
        registryThread = new Thread(()->{
            while (!toStop) {
                /* Go through all scheduling centers. */
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    /* Delegate the registration to the scheduling center. */
                    ReturnT<String> registryResult = adminBiz.registry(registryParam);

                    /* Per the notes, once one succeeds the next need not be tried; that check is omitted here. */
                }
                /* The executor re-registers every BEAT_TIMEOUT seconds (30 per the notes). */
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }

            /* When the thread ends, remove the registration. The loop variable above is out of
               scope here, so the scheduling centers are iterated again. */
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
            }
        });
    }
}

# 执行器的任务执行过程(重要)

public class ExecutorBizImpl implements ExecutorBiz {
 	
    /*xxx: 执行器实际执行的方法*/
    public ReturnT<String> run(TriggerParam triggerParam) {
     	
        /*xxx: 加载任务线程 */
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        
        /*xxx: 根据任务类型,分别处理*/
        if (GlueTypeEnum.BEAN == glueTypeEnum) {
         	/*xxx: 加载作业处理器 */
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());   
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return ;
                }
            }
        }
        
        //xxx: 省略其它抽象...
        
        if (jobThread != null) {
         	/*xxx: 获取 执行器的阻塞策略*/
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            
            /*xxx: 丢弃后续调度*/
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
             	return;   
            }else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
             	//xxx: 覆盖之前的调度
                jobThread = null;
            }
        }
        
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }
        
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    }
}
/* Job executor: job-thread registry. */
public class XxlJobExecutor  {
    /* Registry of job threads, keyed by job id. */
    private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

    /**
     * Creates and starts a thread for the job and stores it in the registry.
     *
     * @param jobId           job the thread is bound to
     * @param handler         handler the thread executes
     * @param removeOldReason reason associated with replacing an earlier thread
     * @return the newly started thread
     */
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();

        /* put() returns the thread previously registered for this job, if any. */
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
        // NOTE(review): the excerpt does not show what is done with oldJobThread and
        // removeOldReason; presumably the replaced thread is stopped — confirm against upstream.

        return newJobThread;
    }
}
/* Job thread: consumes the triggers queued for one job and runs its handler. */
public class JobThread extends Thread{

    /* Job handler that holds the business logic. */
    private IJobHandler handler;

    /* Queue of pending triggers for this job. */
    private LinkedBlockingQueue<TriggerParam> triggerQueue;

    /* Main loop: poll for a trigger, execute it, then push the callback. */
    @Override
    public void run() {
        while(!toStop){
            TriggerParam triggerParam = null;
            try {
                /* Wait up to 3 seconds for the next trigger. */
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);

                if (triggerParam!=null) {

                    if (triggerParam.getExecutorTimeout() > 0) {
                        /* A timeout is configured: run the handler on a separate thread
                           and wait for it for at most that long. */
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(() -> {
                            /* Invoke the business logic. */
                            handler.execute();
                            return true;
                        });

                        Thread futureThread = new Thread(futureTask);
                        futureThread.start();

                        /* Block until the result arrives or the timeout expires. */
                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);

                    } else {
                        /* No timeout configured: run the handler directly on this thread. */
                        handler.execute();
                    }

                } else {
                    // Idle for more than 30 polls with nothing queued: remove this thread.
                    // idleTimes presumably counts consecutive empty polls (its update is omitted in this excerpt).
                    if (idleTimes > 30) {
                        if(triggerQueue.size() == 0) {
                            XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                        }
                    }
                }
            } finally {
                // Push the callback for every trigger that was taken, whether or not execution succeeded.
                if (triggerParam != null) {
                    TriggerCallbackThread.pushCallBack();
                }
            }
        }
    }

}

# 执行器核心线程行为说明

  • 执行线程(JobThread)阻塞等待触发任务,空闲次数超过限制且队列为空时线程会被销毁,且执行线程与具体的jobId绑定
  • 执行线程执行任务,根据是否设置超时时间,可分为两种方式:设置了超时时间时,在独立的futureThread中执行并限时等待结果;未设置超时时间时,直接在JobThread中执行(单线程调用无法指定超时);对于调度器来说,都是同步的;
  • 一次触发,经历了多次异步动作:request->bizThreadPool(一个线程池,最大200线程)->JobThread(单业务线程)->futureThread(如果有超时设置)

# 实践说明

  • executor(通常是应用本身),如果配置了 appName以及注册中心(调度中心)地址,则会进行自动注册。但是注意注册的是executor,这个在页面上是没有直接的显示页面的
  • 调度中心的处理单位是executor-group,也就是菜单执行器列表,无论是自动注册,还是手动注册,都需要手动的去维护。自动注册的区别在于可以实现动态管理(appName相同的应用自动划归为一个组),手动注册的方式可以静态的管理应用列表;
  • 除此之外,job本身也不会自动注册到注册中心,都需要手动维护
  • 简言之,调度中心无法直观的感受到应用是否成功注册,任务是否成功装载,都需要手动配置,尽管实际上是生效了;