榜单持久化
榜单持久化的基本流程是这样的:
-
创建表
-
持久化Redis数据到数据库
-
清理Redis数据
现在,创建表的动作已经完成,接下来就轮到Redis数据的持久化了。持久化的步骤如下:
-
读取Redis数据
-
判断数据是否存在
-
不存在,直接结束
-
存在,则继续
-
-
保存数据到数据库
不过,Redis的数据结构如图:
其KEY中包含一个上赛季对应的日期,因此要读取Redis数据,我们必须先得到上赛季的日期。
另外,我们采用了水平分表的策略,每一个赛季都是一个独立表。那么在写数据到数据库时,必须先知道表名称。
综上,最终持久化的业务流程如图:
动态表名
持久化的流程中存在一个问题,我们的数据库持久化采用的是MybatisPlus来实现的。而MybatisPlus读取表名的方式是通过实体类上的@Table
注解,而注解往往是写死的:
那我们该如何让MybatisPlus在执行的时候改变数据写入的表名称呢?
MybatisPlus中提供了一个动态表名的插件:
https://baomidou.com/pages/2a45ff/#dynamictablenameinnerinterceptor
插件的部分源码如下:
可见表名称动态获取就是依赖于tableNameHandlerMapping中的具体的TableNameHandler,这个Map如图:
这个Map的key是旧的表名称,value是TableNameHandler,就是表的名称处理器,用于根据旧名称获取新名称。
TableNameHandler的源码如下:
public interface TableNameHandler {/*** 生成动态表名** @param sql 当前执行 SQL* @param tableName 表名* @return String*/String dynamicTableName(String sql, String tableName);
}
OK,因此我们要做的事情就很简单了,定义DynamicTableNameInnterInterceptor
,向其中添加一个TableNameHandler
,将points_board
这个表名,替换为points_board_赛季id
的名称。
不过,新的问题来了,这个插件中的TableNameHandler该如何获取赛季对应的表名称呢?
计算表名的方式是获取获取上赛季时间,查询数据库中上赛季信息,得到上赛季id。然后拼接得到表名。
当我们批量的写数据到数据库时,如果每次插入都计算一次表名,那性能也太差了。因此,我们肯定是希望一次计算,在TableNameHandler中可以随时获取。
那么该如何实现呢?
2.4.1.2.传递表名
一旦我们计算完表名,以某种方式传递给插件中的TableNameHandler,那么就无需重复计算表名了。
不过,问题来了:要知道动态表名称插件,以及TableNameHandler,都是由MybatisPlus内部调用的。我们无法传递参数。
那么该如何传递表名称呢?
虽然无法传参,但是从计算表名,到动态表名插件执行,调用TableNameHandler,都是在一个线程内完成的。要在一个线程内实现数据共享,该用什么呢?
大家应该很容易想到,就是ThreadLocal.
我们可以在定时任务中计算完动态表名后,将表名存入ThreadLocal,然后在插件中从ThreadLocal中读取即可:
我们在tj-learning
的com.tianji.learning.utils
包下定义一个传递表名称的工具:
package com.tianji.learning.utils;public class TableInfoContext {private static final ThreadLocal<String> TL = new ThreadLocal<>();public static void setInfo(String info) {TL.set(info);}public static String getInfo() {return TL.get();}public static void remove() {TL.remove();}
}
然后在tj-learning
模块下定义一个配置类,用于定义DynamicTableNameInnterInterceptor
插件:
package com.tianji.learning.config;import com.baomidou.mybatisplus.extension.plugins.handler.TableNameHandler;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.tianji.learning.utils.TableInfoContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MybatisConfiguration {@Beanpublic DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor() {// 准备一个Map,用于存储TableNameHandlerMap<String, TableNameHandler> map = new HashMap<>(1);// 存入一个TableNameHandler,用来替换points_board表名称// 替换方式,就是从TableInfoContext中读取保存好的动态表名map.put("points_board", (sql, tableName) -> TableInfoContext.getInfo() == null ? tableName : TableInfoContext.getInfo());return new DynamicTableNameInnerInterceptor(map);}
}
插件虽然定义好了,但是该如何继承到MybatisPlus中呢?
在天机学堂项目中的tj-common模块中,已经实现了MybatisPlus的自动装配,并且定义了很多的MP插件。如果我们在自己的项目中重新定义MP配置,就会导致tj-common中的插件失效。
所以,我们应该修改tj-common
中的MP配置,将DynamicTableNameInnerInterceptor
配置进去。找到tj-common
模块下的MybatisConfig
配置:
动态表名已经准备就绪,接下来我们就可以去定义定时任务,实现榜单持久化了。
在tj-learning
模块的com.tianji.learning.handler.PointsBoardPersistentHandler
中添加一个定时任务:
@XxlJob("savePointsBoard2DB")
public void savePointsBoard2DB(){// 1.获取上月时间LocalDateTime time = LocalDateTime.now().minusMonths(1);// 2.计算动态表名// 2.1.查询赛季信息Integer season = seasonService.querySeasonByTime(time);// 2.2.将表名存入ThreadLocalTableInfoContext.setInfo(POINTS_BOARD_TABLE_PREFIX + season);// 3.查询榜单数据// 3.1.拼接KEYString key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER);// 3.2.查询数据int pageNo = 1;int pageSize = 1000;while (true) {List<PointsBoard> boardList = pointsBoardService.queryCurrentBoardList(key, pageNo, pageSize);if (CollUtils.isEmpty(boardList)) {break;}// 4.持久化到数据库// 4.1.把排名信息写入idboardList.forEach(b -> {b.setId(b.getRank().longValue());b.setRank(null);});// 4.2.持久化pointsBoardService.saveBatch(boardList);// 5.翻页pageNo++;}// 任务结束,移除动态表名TableInfoContext.remove();
}
XXL-JOB任务分片
刚才定义的定时持久化任务,通过while死循环,不停的查询数据,直到把所有数据都持久化为止。这样如果数据量达到数百万,交给一个任务执行器来处理会耗费非常多时间。
因此,将来肯定会将学习服务多实例部署,这样就会有多个执行器并行执行。但是,如果交给多个任务执行器,大家执行相同代码,都从第1页逐页处理数据,又会出现重复处理的情况。
怎么办?
这就要用到任务分片的方案了。
怎样才能确保任务不重复呢?我们可以参考扑克牌发牌的原理:
要想知道每一个执行器执行哪些页数据,只要弄清楚两个关键参数即可:
因此,现在的关键就是获取两个数据:
-
逐一给每个人发牌
-
发完一圈后,再回头给第一个人发
-
重复上述动作,直到牌发完为止
最终,每个执行器处理的数据页情况:
-
执行器1:处理第1、4、7、10、13、...页数据
-
执行器2:处理第2、5、8、11、14、...页数据
-
执行器3:处理第3、6、9、12、15、...页数据
-
起始页码:pageNo
-
下一页的跨度:step
-
起始页码:执行器编号是多少,起始页码就是多少
-
页跨度:执行器有几个,跨度就是多少。也就是说你要跳过别人读取过的页码
-
执行器编号
-
执行器数量
@XxlJob("savePointsBoard2DB") public void savePointsBoard2DB(){// 1.获取上月时间LocalDateTime time = LocalDateTime.now().minusMonths(1);// 2.计算动态表名// 2.1.查询赛季信息Integer season = seasonService.querySeasonByTime(time);// 2.2.存入ThreadLocalTableInfoContext.setInfo(POINTS_BOARD_TABLE_PREFIX + season);// 3.查询榜单数据// 3.1.拼接KEYString key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER);// 3.2.查询数据int index = XxlJobHelper.getShardIndex();int total = XxlJobHelper.getShardTotal();int pageNo = index + 1; // 起始页,就是分片序号+1int pageSize = 10;while (true) {List<PointsBoard> boardList = pointsBoardService.queryCurrentBoardList(key, pageNo, pageSize);if (CollUtils.isEmpty(boardList)) {break;}// 4.持久化到数据库// 4.1.把排名信息写入idboardList.forEach(b -> {b.setId(b.getRank().longValue());b.setRank(null);});// 4.2.持久化pointsBoardService.saveBatch(boardList);// 5.翻页,跳过N个页,N就是分片数量pageNo+=total;}TableInfoContext.remove(); }
清理Redis缓存任务
当任务持久化以后,我们还要清理Redis中的上赛季的榜单数据,避免过多的内存占用。
在
tj-learning
模块的com.tianji.learning.handler.PointsBoardPersistentHandler
中添加一个定时任务:package com.tianji.learning.handler;import com.tianji.common.utils.CollUtils; import com.tianji.common.utils.DateUtils; import com.tianji.learning.constants.RedisConstants; import com.tianji.learning.domain.po.PointsBoard; import com.tianji.learning.service.IPointsBoardSeasonService; import com.tianji.learning.service.IPointsBoardService; import com.tianji.learning.utils.TableInfoContext; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component;import java.time.LocalDateTime; import java.util.List;import static com.tianji.learning.constants.LearningConstants.POINTS_BOARD_TABLE_PREFIX;@Component @RequiredArgsConstructor public class PointsBoardPersistentHandler {private final IPointsBoardSeasonService seasonService;private final IPointsBoardService pointsBoardService;private final StringRedisTemplate redisTemplate;// ... 略@XxlJob("clearPointsBoardFromRedis")public void clearPointsBoardFromRedis(){// 1.获取上月时间LocalDateTime time = LocalDateTime.now().minusMonths(1);// 2.计算keyString key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER);// 3.删除redisTemplate.unlink(key);} }
任务链
现在,所有任务都已经定义完毕。接下来就给配置任务调度了。
我们最终期望的任务执行顺序是这样的
要想让任务A、B依次执行,其实就是配置任务B作为任务A的子任务。因此,我们按照下面方式配置:
-
创建历史榜单表(10)的子任务是持久化榜单数据任务(12)
-
持久化榜单数据任务(12)的子任务是清理Redis中的历史榜单(13)
也就是说:10的子任务是12, 12的子任务是13