xxl-job 入门
官网
概述
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
快速开始
项目源码
下载项目源码并解压
自上到下分别为:调度中心、核心依赖、执行器样例
选择合适的版本,这里为了兼容jdk1.8和低版本的maven使用2.4.0版本
调度中心
在doc文件夹中找到数据库初始化脚本,运行sql文件
修改xxl-job-admin的数据库配置信息为我们自己的数据库
自此,xxl-job的调度中心的准备工作已经完成
访问http://localhost:8080/xxl-job-admin/
账号:admin 密码:123456
处理器
在自己本地的maven中install xxl-job-core
新建一个springboot项目,导入web依赖。
pom文件中导入依赖
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.4.0</version>
</dependency>
修改配置文件
# 应用服务 WEB 访问端口
server.port=8085### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 调度中心通讯TOKEN [选填]:非空时启用;
xxl.job.admin.accessToken=default_token
### 调度中心通讯超时时间[选填],单位秒;默认3s;
xxl.job.admin.timeout=3
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9998
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
address=ip:port
一个执行器对应 一台主机下的某一端口提供的服务
创建xxl-job处理器的配置类
package org.daolong.config;import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import groovy.util.logging.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class XxlJobConfig {private static final Logger log = LoggerFactory.getLogger(XxlJobConfig.class);// 调度中心配置@Value("${xxl.job.admin.addresses:}")private String adminAddresses;@Value("${xxl.job.admin.accessToken:default_token}")private String accessToken;@Value("${xxl.job.admin.timeout:3}")private int timeout;// 执行器配置@Value("${xxl.job.executor.appname:}")private String appname;@Value("${xxl.job.executor.address:}")private String address;@Value("${xxl.job.executor.ip:}")private String ip;@Value("${xxl.job.executor.port:9999}")private int port;@Value("${xxl.job.executor.logpath:}")private String logPath;@Value("${xxl.job.executor.logretentiondays:30}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {log.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}
创建调度任务,交给Spring管理
package org.daolong.job;import com.xxl.job.core.handler.annotation.XxlJob;
import groovy.util.logging.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;@Component
@Slf4j
public class DemoJob {private static final Logger log = LoggerFactory.getLogger(DemoJob.class);private static SimpleDateFormat simpleDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 调度任务使用@XxlJob JobHandler为demoJob@XxlJob("demoJob")public void demoJob() {DemoJob.log.info("demoJob is running..."+simpleDate.format(new java.util.Date()));}
}
自此,处理器的准备工作已经完成,启动xxl-job-demo
重新访问任务调度中心
工作流程
在调度中心的任务管理中新增任务,设置cron表达式为每10秒钟执行一次
启动测试任务
控制台输出的结果符合预期
可以在调度中心中查看调度日志
Bean模式和GLUE模式
BEAN模式(推荐默认方式)
特点
- 静态加载:任务逻辑以Java类形式预编译在项目中
- 强类型:通过
@XxlJob
注解或实现IJobHandler
接口开发 - 高性能:直接调用已加载的类,无运行时解析开销
配置方式
java
复制
@XxlJob("demoJobHandler")
public ReturnT<String> execute(String param) {// 业务逻辑return ReturnT.SUCCESS;
}
适用场景
✅ 稳定核心业务
✅ 需要复杂事务管理的任务
✅ 对性能要求高的高频任务
GLUE模式(动态脚本)
特点
- 动态更新:代码通过调度中心Web界面实时编辑生效
- 多语言:支持Java/Shell/Python等脚本语言
- 灵活性:无需重启执行器即可更新逻辑
配置方式
- 在调度中心选择
GLUE(Java)
类型 - 直接在Web编辑器编写代码:
public class GlueJob implements IJobHandler {public ReturnT<String> execute(String param) {XxlJobLogger.log("动态执行的逻辑");return ReturnT.SUCCESS;}
}
适用场景
✅ 快速迭代的临时任务
✅ 需要频繁调整参数的小型任务
✅ 多语言混合调度环境
关键对比
维度 | BEAN模式 | GLUE模式 |
部署成本 | 需重新打包部署 | 实时生效,无需重启 |
调试效率 | 依赖本地IDE调试 | 支持Web界面直接编辑测试 |
性能影响 | 无运行时解析损耗 | 每次执行需动态编译/解释 |
代码安全 | 代码受版本控制保护 | 依赖调度中心的权限管控 |
GLUE模式示例
快速开始使用的是bean模式,现在给出一个GLUE模式的示例。
在xxl-job-demo中新增一个HelloService,
package org.daolong.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;@Service
public class HelloService {private static final Logger log = LoggerFactory.getLogger(HelloService.class);public String hello() {log.info("Hello World!"+ LocalDateTime.now());return "Hello World!";}public String hello(String name) {return "Hello " + name + "!";}
}
在调度中心中新增一个GLUE模式的任务
在GLUE IDE中进行编辑
启动glue任务,控制台打印的结果符合预期
路由策略
策略名称 | 策略值 | 工作原理 | 适用场景 |
第一个(FIRST) |
| 选择集群中第一个注册的执行器 | 需要固定执行节点的场景 |
最后一个(LAST) |
| 选择集群中最后一个注册的执行器 | 特殊调试或备份节点测试 |
轮询(ROUND) |
| 按注册顺序依次选择执行器(均匀分配) | 常规负载均衡需求(默认策略) |
随机(RANDOM) |
| 从在线执行器中随机选取一个 | 简单无状态任务 |
一致性HASH(HASH) |
| 对任务参数做Hash计算,相同参数始终路由到同一节点 | 参数依赖型任务(如分片处理) |
最不经常使用(LFU) |
| 选择近期调用次数最少的执行器 | 需要避免节点过载的敏感任务 |
最近最久未用(LRU) |
| 选择最久未被调用的执行器 | 长期运行任务的资源释放优化 |
故障转移(FAILOVER) |
| 先选第一个节点,失败后自动切换到下一个 | 高可用要求的任务 |
忙碌转移(BUSYOVER) |
| 自动跳过正在运行任务的执行器 | 防止任务堆积的紧急调度 |
分片广播(SHARDING) |
| 所有执行器同时执行,通过分片参数区分处理数据范围 | 大数据量并行处理 |
执行器集群
配置两个springboot应用(模拟集群),配置好他们各自的服务端口号和执行器端口号
vm options
启动
在执行器中可以看到两个注册结点。
编辑测试任务01,将路由策略改为轮询并启动任务
控制台打印信息如下,符合预期:
8085
8086
路由策略:分片广播
需求
在指定节假日,需要给平台的所有用户去发送祝福的短信。
准备工作
新建xxl_job_demo数据库,导入sql文件
📎xxl_job_demo.sql
<!--MyBatis驱动-->
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
<!--mysql驱动-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.10</version>
</dependency>
# MySQL 数据源配置
spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
@Setter
@Getter
public class UserMobilePlan {private Long id;//主键private String username;//用户名private String nickname;//昵称private String phone;//手机号码private String info;//备注
}
@Mapper
public interface UserMobilePlanMapper {@Select("select * from t_user_mobile_plan")List<UserMobilePlan> selectAll();
}
@Slf4j
@Component
public class SimpleJob {private static final Logger log = LoggerFactory.getLogger(SimpleJob.class);@Resourceprivate UserMobilePlanMapper userMobilePlanMapper;@XxlJob("sendMsgHandler")public void sendMsgHandler() throws Exception {List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();log.info("任务开始时间:" + new Date() + ",处理任务数量:" + userMobilePlans.size());Long startTime = System.currentTimeMillis();userMobilePlans.forEach(item -> {try {//模拟发送短信动作TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}});log.info("任务结束时间:" + new Date());log.info("任务耗时:" + (System.currentTimeMillis() - startTime)/1000 + "秒");}
}
分片前
在调度中心添加任务,每分钟执行一次
启动任务,查看控制台输出
可以看到,在分配前,一个注册结点处理2000条数据的任务需要耗时约31秒。
分片概念讲解
比如我们的案例中有2000+条数据,如果不采取分片形式的话,任务只会在一台机器上执行,这样的话需要30+秒才能执行完任务.
如果采取分片广播的形式的话,一次任务调度将会广播触发对应集群中所有执行器执行该任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
获取分片参数方式:
// 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
通过这两个参数,我们可以通过求模取余的方式,分别查询,分别执行,这样的话就可以提高处理的速度.
之前2000+条数据只在一台机器上执行需要30+秒才能完成任务,分片后,有3台机器可以共同完成2000+条数据,每台机器处理666+条数据,这样的话只需要10秒就能完成任务
分片后
@Mapper
public interface UserMobilePlanMapper {@Select("select * from t_user_mobile_plan")List<UserMobilePlan> selectAll();// 分片查询@Select("select * from t_user_mobile_plan where mod(id,#{shardTotal})=#{shardIndex}")List<UserMobilePlan> selectByShardId(@Param("shardTotal")Integer shardTotal,@Param("shardIndex") Integer shardIndex);
}
@Slf4j
@Component
public class SimpleJob {private static final Logger log = LoggerFactory.getLogger(SimpleJob.class);@Resourceprivate UserMobilePlanMapper userMobilePlanMapper;@XxlJob("sendMsgHandler")public void sendMsgHandler() throws Exception {int shardTotal = XxlJobHelper.getShardTotal();int shardIndex = XxlJobHelper.getShardIndex();log.info("分片总数:" + shardTotal + ",分片序号:" + shardIndex);List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectByShardId(shardTotal, shardIndex);//List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();log.info("任务开始时间:" + new Date() + ",处理任务数量:" + userMobilePlans.size());Long startTime = System.currentTimeMillis();userMobilePlans.forEach(item -> {try {//模拟发送短信动作TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}});log.info("任务结束时间:" + new Date());log.info("任务耗时:" + (System.currentTimeMillis() - startTime) / 1000 + "秒");}
}
修改路由策略为分片广播
两个执行器
结果如下(执行实时间由原来的31秒变为现在的15秒)
三个执行器
结果如下(只需要耗时10s即可完成任务)