【RedisLockRegistry】分布式锁
RedisLockRegistry分布式锁
介绍
RedisLockRegistry是Spring框架提供的一种分布式锁机制,它基于Redis来实现对共享资源的保护,防止多个进程同时对同一资源进行修改,从而避免数据不一致或其他问题
基本原理
RedisLockRegistry通过Redis的原子操作来实现分布式锁。其主要原理包括:
- 独占性:同一时刻只能被一个客户端持有,确保互斥性。
- 健壮性:通过设置锁的过期时间来防止死锁,确保锁能够在一定时间内自动释放,避免资源长时间被占用
- 对称性:加锁和解锁必须由同一客户端执行,防止非法释放他人持有的锁
- 高可用性:当部分节点故障时,不影响分布式锁服务的稳定性
核心特点
-
互斥性:同一时刻只有一个客户端能持有锁
-
可重入性:同一个客户端可以多次获取同一个锁
-
超时机制:防止死锁,锁会自动释放
-
高可用:基于 Redis,性能高且可靠
应用场景
-
防止重复处理:如定时任务在集群环境下的执行控制
-
资源争用:如库存扣减、秒杀系统
-
关键业务流程:如支付订单处理
-
分布式系统协调:如主节点选举
使用方法
1.添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-redis</artifactId>
</dependency>
2.配置分布式锁
@Configuration
public class RedisLockConfig {/*** 锁过期毫秒数*/private static final long EXPIRE_AFTER_MILLS = 600000L;@Bean(destroyMethod = "destroy")public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {return new RedisLockRegistry(redisConnectionFactory, "redis-lock", EXPIRE_AFTER_MILLS);}
}
默认有效时间是60秒,如果是默认的时间,则修改为
@Configuration
public class RedisLockConfig {/*** 锁过期毫秒数*/private static final long EXPIRE_AFTER_MILLS = 600000L;@Bean(destroyMethod = "destroy")public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {return new RedisLockRegistry(redisConnectionFactory, "redis-lock");}
}
3.基本使用示例
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;@Service
public class OrderService {private final RedisLockRegistry redisLockRegistry;public OrderService(RedisLockRegistry redisLockRegistry) {this.redisLockRegistry = redisLockRegistry;}public void processOrder(String orderId) {// 获取锁对象,orderId作为锁的keyLock lock = redisLockRegistry.obtain(orderId);try {// 尝试获取锁,等待最多3秒,锁持有30秒(与配置一致)if (lock.tryLock(3, TimeUnit.SECONDS)) {try {// 获取锁成功,执行业务逻辑System.out.println("处理订单: " + orderId + ", 线程: " + Thread.currentThread().getName());// 模拟业务处理Thread.sleep(1000);} finally {// 释放锁lock.unlock();}} else {// 获取锁失败System.out.println("获取锁失败,订单: " + orderId);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("获取锁被中断");}}
}
4.高级用法-可重入锁
public void reentrantMethod(String resourceId) {Lock lock = redisLockRegistry.obtain(resourceId);try {if (lock.tryLock()) {try {System.out.println("外层方法获取锁");// 调用另一个也需要相同锁的方法nestedMethod(resourceId);} finally {lock.unlock();}}} catch (Exception e) {e.printStackTrace();}
}private void nestedMethod(String resourceId) {Lock lock = redisLockRegistry.obtain(resourceId);try {if (lock.tryLock()) {try {System.out.println("内层方法获取锁");// 业务逻辑} finally {lock.unlock();}}} catch (Exception e) {e.printStackTrace();}
}
5.定时任务分布式锁示例
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
public class ScheduledTasks {private final RedisLockRegistry redisLockRegistry;public ScheduledTasks(RedisLockRegistry redisLockRegistry) {this.redisLockRegistry = redisLockRegistry;}@Scheduled(cron = "0 */5 * * * ?") // 每5分钟执行一次public void distributedCronJob() {Lock lock = redisLockRegistry.obtain("report-generation");try {if (lock.tryLock(10, TimeUnit.SECONDS)) {try {// 生成报表的业务逻辑System.out.println("开始生成报表...");Thread.sleep(5000); // 模拟耗时操作System.out.println("报表生成完成");} finally {lock.unlock();}} else {System.out.println("其他节点正在生成报表,本节点跳过");}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
RedisLockRegistry 分布式锁工具类
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;/*** Redis分布式锁工具类*/
@Component
public class RedisDistributedLock {private final RedisLockRegistry redisLockRegistry;public RedisDistributedLock(RedisLockRegistry redisLockRegistry) {this.redisLockRegistry = redisLockRegistry;}/*** 尝试获取锁(立即返回)** @param lockKey 锁的key* @return 是否获取成功*/public boolean tryLock(String lockKey) {Lock lock = redisLockRegistry.obtain(lockKey);return lock.tryLock();}/*** 尝试获取锁(带超时时间)** @param lockKey 锁的key* @param timeout 超时时间* @param unit 时间单位* @return 是否获取成功*/public boolean tryLock(String lockKey, long timeout, TimeUnit unit) {Lock lock = redisLockRegistry.obtain(lockKey);try {return lock.tryLock(timeout, unit);} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;}}/*** 释放锁** @param lockKey 锁的key*/public void unlock(String lockKey) {Lock lock = redisLockRegistry.obtain(lockKey);lock.unlock();}/*** 执行带锁的业务逻辑(立即返回)** @param lockKey 锁的key* @param processor 业务处理器* @param <T> 返回值类型* @return 业务处理结果,如果获取锁失败返回null*/public <T> T executeWithLock(String lockKey, LockProcessor<T> processor) {if (tryLock(lockKey)) {try {return processor.process();} finally {unlock(lockKey);}}return null;}/*** 执行带锁的业务逻辑(带超时时间)** @param lockKey 锁的key* @param timeout 超时时间* @param unit 时间单位* @param processor 业务处理器* @param <T> 返回值类型* @return 业务处理结果,如果获取锁失败返回null*/public <T> T executeWithLock(String lockKey, long timeout, TimeUnit unit, LockProcessor<T> processor) {if (tryLock(lockKey, timeout, unit)) {try {return processor.process();} finally {unlock(lockKey);}}return null;}/*** 执行带锁的业务逻辑(无返回值,立即返回)** @param lockKey 锁的key* @param processor 无返回值的业务处理器* @return 是否成功获取锁并执行*/public boolean executeWithLock(String lockKey, VoidLockProcessor processor) {if (tryLock(lockKey)) {try {processor.process();return true;} finally {unlock(lockKey);}}return false;}/*** 执行带锁的业务逻辑(无返回值,带超时时间)** @param lockKey 锁的key* @param timeout 超时时间* @param unit 时间单位* @param processor 无返回值的业务处理器* @return 是否成功获取锁并执行*/public boolean executeWithLock(String lockKey, long timeout, TimeUnit unit, VoidLockProcessor processor) {if (tryLock(lockKey, timeout, unit)) {try {processor.process();return true;} finally {unlock(lockKey);}}return false;}/*** 业务处理器接口(有返回值)*/@FunctionalInterfacepublic interface LockProcessor<T> {T process();}/*** 业务处理器接口(无返回值)*/@FunctionalInterfacepublic interface VoidLockProcessor {void process();}
}
使用示例
1.基本用法
@Service
public class OrderService {private final RedisDistributedLock redisDistributedLock;public OrderService(RedisDistributedLock redisDistributedLock) {this.redisDistributedLock = redisDistributedLock;}public void processOrder(String orderId) {if (redisDistributedLock.tryLock(orderId, 3, TimeUnit.SECONDS)) {try {// 处理订单业务逻辑System.out.println("处理订单: " + orderId);} finally {redisDistributedLock.unlock(orderId);}} else {System.out.println("获取锁失败,订单: " + orderId);}}
}
2.使用函数式接口(推荐)
@Service
public class InventoryService {private final RedisDistributedLock redisDistributedLock;public InventoryService(RedisDistributedLock redisDistributedLock) {this.redisDistributedLock = redisDistributedLock;}public boolean deductStock(String productId, int quantity) {return redisDistributedLock.executeWithLock("stock:" + productId, 2, TimeUnit.SECONDS,() -> {// 在这里写扣减库存的业务逻辑System.out.println("扣减商品库存: " + productId + ", 数量: " + quantity);return true; // 返回业务处理结果}) != null;}
}
3.无返回值的使用方式
public void generateReport() {boolean executed = redisDistributedLock.executeWithLock("report-generation",5,TimeUnit.SECONDS,() -> {// 生成报表的业务逻辑System.out.println("开始生成报表...");Thread.sleep(3000);System.out.println("报表生成完成");});if (!executed) {System.out.println("其他节点正在生成报表,本次跳过");}
}
注意:
- 确保锁的key具有唯一性,不同业务使用不同的key前缀
注意事项
-
锁过期时间:设置合理的过期时间,太短可能导致业务未完成锁就释放,太长可能导致其他客户端等待过久
-
异常处理:确保锁在finally块中释放,避免死锁
-
Redis可用性:Redis集群的高可用配置很重要,避免单点故障
-
时钟同步:确保所有使用锁的服务器的系统时钟同步
锁粒度:根据业务需求选择合适的锁粒度,太粗影响并发,太细增加复杂度
参考博客
分布式锁之RedisLockRegistry