拼团退款中采用分片处理降低对数据库
完整拼团处理系统代码实现
一、核心实体类
/*** 拼团信息实体*/
@Entity
@Table(name = "group_info")
@Data
public class Group {@Id@Column(name = "id", length = 32)private String id; // 使用雪花算法生成的ID@Column(name = "process_slot", nullable = false)private Integer processSlot; // 处理时间段(0-143)@Column(name = "expire_time", nullable = false)private Date expireTime; // 业务过期时间@Column(name = "status", length = 20)@Enumerated(EnumType.STRING)private GroupStatus status; // 状态枚举@Column(name = "created_time")private Date createdTime; // 创建时间
}/*** 拼团状态枚举*/
public enum GroupStatus {OPEN, // 进行中CLOSED, // 已关闭PROCESSING // 处理中
}
二、数据访问层
public interface GroupRepository extends JpaRepository<Group, String> {/*** 查询待处理拼团(分页查询保证性能)* @param currentSlot 当前时间槽* @param pageable 分页参数*/@Query("SELECT g FROM Group g WHERE " +"g.processSlot = :slot AND " +"g.status = 'OPEN' AND " +"g.expireTime < CURRENT_TIMESTAMP")Page<Group> findGroupsToProcess(@Param("slot") Integer currentSlot, Pageable pageable);/*** 批量更新状态(使用IN语句提升效率)*/@Modifying@Query("UPDATE Group g SET g.status = :status " +"WHERE g.id IN :ids")int batchUpdateStatus(@Param("ids") List<String> ids,@Param("status") GroupStatus status);
}
三、服务层核心实现
@Service
@RequiredArgsConstructor
public class GroupService {private final GroupRepository groupRepository;private final RedisLockService redisLockService;private final MQService mqService;/*** 创建拼团(包含分片时间计算)*/@Transactionalpublic void createGroup(GroupCreateRequest request) {Group group = new Group();group.setId(SnowflakeGenerator.nextId()); // 雪花ID生成group.setExpireTime(calculateExpireTime(request.getDuration())); group.setProcessSlot(calculateProcessSlot(group.getId()));group.setStatus(GroupStatus.OPEN);groupRepository.save(group);}/*** 计算处理时间槽(核心算法)*/private Integer calculateProcessSlot(String groupId) {CRC32 crc32 = new CRC32();crc32.update(groupId.getBytes());return (int)(crc32.getValue() % 144); // 144=24h*6}/*** 定时任务处理方法*/@Scheduled(cron = "0 */10 * * * ?") // 每10分钟执行@Async("taskExecutor") // 异步线程池public void processExpiredGroups() {// 1. 计算当前slotlong currentSlot = (System.currentTimeMillis() / (10 * 60 * 1000)) % 144;// 2. 分页查询处理(防止内存溢出)int page = 0;Page<Group> groupPage;do {groupPage = groupRepository.findGroupsToProcess((int)currentSlot, PageRequest.of(page, 500) // 每页500条);// 3. 批量处理batchProcess(groupPage.getContent());page++;} while (groupPage.hasNext());}/*** 批量处理逻辑*/@Transactional(propagation = Propagation.REQUIRES_NEW) // 新事务public void batchProcess(List<Group> groups) {if (groups.isEmpty()) return;// 1. 获取分布式锁(按groupID加锁)String lockKey = "group_process:" + groups.get(0).getId();boolean locked = redisLockService.tryLock(lockKey, 30);if (!locked) return;try {// 2. 更新状态为处理中List<String> ids = groups.stream().map(Group::getId).collect(Collectors.toList());groupRepository.batchUpdateStatus(ids, GroupStatus.PROCESSING);// 3. 发送退款MQ消息groups.forEach(group -> {RefundMessage message = new RefundMessage();message.setGroupId(group.getId());message.setAmount(calculateRefundAmount(group));mqService.sendRefundMessage(message);});} finally {redisLockService.unlock(lockKey);}}
}
四、配置类
@Configuration
@EnableAsync
public class TaskConfig {/*** 定时任务线程池配置*/@Bean("taskExecutor")public TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100);executor.setThreadNamePrefix("group-process-");executor.initialize();return executor;}/*** RedisTemplate配置*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));return template;}
}
五、分布式锁工具类
@Service
@RequiredArgsConstructor
public class RedisLockService {private final RedisTemplate<String, Object> redisTemplate;/*** 获取分布式锁(简化版)*/public boolean tryLock(String key, long expireSeconds) {return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "LOCKED", expireSeconds, TimeUnit.SECONDS));}/*** 释放锁*/public void unlock(String key) {redisTemplate.delete(key);}
}
六、消息队列处理
@Service @RequiredArgsConstructor public class MQService { private final RocketMQTemplate rocketMQTemplate;
/*** 发送退款消息*/
public void sendRefundMessage(RefundMessage message) {rocketMQTemplate.asyncSend("REFUND_TOPIC", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("退款消息发送成功: {}", message.getGroupId());}@Overridepublic void onException(Throwable e) {log.error("退款消息发送失败,进入重试队列", e);retryQueue.add(message);}});
}/*** 消费退款消息*/
@RocketMQMessageListener(topic = "REFUND_TOPIC",consumerGroup = "refund_consumer_group"
)
public class RefundConsumer implements RocketMQListener<RefundMessage> {@Overridepublic void onMessage(RefundMessage message) {// 调用支付系统退款接口paymentService.refund(message);}
}
}
七、运行验证
1. 创建测试拼团
GroupCreateRequest request = new GroupCreateRequest();
request.setDuration(24); // 24小时有效期
groupService.createGroup(request);// 生成的拼团示例:
// ID: 7837264872346723(雪花ID)
// process_slot: 123(根据CRC32计算)
// expire_time: 创建时间+24小时
2. 查看定时任务日志
Logs for processing group in slot 84:
- Start processing group in slot 84: 2024-04-25 14:10:00 [group-process-1] INFO
- Batch update of 500 group statuses: 2024-04-25 14:10:02 [group-process-1] INFO
- Successful sending of refund message for 500 groups: 2024-04-25 14:10:05 [group-process-1] INFO
3. 数据库状态变化
id | process_slot | status | expire_time |
---|---|---|---|
7837264872346723 | 84 | PROCESSING | 2024-04-25 13:00:00 |
提示:完整运行需要以下中间件:
- MySQL 5.7+
- Redis 5.0+
- RocketMQ 4.9+
建议使用Docker快速搭建测试环境
最后说明
采用crc32计算原因:
性能与均衡性的最佳平衡:相比MD5/SHA等加密哈希速度更快,比原生hashCode分布更均匀.