黑马点评redis改 part 3
优惠券秒杀
全局唯一id
每个店铺都可以发布优惠券:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:实际开发中数据库ID一般不会参与业务逻辑 增加一个订单号字段就好
- id的规律性太明显
- 受单表数据量的限制
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
- 唯一性 时间戳 uuid雪花算法
- 高可用
- 高性能
- 递增性 不是递增的话每次插入数据,都会重建索引,当数据量大的时候重建索引的时候比较耗时
- 安全性
我们redis里string数据结构是自增特性的,有一个increase的命令,而这个首先可以确保唯一,因为什么redis是独立于数据库之外的,第二高可用是redis将来有集群方案和主从方案哨兵方案
全局id生成器里,为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
我们在utils中新建一个RedisIdWorker,实现一个基于redis的id生成器,
package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;public class RedisIdWorker {//开始时间戳private static final long BEGIN_TIMESTAMP = 1640995200L;private static final int COUNT_BITS=32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {//1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列号//2.1.获取当前日期,精确到天String date =now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));//2.2.白增长long count =stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix +":"+ date);//这里讲的应该是序列号会一直增而不会随着时间戳变化而刷新回1的意思吧,所有的业务生成id次数超过也是可能的//3.拼接并返回//直接加也行,不过没有或运算效率高return timestamp << COUNT_BITS | count;}
}
HmDianPingApplicationTests.java如下
package com.hmdp;import com.hmdp.entity.Shop;
import com.hmdp.service.impl.ShopServiceImpl;
import com.hmdp.utils.CacheClient;
import com.hmdp.utils.RedisIdWorker;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cache.CacheManager;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;@SpringBootTest
class HmDianPingApplicationTests {@Resourceprivate ShopServiceImpl shopService;@Resourceprivate CacheClient cacheClient;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate ExecutorService es=Executors.newFixedThreadPool(500);// CountDownLatch大致的原理是将任务切分为N个,让N个子线程执行,并且有一个计数器也设置为N,哪个子线程完成了就N-1@Testvoid testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task=()->{for(int i =0;i<100;i++) {long id = redisIdWorker.nextId("order");System.out.println("id =" + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; i++) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time =" + (end - begin));}@Testvoid testSaveShop()throws InterruptedException {Shop shop= shopService.getById(1L);cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY+1L,shop, 10L,TimeUnit.SECONDS);}}
死了妈一样没有任何引用问题为什么报错???
这一节课我们其实讲的是基于redis自增的策略,对吧?事实上除了这种策略以外,全球eid的策略还有很多种别的,比方说UUID,UID是大家非常熟悉的一种方案了,对吧?直接利用吉利可以自带的uid工具类就能够生成了,这种生成策略它生成的其实是一个16进制的一长串的数值,这时候因为是16进制,所以说它反馈的结果其实是个字符串结构,并且的话也不是这种单调递增的一种特性,因此虽然可以作为ead,但是不够友好,没有满足我们这些所说的那些个特性,所以说这种用的比较少,我们采用是redis的自增方案,这种方案相对来讲各种特性都能够满足,而且整体是单调递增的,数值的长度也不大,总共也不超过了。
而且它是一个数字类型,它存储在数据库里占的空间相对来讲也比较小,也比较友好一些。然后其实除了这些以外,还会有这种思路,福利卡学生算法,什么福利和算法的话,也是世界知名的一种全局VIP的一种生成策略,它也采用的是一个浪费型的64位数字,有兴趣同学可以去百度一下看一看它的一个原理,你会发现跟我们还是有一些接近的,只不过它的自动采购是当前机器的这种自增内部维护的,所以说它需要维护一个机器ID,因为我们是用的race都是不管你是任何的分布式系统,它都是用redis作为,所以它不需要维护机械维,相对来讲结构更简单一点。
那么雪崩算法也是一个非常不错的算法,它不依赖于类似,所以说它的性能来讲可能理论上讲会比redis要更好一点。但是它有自己的缺点,就是对于时钟依赖也是比较高的,如果时间不准确,可能会出现一些异常的问题。
最后一种方法是利用数据库自增的,我们刚才不是说数据库自动不行吗?这怎么还说还能使用数据库字等等。这其实是,因为我们在这数据库的增长,不是说我们在新增订单表的时候把订单而是单独整一张表,这张表专门用来做子灯,这样一来不管你的订单表是10张表还是8张表,他们的ID其实还不是自动的,他们的ID从哪来?从专门用来做自动的那张表去获取,等于是这N张表用的是不是同一个表的自动ID,这样的话其实就可以实现这样的一个唯一效果了,其实你可以把它理解成是什么?就是redis自增的数据库版,也就是因为它不再使用类似的,而是用数据库的。
所以说原理上来讲也可能很像,但是从性能来角度来考虑,数据库的性能肯定不如redis智能性能更好,对不对?所以说企业里头去使用税务自动的时候往往会采取一些方案,比如说批量的去获取ID,然后在内存的缓存起来,然后再去使用这样一种方案,我觉得也可以一定程度上提高它的性能,所以这些方案的话,后面这三种方案在企业里都有去应用,同学们可以自己去百度一下,研究一下这些方案它的一些不同点。
那么我们的认识之中的策略核心,大家需要注意的,首先它的整体结构时间戳加自增ID,对吧?首先说加总裁也是整体结构,然后在redis保存 key的时候有一个需要注意的事项,就是我们是每天一个key,还有两个优势,第一是方便统计每天的订单量,每月订单量和每年订单量。第二,他还可以去限定什么 T字,中的一个值不会让它太大,以至于超过了我们存储的上限。另外就是它的结构,刚才说了时间出现计数器的方式
实现优惠券秒杀下单
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购
表关系如下:
●tb_voucher:优惠券的基本信息,优惠金额、使用规则等
●tb_seckilL_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
为http://localhost:8081/voucher/seckill在postman里面post信息
{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食","payValue":8000,"actualValue":10000,"type": 1,"stock":100,"beginTime":"2024-08-24T11:45:14","endTime":"2024-08-24T19:19:10"
}
可以看到sql
这里如果餐厅名字和图片不显示的因为前面解决缓存击穿使用了逻辑过期时间,人需要数据预热,如果你把redis数据清除了就没预热的数据了,可以把解决缓存击穿那个改成用缓存穿透的,就不用添加热点数据了; 如果出现新添加的秒杀券不显示的,去数据库该过期时间往后延一延就ok
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
VoucherOrderController.java
package com.hmdp.controller;import com.hmdp.dto.Result;
import com.hmdp.service.IVoucherOrderService;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {@Resourceprivate IVoucherOrderService voucherOrderService;@PostMapping("seckill/{id}")public Result seckillVoucher(@PathVariable("id") Long voucherId) {return voucherOrderService.seckillVoucher(voucherId);}
}
IVoucherOrderService.java
package com.hmdp.service;import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.baomidou.mybatisplus.extension.service.IService;public interface IVoucherOrderService extends IService<VoucherOrder> {Result seckillVoucher(Long voucherId);
}
VoucherOrderServicelmpl.java(可能有nullntr的问题)
package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券拿到信息;其实后续Redission可以直接用信号量来锁库存SeckillVoucher voucher =seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//尚未开始return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if (voucher.getStock()<1) {//库存不足return Result.fail("库存不足!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id",voucherId).update();if (!success) {//扣减失败return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1. 订单idlong orderId = redisIdWorker.nextId( "order");voucherOrder.setId(orderId);// 6.2. 用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(orderId);}
}
超卖问题
线程数:200
Ramp-Up时间(秒):1
去RefreshTokenInterceptor类中获取对应的authorization值(可用redis题目的方式获取),再将值填入JMeter的登录状态头中;以新增一个信息头管理器 然后把authorization参数值带上就可以了
发给你的文件里有这个jmx可以直接用,删除刚刚你那条记录,你发现100张卷卖出109张,明显的超卖,居然是-9张
为什么?实际上就是在库存为1的时候 有多个线程都通过了if判断
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:我们之前在解决缓存击穿的时候,所采用的锁就是悲观锁
- 悲观锁
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如Synchronized、Lock都属于悲观锁 。当然性能不咋地 - 乐观锁
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
如果没有修改则认为是安全的,自己才更新数据。
如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
VoucherOrderServicelmpl.java修改一下
//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//这里一定是相等的吧,应该是之前查到的,你现在现查,肯定相同.eq("voucher_id",voucherId).eq("stock",voucher.getStock()).update();if (!success) {//扣减失败return Result.fail("库存不足!");}
以后对库存去做一个判断,完了以后去扣减库存。那么按照我们之前分析的所谓的乐观锁方案,其实就是在执行扣减的这一刻去加上一个条件,判断什么?我更新时的这一刻,库存值与我查询到的库存值是不是同一个?如果是证明在我之前没人修改过,我是不是就可以放心大胆修改了。所以说我需要在word条件里去添加一个对stock值的判断,它的值就是我查到的值,我查到的值是不是就是stock值
这个代码给大家解释一下,这里的set其实等于set条件,也就是setstock=stock-1,下面这两个是where条件?它们等于?Where?Id等于一个?Stock等于一个?好,所以其实对罗伯斯的判
断,只要我在更新时where条件库存值,等于我查询到的库存值证明我上面这个判断是没有问题的,证明在我之前没有人修改过,我放心大胆去扣是不是就没问题?
为什么不是把一开始查到的库存用变量存起来去做where判断?而是再调用一次,调用的不就是此刻库存的吗?那不就一定成立吗?
tb voucher order数据全部删除,优惠卷重新改成100张余额
结果只卖出去20张,这是为什么呢???
现在假设说我的库存还剩100,然后有无数的线程都涌入进来了,没加锁那这些线程是不是就并行执行,比如说100个线程,都查到了100,然后我们接下来只有一个线程会去执行这个更新的动作,判断是否大于0,然后就是那样,现在变成99了,那这剩下的99个线程都会认为什么在我之前有人改了,不能去犯错误就取消;失败率大大提高
我们不再判断库存是否与我查到的相等了,我们只要判断库存大于零,
//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//这里一定是相等的吧,应该是之前查到的,你现在现查,肯定相同.eq("voucher_id",voucherId).gt("stock",0).update();if (!success) {//扣减失败return Result.fail("库存不足!");}
删掉数据库的内容, 重新设置余额,我们可以看到数据库100张正好卖完(这其实也是悲观锁,数据库的悲观
update语句有排它锁,不可能同时两个事务修改该数据
一人一单
修改秒杀业务,要求同一个优惠券,一个用户只能下一单,在当前情况下我们在这个订单表里面的所有的,这100张订单竟然都是同一个用户买
能否 给用户id和优惠券id建立联合的唯一索引???
修改秒杀逻辑
package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券拿到信息;其实后续Redission可以直接用信号量来锁库存SeckillVoucher voucher =seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//尚未开始return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if (voucher.getStock()<1) {//库存不足return Result.fail("库存不足!");}//6.一人一单Long userId = UserHolder.getUser().getId();//6.1.查询订单int count = query().eq("user_id",userId).eq("voucher_id", voucherId).count();//6.2.判断是否存在if(count>0) {//用户已经购买过了return Result.fail("用户已经购买过一次!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//这里一定是相等的吧,应该是之前查到的,你现在现查,肯定相同.eq("voucher_id",voucherId).gt("stock",0).update();if (!success) {//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 订单idlong orderId = redisIdWorker.nextId( "order");voucherOrder.setId(orderId);// 7.2. 用户idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}
}
该代码在一人一单校验 环节存在明显的线程安全问题,可能导致并发下单。具体分析如下:
是的,这句话是正确的。该代码在**一人一单校验**环节存在明显的线程安全问题,可能导致并发下单。
**问题根源**
1. **竞态条件(Race Condition)**
代码中的 `查询订单` 和 `创建订单` 是两个独立的操作,不具有原子性。当多个请求同时到达时:
- 线程A查询订单,发现用户未下单(`count=0`)。
- 线程B几乎同时查询订单,也得到 `count=0`。
- 两个线程都会通过校验,继续执行后续的扣库存和下单逻辑,导致同一用户生成多个订单。2. **事务边界问题**
虽然方法标注了 `@Transactional`,但事务的生效范围是**方法内整体**,而数据库的隔离级别(如 `READ COMMITTED`)无法保证 `query().count()` 的实时可见性。在并发场景下,事务内的中间状态可能对其他事务不可见。**解决方案**
需通过以下方式保证原子性:1. **数据库唯一约束**
- 在 `voucher_order` 表的 `(user_id, voucher_id)` 字段上添加**唯一索引**。
- 捕获数据库插入异常(如 `DuplicateKeyException`),防止重复下单。
```java
// 创建订单时捕获异常
try {
save(voucherOrder);
} catch (DuplicateKeyException e) {
return Result.fail("用户已下单");
}
```
**优点**:简单可靠,直接依赖数据库约束。
**缺点**:需处理异常,可能影响性能。2. **分布式锁**
- 使用 Redis 或其他分布式锁,确保同一用户同一优惠券的下单操作串行化。
```java
// 示例:使用Redisson的RLock
RLock lock = redissonClient.getLock("lock:order:user:" + userId + ":voucher:" + voucherId);
try {
lock.lock();
// 执行原有逻辑
} finally {
lock.unlock();
}
```
**优点**:完全避免并发冲突。
**缺点**:增加系统复杂度,需处理锁竞争和死锁。3. **乐观锁(CAS)**
- 在扣减库存时,结合版本号或条件更新(如 `stock=stock-1 WHERE stock>0`)保证库存扣减的原子性。
- 但需注意:这只能解决库存超卖问题,无法直接解决一人多单问题。*代码改进建议**
结合唯一索引和事务回滚:@Transactional @Override public Result seckillVoucher(Long voucherId) {// ...其他校验逻辑(时间、库存)...// 一人一单校验(依赖数据库唯一索引)Long userId = UserHolder.getUser().getId();try {// 直接尝试插入订单,依赖唯一索引抛出异常VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(redisIdWorker.nextId("order"));voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder); // 若重复插入,抛出DuplicateKeyException} catch (DuplicateKeyException e) {return Result.fail("用户已下单");}// 扣减库存(需保证原子性)boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {// 扣减失败,回滚订单(需手动处理或依赖事务)throw new RuntimeException("库存扣减失败");}return Result.ok(orderId); }
原代码的线程安全问题确实存在,必须通过**数据库约束**或**分布式锁**确保原子性。推荐优先使用数据库唯一索引,因其简单且可靠。此处的问题依然是由于隔离级别的问题,默认RR
对于一人一单的判断是在这里 , 先查询出订单,根据用户id和vouchar id。那我拿这个查结果一定是0.那么
if(count>0) {//用户已经购买过了return Result.fail("用户已经购买过一次!"); }
就一定是不是就往下走直接下单了,那因为呢你是一个多线程并发操作,你就会出现多个线程穿插执行的情况,那个一人一单也是如此;现在呢假设说我们的数据库里根本就没有订单,有100个线程都来执行这个逻辑,大家都来做查询查到都是0,也就是说现在没有任何一个人去做插入,要并发的情况下都是0,都不成立,都往下走,于是就连续插入了n多条订单;如果不允许多平台多端登录的话这里确实已经解决了,后续讲的是一个用户瞬间并发的解决方案,涉及的知识点有点多
我们需要把这一段逻辑啊给它做一个悲观锁封装
而是当前用户我们以用户id再去加锁,这样的话我们是不是可以去把锁的范围缩小了,也就是说同一个用户我去加一把锁,不同用户加不同锁。因此在这里不用synchronize的这种方法,用关键字
userId.toString,我们期望的是id值一样的作为一把锁,但是每一个请求来这个id对象是不是都是一个全新的id对象,因此的话呢这个对象变了锁也变了,这不行,我们要求的是值一样所有使用tostring。但是tostring就能保证它是按照值来加锁的吗?其实在这个tostring方法里面底层调用的啊是这个long的一个静态的特征函数,在它的内部其实是new了一个字符串,那么每调用一次tostring都是一个全新的字符串对象,也就是这个锁对象是还会再变,即使你id一样(常量池才会复用,new的话是每次在堆里开辟一个新对象,因此值一样对象地址也不同)JVM学的
package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券拿到信息;其实后续Redission可以直接用信号量来锁库存SeckillVoucher voucher =seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())){//尚未开始return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if (voucher.getStock()<1) {//库存不足return Result.fail("库存不足!");}return createVoucherOrder(voucherId);}//你会注意到上面那个标黄了,这里事务是没用的,事务注解的本质是一个切面类,这里只有采用代理对象调用才会生效,直接在本类调用是不经过切面的@Transactionalpublic Result createVoucherOrder(Long voucherId) {//我们的同步锁就是this,是当前对象//另外呢这个事务的范围其实是更新数据库的一个范围:也就是说做减库存操作和创建电子操作而不是整个操作//6.一人一单Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {//实现同步代码块//6.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//6.2.判断是否存在if (count > 0) {//用户已经购买过了return Result.fail("用户已经购买过一次!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//这里一定是相等的吧,应该是之前查到的,你现在现查,肯定相同.eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2. 用户idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}}
}
我们现在呢是在方法内部来去加的锁 ,那之前我们synchronized加在方法上的话,是不是对整个方法加锁那现在在方法内部加锁就会存在一个问题啊,比方说我们现在这里开启事务开始执行,执行了之后我们获取锁或锁了以后,我开始做这个查询对吧,那然后查询完了以后,我就减库存,提交订单,提交订单完了以后,我先释放锁才会提交事务,对不对啊,因为这个事务是被我们Spring管理了,所以这个事务的提交,它是在我们方法函数执行完以后,由Spring做的提交,那么这个时候其实锁在这个大括号结束了以后,已经释放了,那同学们如果锁释放了,意味着其他线程是不是已经可以进来了,而此时因为事务尚未提交啊,如果有其他线程进来去查询订单的话,那我们刚刚新增的这个订单很有可能还没有写入数据库,对不对,因为你没提交,所以说他再去查询的时候依然不存在,是不是有可能出现并发安全问题,那么因此我们这个锁它锁定的范围有点小了,他应该是把这整个函数锁起来,那这样一来应该是事务提交之后,我们再去释放锁,对不对啊,那你说那怎么办呢,那你这个synchronized加在这里其实就不合适了啊,大家要注意这个范围了啊,那要加在哪里呢,我们把这个删掉,我们应该是加在这个函数到外边对吧,把整个函数锁起来啊,在这个地方去加锁,那当然了,你在这个地方加锁的对象是用户的id吗,所以说你需要在外边去获取这个用户id,然后上锁对吧,哎,这么来做才是ok的。
//4.判断库存是否充足if (voucher.getStock() < 1) {//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {return createVoucherOrder(voucherId);} }//你会注意到上面那个标黄了,这里事务是没用的,事务注解的本质是一个切面类,这里只有采用代理对象调用才会生效,直接在本类调用是不经过切面的 @Transactional public Result createVoucherOrder(Long voucherId) {//我们的同步锁就是this,是当前对象//另外呢这个事务的范围其实是更新数据库的一个范围:也就是说做减库存操作和创建电子操作而不是整个操作//6.一人一单Long userId = UserHolder.getUser().getId();//6.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
注意此时的锁跟方法的锁有很大区别:方法的锁是this对象,而现在的锁还是user对象 用 Lock 锁就可以手动解锁了,比较方便
直接加载方法上是对方法加锁,所有线程执行该方法都是单线程的,但是synchronized关键字是对相同的对象进行加锁,不同的对象互不影响,相同的对象为单线程(实际上这里是错误的 事务底层是aop实现的 因此 调用被包装方法应该采用aopcontext)
这里还存在一个事务的问题啊,什么事务问题呢?我们在这里是对当前的`createVoucherOrder`函数加了事务,但没有给外面这个函数加。而外面这个函数在调用的时候是这样调用的,这种调用其实是用`this`调的,对不对?就省去了一个`this`。那么这种调用的话,其实这个`this`拿的是谁呢?是当前的VoucherOrderServiceImpl`对象,而不是它的代理对象。我们知道,事务要想生效,是因为Spring对当前这个类做了动态代理,拿到了它的代理对象,用它来做的事务处理。那现在你这个`this`其实指的是非代理对象,也就是目标对象,所以它是没有事务功能的,对不对?这个是我们讲Spring事务失效的几种可能性之一嘛,那这就是一种可能性啊。那么大家在网上应该也都看到过对应问题的解决方案吧,怎么解决呢?其实,在这个地方的话,我们也可以借助一个API,叫做`AopContext`,利用它里面的`currentProxy`方法,那么就能够拿到当前对象的代理对象了,我们称之为`proxy`。那当前对象是谁呢?其实就是这个接口`IVoucherOrderService`的代理对象对吧?所以这里就用这个来接收就可以了啊,然后给它做个强转,它返回的是`Object`,但我们知道它肯定是`IVoucherOrderService`,那我们就拿到了当前代理对象了。这个时候呢,我们用代理对象来调用这个`createVoucherOrder`函数,而不是用`this`,那么这样的话,事务就能生效了,因为这个代理对象是Spring创建的嘛,所以它是带有事务的这样一个函数啊。那现在不存在的原因是因为什么呢?因为这个接口里是不存在这个函数的,我们现在是在实现类里做的对吧?所以,你把这个函数也给它创建一下,创建到我们的`IVoucherOrderService`接口里,那现在接口里有了,我们才能够基于接口的调用嘛,那现在这个事务就能生效了。
黑马SpringWeb的AOP:事务是基于AOP实现 AOP会将目标对象方法增强(在原方法前面加上开启事务 后面加上提交)生成一个新的代理对象放到bean中 所以只有代理对象有事务功能 而原对象没有
Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}//记得在service里加一下
pom中添加依赖
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId> </dependency>
为启动类暴露这个代理对象,默认是不暴露的
@MapperScan("com.hmdp.mapper") @SpringBootApplication @EnableAspectJAutoProxy(exposeProxy=true) public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);} }
或者在成员变量中直接注入自己,通过注入自己的对象去调用事务
重新运行成功只卖出一张票
ctrl+D复制一份, 选择添加虚拟机选项,就是jvm,是视频里environment的vm options
可以改下名字,实现两个两个
修改nginx下面的nginx.conf
proxy_pass_request_headers on;#more_clear_input_headers Accept-Encoding; proxy_next_upstream error timeout; #proxy_pass http://127.0.0.1:8081;proxy_pass http://backend;}}upstream backend {server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;}
}
需要把这里的43行打开,42行注释掉,然后去把任务管理器的所有ngix的进程通通杀死,然后再重启服务。
nginx.exe -s reload
是的,不同端口不断发送,sql中优惠卷恢复原貌,打上断点
同一个用户这里有一个并发锁的控制,只能一个用户进来,现在我们打开我们的postman
两台tomcat,两个jvm,两个字符串常量池,两把锁,当然都进来了
现在我们先来看一下,在正常情况下,我们之前是怎么做的。之前我们是不是单体项目?我们假设把这个灰色的框当成我们的一个单体项目。然后我们启动以后,其中有一个线程来查询订单,然后这个时候查完了以后,他去判断一下这个订单是否存在。如果说不存在,他就会去插入这个新订单。他执行完了以后,另外一个线程来查询,查询完了以后,因为这边已经插入了,所以说这个查询他再去判断的时候是否存在,肯定是存在的,是不是就报错啊,无法插入。这是正常串行执行的情况。
那现在呢,在多线程并发的情况下,在没有加锁的情况下,它不可能每一次都是这么正常的串行执行,它可能会出现交叉执行的情况。一旦出现交叉执行,线程一查询订单,现在发现不存在,线程二也来查询,也是不存在。好,紧接着线程一和线程二都去插入新订单,那么这个订单是不是就被插入了两次?这就是线程安全问题产生的原因。
后来我们干了什么事?后来我们加了锁,对不对?我们加了锁以后,也就是说一个线程来了以后,必须先获取锁,拿到锁了以后才可以去执行查询订单的这个动作,还有判断的动作。那假如说他拿到以后,他去执行插入新订单,然后去释放锁。此时另外一个线程,他也想要来获取锁,但是因为这把锁在这一时刻已经被线程一拿到了,所以线程二是拿不到的。他会失败。失败会干什么?根据synchronized的原理,失败了以后,他是不会等待的,那它会等待锁释放。等了很长时间,这边释放了,他终于可以获取成功了,然后才能执行。那你这个时候再去执行查询,那边已经插入了,你再来查是不是已经存在了,那么这个地方就会报错,不会插入,是不是就确保了串行执行了啊?这是我们理想的情况啊。
那现在发生了什么呢?现在我们不再是一台机器了,是多台了。那大家要知道的是,在当前这一个JVM内部锁的原理是什么?其实是在我们JVM内部维护了一个锁的监视器对象。这个监视器对象,我们用的是什么?用的是`id`,是不是在我们的常量池里边?那么在这个JVM内部,它维护了这一个池子。当`id`相同的情况下,他们是不是永远是同一个锁?也就是锁的监视器是同一个。所以说无论是线程一也好,还是线程二也好,他们来获取锁的时候,当线程一来获取锁的时候,那么锁监视器就会记录线程一的这个名称。那么当线程二再来获取的时候,他一看说不行,这已经有了,那他还能获取吗?不能,就不能了,这就是它的一个原理。
但是呢,当我们做集群部署的时候,一个新的部署就意味着这是一个全新的Tomcat,那就意味着这是一个全新的JVM。也就是说有两套JVM。那两个JVM是不是有各自的堆栈、各种方法区之类的?所以说呢,我们的JVM2,它也会有自己的常量池。所以呢,他用`id`作为锁的时候,它的监视器对象就会有一个全新的锁监视器了,跟JVM1的那个监视器是不是同一个呀?那现在当我们的线程三来获取锁的时候,他走的是自己的这个监视器。那这个监视器显然是空的呀,他那边已经有了,在这边没有,所以呢他也能获取锁成功,对不对?好,当然了,线程四肯定还是失败的,这没问题。也就是说我们这个锁监视器在当前JVM内部可以监视到这些线程实现互斥,但是呢,如果你有多个JVM,就会有多个锁监视器,是不是每一个JVM内部都会有一个线程是成功的?如果我们将来集群比如说部署了十台、20台,也就意味着并行的,至少会有十个线程是同时在运行的,那不就又一次出现了线程安全问题了?明白了吗?所以产生安全问题的原因是什么?在集群模式下,或者有一些是在分布式系统下,有多个JVM的存在,每个JVM里都有自己的锁,导致了每一个锁都可以有一个线程获取,于是就出现了并行运行,那么就可能出现安全问题。
好,那么要想解决这个问题,我们必须得想办法让多个JVM只能使用同一把锁,对不对?那么这样的锁可不是我们JVM里面提供的,需要我们自己去实现。在下节课我们就来讲一讲这种跨JVM的,或者讲跨进程的锁。
分布式锁
我们已经发现在集群模式下,`synchronized`的锁失效了。`synchronized`啊,它只能够保证单个JVM内部的多个线程之间的互斥,而没有办法让我们集群下的多个JVM进程之间互斥。那要想解决这个问题,我们必须使用分布式锁。所以这节课呢,来分析一下分布式锁的工作原理,当然也会动手去实践一个分布式锁。
好,下面呢我们先来分析一下分布式锁的工作原理。那这里呢有两个JVM。`synchronized`啊,就是利用JVM内部的锁监视器来控制线程的。在JVM的内部,因为只有一个锁监视器,所以只会有一个线程获取锁,可以实现线程间的互斥。但是当有多个JVM的时候呢,就会有多个锁监视器,那么就会有多个线程获取到锁,那这样呢就没有办法实现多JVM进程之间的互斥了。那要想解决这个问题,我们肯定不能再去使用JVM内部的锁监视器了吧,我们必须让多个JVM都去使用同一个锁监视器。那因此呢,它一定是一个在JVM外部的、多个JVM进程都可以看到的这样一个锁监视器。这个时候呢,无论是单个JVM内部的线程,还是多个JVM的线程,都应该来找这个监视器来获取锁。那这样呢也就只会有一个线程获取到就能够实现多进程之间的互斥了。
好,那我们来分析一下这个过程。现在呢假设我们的这个JVM1里边的线程一来去执行业务,那肯定要去获取锁。这时候他来获取呢,就是找我们这个外部的锁监视器。一旦获取成功,就会去记录当前获取的是这个线程一。那紧接着呢,此时如果有其他线程也来获取锁,比方说是在JVM2内部的一个线程三,那么它线程三来获取锁。那么这个时候呢,因为锁监视器里已经有线程一持有了,所以线程三获取一定会失败。那失败了他要怎么办呢?那它一旦失败,肯定要去等待锁释放,是不是啊?所以呢他就不停地等待。那这个时候呢,我们的线程一获取锁成功了,他就可以去执行自己的业务代码了。比方说,他下面要去查询订单,因为一人一单要做判断,判断一下这个订单是否存在。如果说它不存在,那现在他是第一个来的,肯定不存在。那这个时候他就可以去插入新订单了。好,那么插入之后他的业务完成,他就可以去释放锁了。当然了,此时用户已经有这个订单了。所以他释放完了以后呢,这边的线程三他拿到锁了。这个时候他去执行业务,获取锁成功,执行业务,查询。那他查询的时候呢,因为对方已经插入了,他一定能够查询到。查询到代表存在,存在就不能去插入了,所以就报错了。所以这样是不是就避免了这种安全问题的发生呢?而且呢,这个时候不仅仅是线程三,那么不管是我们JVM内部的线程,还是多个JVM的线程,比如说新增的线程四其实都是一样的。只要在线程一获取锁成功以后,他们来获取肯定都是失败的,因为他们都是找同一个锁监视器嘛,所以他们都会获取失败,那么都会出现一个阻塞等待,就跟线程三一样。所以呢,这个时候就能够实现什么呢?无论是单个JVM内部的线程,还是跨JVM的线程,都可以达到一个互斥效果。好,这就是分布式锁的一个工作原理,其实还是非常简单的吧。所以它最关键的点就是什么?一定要让多个JVM进程都能够看到同一个锁监视器,也就是多进程可见,最后呢还要互斥,也就是只有一个线程能拿到锁。
mysql zookeeper 都可以做分布式锁 只要是独立于jvm的 都可以
分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
MySQL | Redis | Zookeeper | |
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
好的同学们,那比较常见的分布式锁实现方式主要有这么三大类吧。第一类是使用MySQL,或者说使用数据库来实现;第二类是使用Redis来实现,也就是类似于这种缓存的;第三类就是使用ZooKeeper这样的东西来实现。这个东西有些同学可能没学过,但不重要,因为今天我们的重点是基于Redis来实现。我们先来分析一下这三种实现方式在互斥、高可用、高性能和安全性上的一些差异。
首先,互斥。MySQL怎么来实现互斥呢?我们都知道MySQL数据库或者其他数据库都具备一个事务机制,对不对?那么在事务执行的时候,或者说我们在执行写操作的时候,MySQL是不是自动会给你分配一个互斥的锁?这样一来,在多个事务之间是不是就是互斥的,只有一个人能去执行?我们完全可以利用这个原理来实现一个锁。比方说我们现在有一段业务需要去实现这样的分布式互斥锁,那么我们就可以在业务执行前先去MySQL里申请一个锁,然后去执行我们的业务。当业务执行完了以后,我们去提交事务,那这样一来锁不就释放了吗?那当我们的业务抛异常的时候,它会自动地触发回滚,锁是不是也释放了呀?这样一来,我们这个互斥效果还有锁的释放和获取是不是都很容易实现了对吧?这是MySQL或者说数据库的这种方式,怎么去实现锁呢?利用数据库本身的事务锁去做。
而它的可用性就依赖于MySQL本身的可用性能。我们知道MySQL是可以支持主从模式的,所以说它的可用性应该说还是不错的。那至于性能,我们所有的性能就会受限于MySQL的性能。我们知道像MySQL它的性能跟Redis相比肯定还是有些差距的,所以它的性能属于是比较一般,不能说好,但也不是很差。
那它的安全性呢?也就是说一旦出现异常情况,这个锁能不能及时地释放呢?其实是可以的。因为我们知道在MySQL里面,你利用这种事务机制去获取了锁,但是一旦你系统崩溃了,其实连接断开以后,锁是会自动去释放的,数据也会回滚,对不对?所以说这个问题不用考虑,非常好。这是基于MySQL这样的数据库去实现分布式锁。
那么用Redis去实现呢?这种方式我们其实以前介绍过,它利用的就是`SETNX`这样的一个互斥命令。还记得吗?我们是不是说过`SETNX`呢?是指我们去往Redis里`SET`一个数据的时候,只有数据不存在时才能`SET`成功,如果已经存在了,那就会`SET`失败。那么你想想看,如果我们有无数的线程,大家都来执行`SETNX`,并且`key`是同一个`key`,那肯定只能有一个人成功,其他人都会失败。那这样不就实现互斥了吗?这是它的互斥原理。那将来要释放锁也很简单,我只要把这个`key`删了,那么其他人是不是就能够去获取这个锁了,就能够`SETNX`成功了嘛?就这个意思。所以这是它获取锁和释放锁的原理,我们之前是不是讲过的。
当然了,它还有很多一些细节需要我们去考虑。就目前来讲,就是基于这个简单的这样一个`SETNX`效果就行了。那么它的可用性就不用多说了吧?我们知道Redis不仅仅只是主从,它还支持这种集群模式,所以它的可用性是能够得到保障的。而性能就更不用讲了,Redis性能是远远好于我们的MySQL的,所以说它实现锁的性能也是非常好的。
而安全性上呢?那我们获取锁成功,也就是我们`SETNX`执行完了,一旦我的服务出现故障了,那锁能不能得到释放呢?这个就不行了啊。因为我们利用`SETNX`是设置了一个`key`到Redis当中,那如果你服务宕机了,将来没有人去执行这个删除的动作,那么这个`key`就会一直在那里,那锁是不是就得不到释放?那这样一来其他人也拿不到锁,是不是就产生这种类似于死锁的效果了?那么就出现问题了。
所以为了应对这个问题呢,我们在利用`SETNX`获取锁的时候,必须想一个办法,将来一旦出现故障,锁也能释放。有什么办法呢?这里可以直接告诉大家,就是利用Redis当中的`key`的过期机制。我们知道Redis的`key`是可以设置过期时间的。那大家想一下,如果我在`SETNX`获取锁的时候,同时给它加上一个过期的时间,那么将来一旦我的服务出现了故障,没有人去手动释放,但是一旦到期以后,这个锁是不是也会自动释放呀?所以可以利用这个超时时间来解决这个安全性的问题。
不过呢,这个锁的释放时间设置多长呢?如果太长了,那这个锁呀它的无效等待时间就会比较多;那如果时间太短了,那万一我的业务还没执行完呢?所以说呢这种方式去做安全性的一个保证是可以的,但是还需要去完善。具体怎么解决呢,我们在后续会给大家去讲。
这是Redis,那么还有最后一种就是ZooKeeper了。ZooKeeper呢,它这个锁的原理其实是利用了它内部的这种节点机制。ZooKeeper内部是可以去创建这种数据节点的,而节点具备这种唯一性、有序性,另外还可以创建这种临时节点。所谓的唯一性就是我们去创建节点的时候,节点不能重复。所以它有序性就是每创建一个节点,节点的ID就是递增的。那利用这个特性怎么去获取锁实现互斥呢?这里我们可以利用这个有序性来实现互斥。比如说现在我们有很多线程都来在ZooKeeper里创建节点,然后这样一来,每一个节点它的ID是不是就是单调递增的?那如果我们约定,ID最小的那个算是获取锁成功,那这样一来是不是就实现了互斥?因为最小的是不是只有一个?当然你也可以利用唯一性,比如说大家都去创建节点,并且这个节点名称大家都是一样的,这样来是不是只有一个人成功?所以也可以,但一般情况下,我们都会利用这个有序性去实现这种互斥。
那这是获取锁,释放锁也很简单。将来呢你把节点删除,是不是你就不是最小的,另外一个人就变成最小的。所以这是释放锁。所以ZooKeeper里面,锁释放锁还是比较简单的。
那从可用性来考虑呢,它也是不错的,因为ZooKeeper天生也是支持集群的,所以它的可用性很好。而性能上呢,因为ZooKeeper它的集群强调的是强的一致性,而这种强的一致性都会导致它的主从之间做数据同步,去消耗一定的时间,所以它的性能相对来讲是比Redis要差一些的。
而安全性来讲,它做得也很好。因为它的这些节点往往创建的是临时节点,这个时候一旦出现故障,比如说服务宕机了,那么断开连接以后,它的节点会自动释放掉,所以锁就等于释放了。所以这块儿也不用去考虑。
所以其实从安全性的角度来讲,ZooKeeper和MySQL它们两个原理基本上类似,安全性是比较好的,比Redis来讲要稍微好一些。Redis只能利用这种超时机制去做。那么但是呢从可用性和性能上来讲,Redis是非常好的。那么因此呢,接下来我们就会带着大家去学习一下如何基于Redis来实现分布式锁。那至于MySQL和ZooKeeper的这种方式,我们在这里就不多讲了,因为我们整个教程主要是来学习Redis的。那么大家如果有兴趣,也可以去网上看一看其他两种方式实现的一个原理。
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
#添加锁,利用setnx的互斥特性
SETNx lock thread1
#添加锁过期时间,避免服务岩机引起的死锁
EXPIRE lock 10
释放锁:
手动释放
超时释放:获取锁时添加一个超时时间
#释放锁,删除即可
DEL key
实现真正的原子性
#添加锁,NX是互斥、EX是设置超时时间
SET lock thread1 NX EX 10
非阻塞:尝试一次,成功返回true,失败返回false
Redis优化秒杀
基于Redis实现分布式锁初级版本
需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
在utils定义Ilock接口和SimpleRedisLock类
package com.hmdp.utils;public interface ILock {/*** 尝试获取锁* @param timeoutSec 锁持有的超时时间,过期后自动释放* @return true代表获取锁成功; false代表获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;//锁的名称,你可以给锁加一个统一前缀private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {//获取线程标识long threadId = Thread.currentThread().getId();//值不能随便设置,因为会有特殊情况,第一个线程执行到删除锁之前,key过期了,//其他线程就进来执行操作,这样就会把其他线程的锁给删了,这个值会用来比较是否是该线程的锁。具体可以看谷粒商城p158//获取锁Boolean success =stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);//由于拆箱不能直接returnreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//释放锁stringRedisTemplate.delete(KEY_PREFIX+name);}
}
修改 VoucherOrderServicelmpl.java中机制改为自己实现的锁
/***************************************************///4.判断库存是否充足if (voucher.getStock() < 1) {//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象SimpleRedisLock lock=new SimpleRedisLock("ordere:" +userId,stringRedisTemplate);//获取锁boolean isLock=lock.tryLock(1200);if(!isLock){return Result.fail("不能重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {//释放锁lock.unlock();}//try finally 事务可以生效,因为没有捕获异常。如果catch捕获了异常,需要抛出RuntimeException类型异常,不然事务失效。}
/*************************************************/
要每个用户都能拿到自己的锁,因为这里加锁的目的就是为了实现一人一单,超卖是通过update自带的行锁解决的,不是分布式锁解决的;如果 同一个人在锁过期了之后再来下单不是又能重复下单了嘛? 所以锁的过期时间要久一点
这种情况是同一个用户发了多次请求的情况吧,如果是不同的用户,那么userId不一样的,那么key就不一样(最主要是因为线程无法分清楚自己的锁)
好的同学们,在刚刚我们已经实现了基于Redis分布式锁的一个初级版本。在这个版本当中,获取锁的时候,我们采用了`SETNX`和`EX`的这种方式来实现这个互斥。在释放锁的时候呢,我们采用的是`DEL`直接把锁删掉,那么其他人就能够去获取锁了,实现方式非常的简单。在大多数情况下,这个锁都能够正常工作,但是,在一些极端情况下,它依然会存在一些问题。所以这节课我们就来分析一下,它可能存在什么样的问题。
在这儿我首先准备了这样一个时间线,我们可以把它理解成Redis锁的一个持有周期。然后呢,在这里我们假设说来了一个线程,这是一个线程1。那么这个线程1他在执行的过程中,首先要尝试去获取锁对吧,那么他就会向Redis去发起一个请求,要获取锁。那因为他是第一个来的,所以说他能够正确的获取锁,没有任何人去阻拦他,对吧。那么这个地方我用蓝色的线来标识,就是当前这个线程1拿到了锁。然后拿锁了以后呢,他要开始去执行自己的业务了,对不对?但是因为某种原因,他的业务产生了阻塞,那么这样一来,他的锁的持有周期是不是就会变长?哎,到什么时候为止呢?两种情况,一种呢就是他执行完了,他去释放,还有一种啊就是他阻塞时间太长了,甚至于超过了我们设置的那个超时时间。大家别忘了,我们获取锁的时候,是不是有一个`EX`超时时间,它就是一个兜底方案啊,避免就是因为没有人释放锁。虽然说我们设了个超时时间,但是如果这个业务阻塞的太久了,以至于都超过了这个超时时间了,同学们仔细看,那么这个时候是不是也会触发这个锁的一个超时释放啊?那么也就是说业务还没完呢,他提前释放了。那么它一旦提前释放,同学们先来看,这个时候其他线程再来获取锁的时候,是不是就能趁虚而入,获取成功了?如果说这个线程2此时来获取啊,他就成功了,这里是用紫色标识的,对吧。
那他成功以后是不是要去执行自己的业务了?没错。而就在线程2刚刚获取锁了以后,假设说线程1醒了,那么他的业务完成了。同学们,他的业务完成了以后,他要干什么?没错,业务完了肯定要释放锁,对吧?那我们释放锁的动作是怎么做的?我们是不是直接`DEL`,也就是说我们的线程1二话不说,上来直接来了个`DEL`删除,于是呢锁被释放了。谁的所谓什么了?诶,紫色的这个,这是线程2的锁被释放了。同学们,线程1把别人的锁是不是给释放了呀?那这个时候我们信号知不知道?唉,不知道,现在还在去执行自己的业务呢。就在这时,线程3来了,他趁虚而入,他说哎我也来获取一下,结果是因为锁被删了,他也能获取成功,开始执行自己的业务。同学们想要看,此时此刻是不是就同时有两个线程都拿到了锁,都在执行业务。所以又一次出现了什么呀?这种并行执行的情况,那么线程安全的问题就有可能再次发生。唉,这就是所谓的极端情况啊,同学们。我们分析一下这种极端情况产生的原因是什么?首先呢是因为业务阻塞导致了我们这个锁提前释放了,好,这是第一。其二呢,当这个哥们儿醒过来以后,这个时候的锁已经不是线程1的锁,而是线程2的锁,但是线程1二话不说,上来就把别人的锁给删了。这就像什么呢?啊,你那个放了学了,去这个自行车棚里啊,想去推自己的自行车,当然了,你得先开锁,你在这开了半天发现打不开是吧?啊,于是你一气之下拿了一个大钳子,直接把锁给剪断了,剪断了以后抬头一看不是自己的车,是不是啊?你这个锁就给弄错了,你把别人的锁给剪了。好,就是这个问题。所以这里归根结底要发生安全问题的最重要的原因就是线程1在释放锁的时候,他把别人的锁给删了,对吧?如果说你在释放锁的时候,不要那么着急,你先看一眼这个锁是不是你的了,你有没有资格去释放没有?所以说要想解决这个问题,关键在于什么?是不是在释放锁的时候要去做一个什么判断,对吧?判断什么?判断一下锁的标识是否一致,什么意思呢?
就是说当你醒了以后,你要去释放锁的时候,你要看一看这个锁里面存的这个标识啊,跟你当前线程是否一样的。大家别忘了啊,我们之前在去获取锁的时候,是不是存了一个线程标识进去。你看一下我们之前获取锁的时候,存在这个`33`是不是就成了个线程的标识,还记得吧。好,那如果说我在释放锁的时候,我把这个线程标识啊取出来,判断一下跟我当前这个线程是否一样,是不是就可以避免这个问题了?如果发现不一样,比如说当前是紫色的啊,我是蓝的,那肯定不行,那么我就什么都不做,它表示内容代表不一致,我就什么都不做。这样一来是不是就会避免误删别人的锁啊?那同样道理的线程二也是,线程二呢,他去执行自己的业务的时候,但因为线程一没有删他的锁,这样来的锁依然存在,因此线程二是不是就可以正常的执行,不受干扰的去执行了,就不会存在刚才那种情况。
那么当线程二执行业务完了以后,他却释放锁的时候,他也要做这个动作,任何一次释放锁,咱们以后都要做这个动作判断锁标识。那因为这个地方锁呢依然是紫色跟它一致,所以标识一样,跟自己返回,ok,那没事呢,他是不是就可以正常的去释放锁了啊?于是再发一次请求去做释放锁,那么它的锁就被释放掉了。那么此时线程三才能够去获取锁,去执行自己的业务。那这样一来是不是就避免了这个问题的发生?
好,那我们总结一下,其实要解决这个问题的关键就是什么呀?是不是在释放锁时做一个判断啊。那因此我们这个业务流程与原来相比,就会有一些变化。以前呢我们这里上来就去释放锁,现在不行了。现在第一,在获取锁的时候,你要把线程的这个标识给他存进去啊,我们之前存的是不是线程id,反正不管怎样,你要存一个标识进去。第二,在执行完了业务要释放锁的时候,你一定要判断一下这个标识是不是自己的,如果是你才能释放,如果不是,你是不是就不能释放呀?其实这个时候不是就代表你的锁早就没了,你就不用管就行了吗?也就是说我们业务就变成这样一个样子。ok,这就是我们刚才那个分布式锁它所存在的一个问题,以及对应的这个解决方案了。
改进Redis的分布式锁
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标示(可以用UUID表示)
2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
如果一致则释放锁
如果不一致则不释放锁
SimpleRedisLock.java
private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";@Overridepublic boolean tryLock(long timeoutSec) {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//值不能随便设置,因为会有特殊情况,第一个线程执行到删除锁之前,key过期了,//其他线程就进来执行操作,这样就会把其他线程的锁给删了,这个值会用来比较是否是该线程的锁。具体可以看谷粒商城p158//获取锁Boolean success =stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);//由于拆箱不能直接returnreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//释放锁//这里static会使同一个jvm的所有线程的uuid一样,但是线程id不一样,而两个jvm的线程id有可能相同,但是uuid不一样//相同jvm有线程号id区分,不同jvm有uuid区分//获取线程标示String threadId=ID_PREFIX+Thread.currentThread().getId();//获取锁中的标示IString id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判断标示是否一致if(threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
uuid是区分不同服务器的,线程id是区分相同服务器的,static final的功能就是让同一个JVM进程只有一个UUID,这样就可以通过UUID区分不同的JVM进程,用线程id区分相同JVM进程下的线程
那同样呢,是前面这个Redis锁的一个周期,下边呢是一个线程。我们假设线程一上来去申请这个锁,因为就他自己嘛,所以这个获取锁肯定能够成功。于是他开始执行自己的业务。现在我们假设这个业务并没有阻塞啊,他成功执行完了。紧接着他要去释放锁了,而且释放锁是不是要先判断锁标识?于是他去判断锁标识,那么这个判断,因为锁是他自己,是不是一定是成功的,所以返回的是ok啊。
好,那么紧接着他要干什么了,同学们?紧接着是不是要执行释放锁的动作了?同学们注意了,判断锁标识和释放锁是两个动作啊,所以说呢判断是成功了,紧接着要释放。但是就在要释放时产生了阻塞,有这种可能性吧?唉,有的人会说了,这怎么会阻塞呢?判断完了直接就是释放了,中间又没有别的代码,为什么会堵塞呢?啊,同学们想想看啊,其实在JVM里有一个东西啊,叫做垃圾回收,对不对?所以当我们的JVM去做这种服务,那么它其实是会阻塞我们的所有的代码,所以这个时候就会产生阻塞。不是因为你的业务阻塞,是因为这边我们本身产生阻塞,是有这种可能性的啊。
那一旦发生了阻塞,那也就是说我轮到我去释放了啊,但是我被阻塞了,是不是就没有释放呀?而这个阻塞的时间如果足够长,很有可能就触发了我们那个锁的一个超时释放。那么锁一旦超时释放,其他的线程是不是就又可以趁虚而入了?比如说线程二,他呢就来获取锁,那因为这个锁被超时释放掉了,他是不是可以成功获取?于是他开始执行自己的业务。
而就在他获取锁成功的那一刻,如果说现在GC结束了,那么阻塞结束我们的线程恢复运行,而此时他去执行什么动作?哎,没错,要执行释放锁的动作了。因为判断是不是已经执行过了,他认为锁还是自己的,但其实现在锁是自己的吗?不是,已经是线程二的了,对不对?但是呢它不判断了,因为已经判断过了,所以直接执行释放锁,于是就把线程二的锁给删掉了,又一次发生了误删。那么此时又来了个线程三,趁虚而入,获取锁成功执行自己的业务。你看这种并发的问题是不是又一次发生了?
所以同学们这一次产生的原因是什么?我们做了判断了,对不对?那为什么又出问题了?唉,就是因为判断锁标识和释放是两个动作,这两个动作之间产生了阻塞,最后出现了问题。那因此要想避免这个问题的发生,我们必须确保什么?必须确保判断锁标识的动作和释放锁的动作这两个得成一个原子性的操作,也就是说一起执行不能出现间隔。那我们怎么样保证两个动作的原子性呢?是锁的值不一样啊,不是锁的键不一样啊,锁名称一样,但是锁的线程标识不一样。
实际上就是线程1判断后发现锁的线程标识和当前线程一样,于是根据锁名释放锁,但是业务阻塞,锁超时释放,线程2拿到同样名称的锁,线程1根据锁名称把线程2的锁误删了。
lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,语法如下:#执行redis命令
redis.call('命令名称','key','其它参数', ...)例如,我们要执行setnamejack,则脚本是这样:#执行 setname jack
redis.call('set','name','jack')例如,我们要先执行setnameRose,再执行getname,则脚本如下:#先执行 setname jack
redis.call('set','name','jack')
#再执行getname
local name = redis.call('get', 'name')
#返回
return name
理解成java的面向对象,前面那个redis就是提供了一个对象
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
127.0.0.1:6379>help @scriptingEVAL script numkeys key [key...] arg [arg...]summary:Execute a Lua script server sidesince: 2.6.0
例如,我们要执行redis.call('set','name',jack")这个脚本,语法如下:
#调用脚本
EVAL "return redis.call('set','name','jack')" 0
前面是脚本内容 0是脚本需要的key类型的参数个数
进入centos7,依次输入以下操作:
redis-cli
AUTH 123321 #你的密码
EVAL "return redis.call('set','name','Rose')" 0keys * # 可以通过这个看一下是否录入
如果脚本中的key、Value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
释放锁的业务流程是这样的:
1.获取锁中的线程标示
2.判断是否与指定的标示(当前线程标示)一致
3.如果一致则释放锁(删除)
4.如果不一致则什么都不做
-- 获取锁中的线程标示getkey
local id =redis.call('get',KEYS[1])
--比较线程标示与锁中的标示是否一致
if(id ==ARGV[1]) then
--释放锁delkeyreturn redis.call('del', KEYS[1])
end
return 0
lua再次改进Redis的分布式锁
Lua本身不具备原子性的,是因为redis执行命名是单线程执行的,它会把Lua脚本作为一个命令执行,会阻塞期间接收到其他1线程命令,这就保证了Lua脚本的原子性
提示:RedisTemplate调用Lua脚本的APl如下:
在resource文件夹下设置unlock.lua文件,
--比较线程标式与锁中的标识是否一致
if(redis.call('get',KEYS[1])==ARGV[1])then--释放锁 del keyreturn redis.call('del',KEYS[1])
end
return 0
simpleredislock如下
package com.hmdp.utils;import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.yaml.snakeyaml.events.Event;import java.util.Collections;
import java.util.concurrent.TimeUnit;public class SimpleRedisLock implements ILock {private String name;//锁的名称,你可以给锁加一个统一前缀private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";private static final DefaultRedisScript <Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {//获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();//值不能随便设置,因为会有特殊情况,第一个线程执行到删除锁之前,key过期了,//其他线程就进来执行操作,这样就会把其他线程的锁给删了,这个值会用来比较是否是该线程的锁。具体可以看谷粒商城p158//获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);//由于拆箱不能直接returnreturn Boolean.TRUE.equals(success);}@Overridepublic void unlock() {//stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX + Thread.currentThread().getId());}/*@Overridepublic void unlock() {//释放锁//这里static会使同一个jvm的所有线程的uuid一样,但是线程id不一样,而两个jvm的线程id有可能相同,但是uuid不一样//相同jvm有线程号id区分,不同jvm有uuid区分//获取线程标示String threadId=ID_PREFIX+Thread.currentThread().getId();//获取锁中的标示IString id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);//判断标示是否一致if(threadId.equals(id)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}*/}
基于Redis的分布式锁实现思路:
利用setnxex获取锁,并设置过期时间,保存线程标示
释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
利用setnx满足互斥性
利用setex保证故障时锁依然能释放,避免死锁,提高安全性
利用Redis集群保证高可用和高并发特性
只是解决了误删锁,但是并没有完全解决一人多单的可能,你把两个的断点都打到查询用户是否下单的那个count的那里,可能两个线程得到的都是0,这样还是会一人多单(业务没走完锁呗释放了,就出现一人多单的问题了,这里并没有解决一人多单问题)
ReentrantLock不是分布式锁吧
Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-MemoryDataGrid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
- 8.分布式锁(Lock)和同步器(Synchronizer)
- 8.1.可重入锁(Reentrant Lock)
- 8.2.公平锁(Fair Lock)
- 8.3.联锁(MultiLock)
- 8.4.红锁 (RedLock)
- 8.5.读写锁(ReadWriteLock)
- 8.6.信号量(Semaphore)
- 8.7.可过期性信号量(PermitExpirableSemaphore)
- 8.8.闭锁(CountDownLatch)
官网地址:https://redisson.org
GitHub地址:https://github.com/redisson/redisson
在pom.xml中添加依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
在config中新建RedissonConfig,按照自己的redis配置输入
package com.hmdp.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redisson() {//配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.111.130:6379").setPassword("123456");//创建RedissonClient对象return Redisson.create(config);}
}
voucherOrderserviceimpl中的锁对象修改
Long userId = UserHolder.getUser().getId();//创建锁对象//SimpleRedisLock lock=new SimpleRedisLock("ordere:" +userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:"+userId);//获取锁boolean isLock=lock.tryLock();//其实是3个参数,也可以选择无参if(!isLock){return Result.fail("不能重复下单");}
RedissonTest.java
package com.hmdp;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;@Slf4j
@SpringBootTest
class RedissonTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;@BeforeEachvoid setUp() {lock = redissonClient.getLock("order");}@Testvoid method1() throws InterruptedException {// 尝试获取锁boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);if (!isLock) {log.error("获取锁失败 .... 1");return;}try {log.info("获取锁成功 .... 1");method2();log.info("开始执行业务 ... 1");} finally {log.warn("准备释放锁 .... 1");lock.unlock();}}void method2() {// 尝试获取锁boolean isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败 .... 2");return;}try {log.info("获取锁成功 .... 2");log.info("开始执行业务 ... 2");} finally {log.warn("准备释放锁 .... 2");lock.unlock();}}
}
Redisson可重入锁原理
自定义的分布式锁有一个缺陷,就是无法实现可重入,而Redis它就可以做到。那这是为什么呢?这节课我们就一起来研究一下。咱们自定义的分布式锁采用的是`SETNX`和`EXPIRE`命令,也就是简单的key-value,整个锁的流程是这样子的:
在一开始尝试获取锁,其实就是执行这个`SET`命令,当然要加上`NX`和`EX`参数。那`NX`的目的就是为了实现一个互斥,满足分布式锁的基本要求。同时呢,在获取锁的时候,我们要存入这个线程的标识,其目的就是将来在释放锁的时候做判断,避免误删,只有锁是自己的才去做这个删除。
那么这样的一个流程为什么不能重入呢?我们一起来看一下这样一个demo。
// 创建锁对象
RLock lock = redissonClient.getLock("lock");@Test
void method1() {boolean isLock = lock.tryLock();if(!isLock){log.error("获取锁失败,1");return;}try {log.info("获取锁成功,1");method2();} finally {log.info("释放锁,1");lock.unlock();}
}void method2(){boolean isLock = lock.tryLock();if(!isLock){log.error("获取锁失败,2");return;}try {log.info("获取锁成功,2");} finally {log.info("释放锁,2");lock.unlock();}
}
首先在这里我们会去创建一个锁的对象,接下来呢有一个测试方法`method1`,在`method1`里会首先尝试获取锁,获取锁了以后,如果失败,他就会报错,而如果成功,那么他就会去调用一个方法`method2`,而在`method2`里又一次尝试获取锁。那么`method1`和`method2`啊,他们两个是在一个线程里的,那一个线程连续两次去获取锁,这其实就是锁的重入了。
那我们来看一下,如果按照我们这个流程,它能不能实现重入。首先`method1`尝试获取锁,那按照我们这里就会去`SET`这个锁名称以及锁标识进去,那在这就是`lock`以及比如说这个线程名叫`t1`,我们把它存进去了。接下来呢往下执行调用这个`method2`,那么`method2`又一次尝试获取锁,那么又要执行这个`SET lock t1`的命令,那因为这里加了`NX`的一个参数,也就是说只有第一个能成功,那这里已经有值了,所以说`method2`在执行的时候一定是失败。所以说我们就没有办法实现重入了。
那怎么样解决这个问题呢?其实啊,要想实现可重入锁,我们可以参考JDK里面提供的`ReentrantLock`,它的原理啊。那如果大家不知道我们JDK的`ReentrantLock`它的原理的话,建议大家也去B站上搜索,我们黑马程序员的一个课程啊,叫做Java并发编程(JUC)。你在这搜Java并发编程排第一的,这个up主是黑马程序员的,那就是我们出品的一个课程了。在这个并发课程里边,其实就有讲到`ReentrantLock`它的原理。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁,线程还是当前线程,表示发生了锁重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}// sync 继承过来的方法,方便阅读,放在此处
protected final boolean tryRelease(int releases) {int c = getState() - releases;if ((Thread.currentThread() != getExclusiveOwnerThread()))throw new IllegalMonitorStateException();boolean free = false;// 只有锁重入,只有 state 减为 0,才释放成功if (c == 0) {
/**************************************************************/free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
那他为什么能实现可重入呢?哎,在这简单给大家说一下啊,其实所谓的可重入无非就是在获取锁的时候,当我判断这个锁已经有人的情况下,那我会看一下是不是我自己,也就是说是不是同一个线程。如果是同一个线程的话,我也会让他获取锁,但是要多做一件事,就是有一个计数器,去记录你重入的次数,就是说你总共获取了几次。那么你在`tryAcquire`,就是获取锁的时候,我们这个次数会去累加,不断加一,不断加一。而将来你再去`release`,释放锁的时候,这个次数会减一。哎,这就是可重入的一个基本的一个原理啊。
因此再去实现时,也可以参考它。那也就是说我们其实不仅仅要在锁里边要去记录获取锁的线程,还要去记录这个线程它的重复的次数,总共拿了几次。每拿一次次数就加一,让同学们思考一下,现在呢我们既要记录线程标识,又要记录重复的次数,那这样一来,我们的这个`String`类型的结构显然就不行了吧。什么样的数据结构可以满足,在一个key里存储两个东西?其实可以使用它里面的哈希结构,因为哈希结构,它的value里面是不是可以又分成两个,一个field的一个value。那这个时候呢,我们就可以在key的位置记录锁的名称,然后在field的位置去记录线程标识,在value位置记录这个锁的重复次数
KEY | VALUE | |
field | value | |
lock | thread1 | 1 |
比方说在这里,我们创建锁对象,然后去`tryLock`,那么第一次来的时候,在这里我们会记录这个线程标识,并且把这个次数重置为1,因为他是第一个来的嘛。好,然后往下走,我们认为它会锁成功了吗?那接下来调用`method2`,而在`method2`的又一次尝试过去锁,那么他首先要做的肯定是看一看这个锁是不是有人了。那这个锁已经有人了,这个时候不代表失败,还要再判断一下获取锁这个线程是不是我自己呀。哎,那因为`method1`是调`method2`,所以他俩是在一个线程里的,所以这个标识肯定是一样的呀。那标识一样怎么办,ok,只需要把重复的次数怎么样,哎,加一就可以了,代表呢我是第二次获取锁了。以此类推啊,如果说再有重复,那就继续累加三、四,这样子好了。这个时候`method2`拿到锁了,已经重入了,是不是可以执行自己的业务了呀?好,执行完了自己的业务最终要干什么,要释放锁。那么释放锁能不能像以前一样啊,首先判断一下标识是不是自己哦,一判断是的,就是我,然后接下来直接删除,能不能这么做?哎,答案是不能啊。同学们思考一下,如果我在这啊,在`method2`里直接把这个锁删了,你要知道,`method2`结束并不代表整个业务结束吧,因为`method2`出来以后,还要去执行`method1`的剩余业务。虽然我们这没写,但是很有可能在真实场景下是有其他业务的。那也就是说在`method1`业务尚未执行完的时候,`method2`直接就把锁给删了,那么此时是不是就有其他线程可以进来了,是不是就有可能发生一些并发的安全问题了?
所以说啊,对于这种可重入锁来讲,在内部被调用的方法里边,释放锁的时候是不能直接删除锁的。那么他采取的措施是什么,是把重入次数减一。也就是说每释放一次我就减一就行了。那么问题来了,我们什么时候才能释放锁,真的把这个锁给他删除呢?哎,那肯定是当我们所有的业务都执行完,走到最外层的这个方法,它在结束释放时才能删除。但问题来了,我们怎么知道这个业务释放的时候,是不是已经到最外层了呢?让大家思考一下啊,我们每次获取锁这个重复次数就会加一,而每次释放锁这个值会减一。但是在方法当中,获取锁和释放锁都是成对出现的,也就是说你加了多少次,将来释放锁的时候就一定会减多少次。那同学们当方法走到最外层时,这个重复次数的值一定会被减成零,对不对?比方说现在我`method2`,他在释放锁时,这个值已经减到了一好,他业务结束了,走到了`method1`,`method1`就是最外层。那此时又一次释放锁,那么又减一次,它的值就变成了零。所以说呢,我们每一次在释放锁的时候,除了要去把重复次数减一以外,还应该判断一下这个值是不是已经为零了。如果已经是零了,那就证明我已经到了最外层的这个方法了,也就是说没有其他业务需要执行了,此时你就可以放心大胆地把这个锁删除了。如果说不是零,证明啊还有别的业务,那么你就不要管了,交给其他业务去处理就行了。这就是我们可重入锁的最终原理了,也就是说我们利用哈希结构代替了字符串结构,不仅仅存储线程标识,还有存储可重入的次数。那因此呢,获取锁和释放锁的操作啊,跟以前就会有很大的差别了。
那以前我们采用的是`String`类型的结构,采用的命令呢是`SET NX EX`,其中`SET NX`啊是来判断锁是否存在实现互斥,而`EX`是设置过期时间。但是现在呢我们用的是哈希结构,哈希结构里可没有这样的组合命令啊,没有`NX`这样的东西。所以说呀,我们只能是把之前那个逻辑给它拆开,先判断锁是否存在手动判断啊,没有`NX`怎么判断呀?其实就是用一个`EXISTS`判断一下这个key啊,是不是存在。那结果有两种啊,要么不存在,那么存在。如果不存在,那就好办了呀,证明我是第一个来的呀,就像这个`method1`对吧?那怎么办呀,肯定是来去获取锁标识啊,添加进去,并且这个值要改成一对不对?唉,这是第一次获取啊。好了,那获取完了以后没结束啊,因为在之前除了`NX`,还有`EX`,也就设置过期时间是个兜底方案。那在这儿呢我们同样要收入过几时间,那只不过改成了手动去设置,改成了`EXPIRE`的命令好。那只有这两步都走完了,才等于以前那个`NX`和`EX`,这个时候获取锁才算成功,你才可以去执行自己的业务啊。
那如果说我们判断的时候,这个锁它已经存在了呢,就相当于是现在掉了`method2`,他又来获取了。那如果已经存在,是不是我就直接失败了呢?如果是以前是这样子啊,但这不是因为我们是允许重入,所以说呢我们在这判断锁已经存在的情况下,我们还要看一下这个标识啊,是不是自己是不是同一个线程。所以说有一个判断线程标识的一个业务逻辑啊。如果说判断完了以后发现不是自己,那证明这个锁根本就不是自己拿的,那这个时候就真的是失败了。但是如果我们判断完了,发现这个标识就是自己啊,同一个线程,那证明是重入。那重入我们之前说了,只需要把次数怎么样,哎,加一是不是就可以了啊?那次数加一了以后就可以去执行`method2`了。当然了,在执行`method2`的时候,我们还要先重置一下有效期,因为啊在`method1`执行的过程中,可能已经消耗了一定的锁的时间了,所以在这最好重置一下,让`method2`有充足的时间去执行自己的业务。那到这儿啊,其实就完成了一次锁的重入了。
好,那等`method2`执行完自己的业务,是不是就可以去释放锁了呀?那释放锁的时候,以前我们是先判断锁是不是自己的,如果是我们再去删啊,避免误删了。这里呢其实也是类似,首先呢肯定也得先判断锁是不是自己的。如果说这个锁根本就不是你的,那证明什么,证明你这个锁啊已经释放了,可能是超时了,对不对?所以说呢这种情况下你就不要管了。但是如果这个锁是自己的,那么我们就要去释放了。当然这个释放不是上来就删,我们讲了,我们要先把锁的重复次数减一啊,然后呢才能去判断一下什么呢,这个值是不是已经到零了啊。如果说他不是零,那证明啊这个时候我们根本就不是最外层,还有别的业务,所以说我们不能删,而是干什么呢,哎,重置有效期啊。然后去执行其他业务去。需要注意的是呢,这里同样需要去重置锁有效期啊,给后续业务执行啊留够充足的执行时间啊。比方说这里`method1`,当`method1`业务执行完毕,执行到释放锁的时候,同样还会走这个逻辑。首先呢重复次数减一啊,然后呢判断锁标识看它是否为零,结果一判断呀,哎,现在真的是零,那就说明已经到了最外层了。那这个时候就可以放心大胆的去释放锁了。哎,这个呢就是我们整个释放锁的一个流程了。
那大家可以看到啊,无论是获取锁还是释放锁,跟以前相比啊,是不是就复杂很多了?而且呢这个地方啊代码有多个步骤,所以说呀,像这样的逻辑,咱不可能啊再用Java代码去实现了。那这套东西一定要采用什么哎,Lua脚本来确保获取锁和释放锁的原子性。那这个Lua脚本又该怎么写呢?
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then-- 不存在,获取锁redis.call('hset', key, threadId, '1');-- 设置有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then-- 不存在,获取锁,重入次数+1redis.call('hincrby', key, threadId, '1');-- 设置有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
在这呢我就带着大家简单来看一看啊,我们就不一个一个去写了啊。其实流程跟这些差不多嘛。首先呢在这里有三个参数啊,这里的key啊就是我们锁的这个名称就是个`lock`,然后这里的`t1`啊就是线程的标识,`releaseTime`是锁的释放时间。与之前相比啊,要多了这样一个参数,为什么需要释放时间呢?因为在获取锁的过程当中,我们要去设置有效期,所以说呢这个也作为参数之一了。然后呢脚本就开始了啊,那脚本开始以后,首先要判断的就是锁是否存在,我们讲的就是`lock`是否存在,不是用`NX`啊,是手动判断的,所以在这里用了`EXISTS`命令,那这个key呢比如说是`lock`,这里就是`EXISTS lock`,看是否存在。那返回零或者一,如果是零,就证明是不存在好。那不存在,说明你是第一个,这个时候就可以放心大胆来获取锁,就是存一个值进去存储,因为是哈希结构,所以用的是`HSET`好,指定key,在指定线程标识,在指定这个一,也就是第一次来的时候做这个初始化的动作。这个工作做完了以后呢,不要忘了还要给锁设置有效期,也就是执行这个`EXPIRE`命令,然后呢设置这个释放时间,也就是`releaseTime`。到这儿呢啊,获取锁其实就算是成功了。那这一部分啊其实对应的就是我们会为锁离啊,这一部分的一个流程啊,也就是说是锁不存在的情况。那如果说锁已经存在了呢,那锁如果存在,我们就要去判断锁标识啊是不是自己的。那怎么判断呢?唉,这里我们传了一个线程的标识threadId,你就判断一下这个标识啊,在我们这个哈希里边存不存在就行了。所以这里用了`HEXISTS`命令,传这个key,传这个线程标识。如果说它存在,那么结果就是一,这个时候就证明这个锁是自己的好。那锁如果是自己的干什么呀?我们是不是锁的这个重复的次数要去加一啊,对不对?重复次数加一,所以说在这执行了个`HINCRBY`,就是自增的意思啊。那也就是说对应的这个key对应这个标识,然后加一啊。也就是执行这个加完了以后重置有效期,所以大家看又一次`EXPIRE`啊,所以呢这一段逻辑其实也就是我们刚换的啊,这一条线就这条线这个分支。那也是一种获取锁成功的情况啊。那最后一种情况呢,也就是说锁标识不是自己的,跟你没有任何关系,也就锁就失败了。那也就是做到这`return 0`,所以在这整个流程里边啊,返回一的就是成功了,零就是失败的好。这是获取锁的一个流程了。
那我们再来看一下释放锁。释放锁呢也就是我们的下半部分啊,首先同样是这三个参数啊,key数字名称,线程标识锁的释放时间。一上来以后啊,同样我们要先去做这个判断,判断什么呢?判断这个锁的标识是不是自己啊。那同样呢就是拿着线程id留`EXISTS`去判断啊,放完了后,如果是一就是自己,如果是零,那就不是自己。不是自己就走在这条路,逻辑吧,代表锁怎么了,是不是已经被释放掉了?唉,没错啊。好然后往下走,这个时候呢那就证明锁是自己的啊,因为上面不成立,往下走是不是锁是自己。如果锁是自己怎么办,能做出简易除数减一怎么减,还是`HINCRBY`,只不过这里改成-1啊,`HINCRBY`是自增吗,增减增-1不就是减一吗?哎,重复次数减一完了去判断标识啊,是不是零?那这个时候就是看它是否大于零啊。但如果大于零,证明什么,证明现在还没有到最外层,是不是应该去重置有效期啊?然后去执行业务呀。所以在这`EXPIRE`重置有效期。那这一部分逻辑,同学们其实就对应了这里开始啊,这一条分支对吧。那如果说我们最终的判断它是零呢?是零是不是就可以去释放锁做这个`DEL`的操作了?所以啊这一段逻辑对应的,其实就是从这开始的这部分。ok,那整个脚本我们就看完了。
那么我们的Redisson啊是不是这么实现的呢?下面呢我们就一起跟踪一下源码来看一下。好,我们打开IDEA,在这呢我提前准备好了一个单元测试啊,这里是Redisson提供的一个锁,然后呢在`method1`方法当中啊,我会尝试获取锁,然后会去失败了,就结束,会成功了,我就叫`method2`,在`method2`里再次获取锁,然后呢去执行业务,最后释放锁。这就是一个可重入的一个demo了对吧。下面呢我们在这打上一个断点啊,去运行一下,看一下啊。在这呢我们走debug,好进入的点,然后这里呢我们去放行一下,走好回锁成功是个`true`对吧。我们打开这个桌面客户端刷新一下,可以看到呢这里有个`order`的,因为我们锁名称起的是`order`,可以看到这里边有一个key,这个就是那个锁的标识啊,后面呢是一个value值为一,就是重复次数为一嘛。这样呢我们往下走,然后走走进入`method2`,进入`method2`,又一次会锁,你看又是`true`,对不对?那这个时候我们再来看一下Redis,好,来在这刷新一下啊。好大家看看到值是不是变成二了,也就是说重入次数是不是加了一哎,没问题啊。好往下走,然后拿走走到这里往下释放锁,那么释放锁以后重置次数应该会减一,对不对?我们再来看一下,在这里啊,再去重置一下,看是不是减一了?唉,没有真的删除啊,是减一,然后出来出来以后又回到了谁,回到了`method1`,在`method1`里,他往下走也会去释放锁,那么这次释放锁再看好,回来这里再点一次是不是就没有了?说明这个锁被删除了。所以这就是我们刚刚分析的那个逻辑啊,通过重复次数来去记录这个可重入锁好,整个流程呢就分析完毕了啊。
来看一下整个这里也是按照我们刚才分析的,先是锁`method1`,然后呢锁`method2`执行业务二,世贸锁`method2`执行业务一,世贸锁`method1`,就是按照一个重复的一个流程吧,没有问题。那么我们也可以去跟踪一下我们Redisson的源码,来看一下哈。`tryLock`,那么在`tryLock`里,我们往下跟找到它的实现类啊,这里我们采用的是一个最普通的`RLock`对吧。好在这呢掉了一个`getLock`,里边有个`tryLockInner`,我们根据这个在这个方法里面继续融入进去,这里呢`tryAcquireOnce`走,那在这里呢他会先判断这个`leaseTime`是否等于-1,这`leaseTime`呀,就是我们之前讲过的锁自动释放的一个时间。这里其实我们是没有传的,那在没传的情况下,大家可以看到他给的是一个-1啊。所以说呢在进入这个方法以后啊,他去判断的时候,这个地方肯定不成立,那于是呢就会往下走,走到这行,那这一行呢他会去`tryLock`,也就是尝试获取锁,所其实啊跟这块两个逻辑是一样的,只不过呢如果说你给了释放时间,他就会用你给的那个值啊。如果你没给,他会走一个默认值。所以最终呢都是会进到这个方法啊,那我们跟进去好,还是这个`releaseLock`。进来了,进来以后呢,大家就会看到这里有一段逻辑,这段逻辑啊就是一个Lua脚本,我们可以看到Redisson呢是把这个脚本啊通过字符串也是直接写死的,不需要我们之前是通过一个文件啊,啊这种方式其实也可以。那这段脚本呢大家如果仔细去看啊,跟我们刚刚PPT上的脚本就是一样的。首先判断锁是否存在,如果等于零,代表不存在,不存在怎么办呀?哎,当然是去获取锁了。他在这个地方是直接`HINCRBY`了,也就是自增一自增一啊,如果你不存在,它会自动帮你创建啊,所以跟那个`HSET`的效果是一样的。紧接着设置过期时间,然后呢结束好,这是不存在,直接回溯。那如果存在呢,存在也不要紧,我判断一下这个锁是不是自己等于一,代表是自己是自己怎么办,同样是加一,然后试着过期时间,然后结束。所以这两个就是获取锁成功的情况。那其他情况就是失败了。
好,那么这个流程就看完了。那然后呢我们再来看一下释放锁啊,我再回来。那释放锁上走的是这个`unlock`,我们跟进去。那么`unlock`能组的`RLock`啊,首先在这里调用这个`unlockSync`,继续跟入。在这里我们继续跟这个`unlockEnterSync`,好释放,同样是找到这个`RLock`里面。大家可以看到同样有一段Lua脚本,
那么这段Lua脚本啊,就是我们刚刚分析的一段啊。首先一样先判断锁是不是自己的啊,是否存在好。如果说呢它等于零,就是不存在,那就直接结束了啊。因为这种就是说锁早就已经释放了,你就不用管了。然后呢哎让他锁抽的次数减一,你看`HINCRBY`,然后-1,这就是减一的意思。解完了以后啊,判断一下锁重入次数是不是零。如果大于零啊,证明好我们现在还不在最下层,怎么办,重置有效期`EXPIRE`,那这一部分逻辑,同学们其实就对应了这里开始啊,这一条分支对吧。那如果说我们最终的判断它是零呢,是零是不是就可以去释放锁做这个`DEL`删除他。这里还执行了一个`PUBLISH`,这个是干什么的?后面我们会讲他其实是发了一条消息啊,也就是去通知大家这个锁我释放了。那有什么用呢?后边我们再去给大家讲好了。
我们回到PPT,到这儿呢啊Redisson可重入锁的这个原理,我们就分析完毕了。获取锁和释放锁的整个流程啊,在这里通过这幅图啊就给大家讲述完毕了。其核心就是利用一个哈希结构啊,然后去记录获取锁的线程以及重入的次数,与我们啊`ReentrantLock` JDK里面提供哪个,它的原理是一致的。好了,那我们这节课的内容啊,就到这里。下节课呢我们继续去分析啊Redisson它提供的其他的一些功能。
redisson如何解决三大问题?
01
不可重入
同一个线程无法多次获取同把锁
02
不可重试
获取锁只尝试一次就返回false,没有重试机制
03
超时释放
锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
04
主从一致性
如果Redis提供了主从集群,主从同步存在延迟,当主岩机时,如果从并同步主中的锁数据,则会出现锁实现
实战篇-20.分布式锁-Redisson的锁重试和WatchDog机制_哔哩哔哩_bilibili详细阐述的RedissonLock.java的完整流程的全部代码,哎呀感慨前人的智慧,听说代码随想录星球里面还有什么手搓spring,RPC哎呀我操,这就是cpp选手的高深,重视底层代码
实战篇-21.分布式锁-Redisson的multiLock原理_哔哩哔哩_bilibili
好同学们,经过前面几节课的学习啊,我们已经分析了Redisson分布式锁如何实现啊锁的可重入啊,还有锁获取时的自动续期,以及啊锁释放时间的一个自动续约。解决了前面的这三个问题哦,但是现在啊还有最后的一个问题,就是主从一致性的问题还没有得到解决。所以呢这节课我们就一起来分析一下啊,Redisson是怎么解决主从一致性问题的。
首先呢我们先来分析一下主从一致性问题产生的原因。在之前所有的案例当中啊,我们采用的都是单节点的Redis。大家不妨思考一下,如果这一台Redis发生了故障啊,那么会引发什么样的问题?那可以想象的是,我们所有依赖于Redis的业务是不是都会出现问题,包括我们的分布式锁。那在一些核心业务当中,那这样的情况肯定是不允许发生的。所以说啊为了解决这个问题啊,提高Redis的可用性,往往在实际的应用当中,我们会觉得搭建Redis的主从模式。
那什么叫做主从呢?其实呢就是由多台Redis,只不过呢他们的角色不同,有一台呢我们可以把它作为主节点,而剩下的呢作为从节点。而主从他们的职责也不一样,往往会做读写的分离。也就是说呀,在主节点里用来处理所有Redis的写的操作,比如说增删改,而从节点只负责处理读的一些操作。那这样就有问题了呀,既然主节点处理写操作,那所有的数据都是在主节点里存在的,Slave节点,也就是从节点没有数据,怎么来处理读的请求呢?所以呢主和从之间啊需要做数据的同步,主节点啊会不断的把自己的数据同步给从节点,确保主从之间数据是一致的。但是呢毕竟啊不是在同一台机器,它主从之间会有一定的延迟。所以呢这个数据的同步啊也会存在一定的延时,尽管这个延时很短,但是它也存在啊。我们所说的主从一致性问题啊,正是因为这样的延时而导致的。
比方说啊现在我们有一个Java应用啊,他现在要来获取锁,要执行一个`SET lock t1 1 NX EX 10`这样一个命令,这不就是一个写操作吗?那这个操作执行到主节点的时候,主节点上就会去保存这样的一个锁的标识,`t1`。而后啊,主节点就会向我们的从节点进行同步。但是就在此时主节点发生了故障,也就是说同步尚未完成,这个时候我们Redis里会有哨兵啊,去监控集群状态。当他发现主节点当机的以后,那首先客户端连接会断开,而后啊,它会从Slave当中那两个从节点点去选出一个,作为新的主节点。但是因为之前主从同步未完成啊,也就是说锁已经丢失了,对不对?所以此时我们的Java应用,再来访问这个新的主节点时就会发现,锁就没有了。也就是说锁失效了,此时啊再有其他线程来获取锁也能获取成功,是不是就会出现并发的安全问题了啊?这就是主从一致性导致的锁失效问题。
那么Redisson又是怎么解决这个问题的呢?唉,他的思路啊非常简单粗暴,既然主从关系是导致一致性问题发生的原因,那干脆我就不要主从了呗。我的所有的节点都变成了独立的Redis节点,相互之间没有任何关系,没有主从啊,都可以去做读写。那么此时我们获取锁的方式就变了,以前我们获取锁只需要找到Master节点,然后在它里面获取锁就行了。但是现在我们必须依次的向多个Redis节点,都去获取锁啊,不管你是三个Redis节点还是五个Redis节点,必须依次都获取锁,都保存了这个锁的标识,才算是你获取锁成功。
那此时会不会出现安全问题呢?首先啊因为我们没有主从,所以是不是不会有主从一致性问题。其次可用性,现在真的有一个节点宕机了,我们的Redis可不可以用呢?哎,还是可以用的呀,因为你宕机了,我们还有俩节点的吗。那只要这两个节点在存活着,我的锁是不是依然有效啊。而这个可用性是不会随着节点的增多,而越来越高啊。所以如果你觉得三个不够,你是不是可以加五个节点。所以我们不仅仅解决了主从一致性的问题,是不是也保证了它的可用性啊。
当然如果你觉得这样还不够,你还想让它的可用性更强一点,说如果现在挂一个,我就少一台啊,那怎么办?没关系,我们也可以给这里的节点去建立主从关系,让他们去做主从同步。那加了主从同步以后会不会出现安全问题呢?其实不会,让大家思考一下,现在假设说真的有一个节点发生了宕机,就他那他宕机的时候刚好没有完成同步啊,那也就是说现在这个Slave上是不是没有这个锁的标识。现在呢利用我们之前所说的原理啊,那么这个Slave点就会变成新的主节点,对不对?那它下面没有锁标识,此时有一个线程趁虚而入,想要来获取锁,能不能获取成功?答案是不能,因为我们说了,只有在每一个节点都拿到锁才算成功。现在尽管有一个节点是空的,你能在他这获取锁,但是其他两个节点能不能拿到,还不能吧。所以说呢,你最终还是失败的。那也就是说只要没有任意一个节点在存活着,那么其他线程就不可能拿到锁,就不会出现锁失效的问题。
那这样一套方案呢,它保留了这种主从同步机制,确保了整个Redis集群的这种高可用的特性,同时也避免了主从一致引发的锁失效问题。那这套方案呀,在Redis里有一个名字叫`Multi Lock`啊,也就是连锁,把多个独立的锁联合在一起,变成一个联合起来的锁。而我们在使用这样的锁的时候啊,也是比较灵活的啊,你可以就弄几个独立节点啊,不建主从关系也是可以的,你也可以建立主从关系啊,让它的可能性变得更强。
那接下来呢我们在测试的时候就不去做主从了,因为这样太复杂了,需要的节点太多了啊,就需要六个了对吧。在这儿呢我们就去整三个独立节点来去测试这个连锁就可以了。那我提前呢已经准备好了,我利用虚拟机已经搭建好了三个Redis节点啊,可以看到呢这里我提前已经建立好了连接了啊,第一个是我们最早搭建的那个6379的啊,然后呢再往下,这里有一个6380和16381啊,这两个是新的啊,提前准备好的。那将来呢我们就利用这三个节点啊,三个独立的节点形成一个连锁好。
那下边呢我们就去编写这个代码,我们打开IDEA,首先呢我们需要去配置这个Redis客户端啊,来找到这个`RedisConfig`,因为我们有三个独立的Redis客户端的节点啊,那现在呢其实只配了一个,对不对,端口是6379。所以说呢我们需要把这个复制一下,还是C啊,然后粘贴两个啊,然后名字也不一样啊,这三个bean它们的类型是一样,但名字必须不一样,分别叫二和三吧。然后呢端口是6379,这块呢是6380,还有一个呢是6381。然后呢这块密码就不需要了,因为我们这块新安装的这两个Redis节点,我没有配密码啊。所以这样我们就把三个独立的Redis客户端给配好了,那么将来呢我们就利用这三个,分别获取三个独立的锁,把这三个独立锁合在一起变成连锁。
RedissonConfig/*********************************************************/
@Bean
public RedissonClient redissonClient2(){Config config =new Config();config.useSingleServer().setAddress("redis://198.168.111.130:6380")return Redisson.create(config);
}@Bean
public RedissonClient redissonClient3(){Config config =new Config();config.useSingleServer().setAddress("redis://198.168.111.130:6381")return Redisson.create(config);
}
那怎么做呢,来我们的单元测试好,在这个地方我们去做这个连锁啊。那怎么去创建连锁呢,首先大家可以看到,在这我们只注入了一个Redis客户端,现在不行,需要有三个啊,分别是这个二和三啊。因为你在这边配的名字是叫二和三,对不对。好,有三个client,那将来就可以获取三个独立的锁。在这呢我们给它的名字也都改了,分别叫lock1,然后lock 2和lock 3。
RedissonTest@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;/********************************/
void setUp(){RLock lock1 = redissonClient.getLock("order");RLock lock2 = redissonClient2.getLock("order");RLock lock3 = redissonClient3.getLock("order");//创建联锁multilocklock = redissonClient.getMultiLoc(lock1,lock2,lock3);
}
那对应到这边啊,分别就用二和三啊来去获取。这样呢,这也就是三个独立节点对应的三个独立锁,对不对。好,那接下来才是真正去创建连锁好,那么这个连锁叫`MultiLock`,怎么创建的呢,有多种方式啊。那给大家来个最简单的`lock`,等于啊`RedissonClient`点啊,`getMultiLock`,可以看到它的参数列表里边是个可变参数,接受了多个lock,对不对。那我只需要把这三个lock,依次啊给它放进去就可以了。那这样一来呢,它就自动的帮我们把这三个连在一起,变成连锁了,非常的简单吧。
那有同学会说了,你在这里为什么用了`RedissonClient`来干,而不是用这两个去get呢?哎大家来看啊,其实你用哪个都一样跟进去啊,`ctrl`加`b`进入Redisson的getmultilock函数,进来以后你会发现这里是new的,看到没有。那我去调的时候,不管用谁调,最终是不是都是牛啊,甚至于我将来我自己扭行不行,没问题。所以说呢其实用哪个client来调,没有区别好。那传进来的是一个可变参数,我们知道可变参数是数组,数组传进来以后干了什么事,可以发现啊,这个可变参数啊被它转变成了一个集合,然后一起添加到成员变量`locks`里去了。那么这个`locks`就是个集合呗,也就是说我们的多个独立的锁,是不是都知道这个集合里了。那按照连锁的原理,将来啊他在获取锁时,是不是应该依次啊把这个集合里边的每一个锁,都尝试去获取一遍,都成功了才算成功呀。是这样子的啊。
好啊,那么这块看完了,我们回去啊。那拿到锁了以后,它的使用方式上跟以前没有区别,这代码都不用动啊,直接就可以,这样就行了。那下面呢我们就先测试一下,看看是不是真的能获取锁啊,然后我们再去分析它的原理,看看跟我们想的是否一样。那我们第八个运行走,好那现在进入断点了啊,那我们先放行一下走,那大家可以看到这个地方是true,那证明获取锁是不是成功了。那我们呢打开这个Redis端看一下啊,首先在这刷新,那我们可以看到这多了一个`multiLock`嘛,啊前面标识后面是不是那个锁的重置次数。大家看这个啊,同样这里会多出来一个,你看是不是在这三个节点上都获取到锁了,哎没问题哦。好然后我们放心啊,放行,其实可以进到`method2`,然后进到这边去获取锁,再来一次好,我们再走又成功,此时我们再来看一下好,这边是不是变成二了,说明是第二次重入嘛,对不对,这里也是二啊,可以看出来连锁中的每一个独立的锁啊,都是一个可重入锁啊,与之前的实现啊没什么差别啊,只不过把三个联合起来分别去获取而已啊。再回来,那我们去释放锁,那释放锁啊,第一次释放锁重入次数是减一,对不对,回到Redis里看一下,好变成一变成一,你看是不是都剪掉了,然后再回答IDEA,再释放一次,到这边直接放行吧,好全部释放完成,那么这个时候我们回到Redis里发现没有了,对不对,说明这个锁是不是被删掉了啊,真的释放掉了。
看到这儿大家就发现了啊,所谓的连锁就是多个独立的锁,而每一个独立的锁,就跟我们之前讲的原理是不是一模一样的,哎没错,正是如此。那我们接下来呢回答IDEA,通过源码的跟踪啊,我们再来看一下它的一个实现方式,是不是我们所想的那样。我们跟入这个`chooseLock`,这次就不要跟这个了,跟下边这个`multiLock`了。跟进来以后呢,这里有一个`tryLock`,你可以看到第一个是等待时间,第二个时间单位,而这个释放时间咱没传,没传,就是-1呗。跟进去根据以后呢,哎进到这里了,这就跟以前有一些差别了,同学们一上来这一段是在干什么呢?这段其实是在判断你有没有传释放时间,这个释放时间啊,如果不等于-1,代表你除了对不对。好,那我们其实有没有传,我们没传没传,是不是就往下走了,哎没错啊。那如果你传了呢,如果你传了的话,那么他进来以后会判断一下`waitTime`是否是-1,如果`waitTime`是否一说明什么,说明你只想获取一次,也就是说不重视好。那你的释放时间是多久,我就用多久。但是如果`waitTime`不等于-1,说明什么,说明你想要去做重视啊,所获取失败我要重视。那要重视,这个时候他就不会用你原来的释放时间了,他会用你`waitTime`乘以二。为什么呢?因为重视可能耗时较久,万一你的生长时间小于等待时间的话,那我还没重视完了,你就是忘了,这不有问题了吗?所以说呢,他在这里会放弃你这个类似`time`,而是用`waitTime`代替啊。好,这是对于一个释放时间的处理这块,我们不管啊,因为我们传的是-1乘-1,这就不成立,对不对,那就往下走。
那么往下走的话呢,大家会发现在这里啊,他会先获取当前时间,接下来呢这里有个`remainTime`,`remainTime`就是剩余时间初始化是-1,但是呢如果`waitTime`不等于-1,它其实就会用`waitTime`代替这个`remainTime`,也就是说这个`remainTime`就是剩余的等待时间。紧接着又来了一个叫`calc`,`calc`计算锁的等待时间,这又是怎么回事呢?你跟着这个方法,你会发现啊,在`math`里它原封不动返回了,也就是说其实这个锁等待时间和剩余等待时间,他俩是不是一样的呀?啊没错啊。那接下来再往下有一个叫`fieldLocksLimit`,失败的锁的一个限制,那这个值是多少呢?跟进去一看就知道了,好是零,也就是说锁失败的限制是零。这东西有什么用,等一会儿你就知道了。往下接着看,这个地方有一个集合名字叫`acquiredLocks`,`acquire`是获取的啊,过去是什么,代表已经获取的,所以这个集合里保存的呀是获取成功的。那他刚创建出来肯定是零,对不对,那么接下来开始for循环遍历遍历遍历`locks`,点`iterator`,这不叠在一起吗,`locks`是我们三个独立的锁,对不对,那这里其实是在便利我们的三个独立的锁,便利完拿到每一个锁,那接下来该干嘛了,同学们按照我们说的,是不是应该依次去获取每一把锁呀?所以进来以后他就开始去获取了啊,那或许呢因为有两种啊,第一种呢就是空才没有传播的`time`,没传位的`time`呢,那它其实就尝试一次,所以说呢他在获取锁时候是空仓的,`tryLock`,这个咱们以前是不是见过空三,就是指一次啊不从事,但是如果你传了`waitTime`呢,那就是走下边就是带有重视的`tryLock`,传等待时间,还有释放时间的啊,这两个源码咱们之前是不是都跟踪过的,所以这里就不再赘述了啊。那不管怎么样,最终你踹了以后会得到一个结果,那么这个结果啊他这里是布尔值啊,也就是true或false代表会有所成功,或者获取锁怎么了,失败好。那拿到这个结果了以后呢,往下走,他会做一个判断判断获取所有没有成功,那这个地方如果是true,那不就成功了吗?成功了就把当前锁放到这个`acquiredLocks`里,就是什么,就是啊已经成功的锁的集合里边,没错吧,已经成功的锁在集合里啊。然后那`else`是不是会有失败,`else`咱们先不管了,咱们先管成功,那我锁成功了,我就添加进去,添加进去,完了往下走,这块是判断这个`remainTime`,剩余等待时间是不是-1,那如果不等于-1,说明啊现在剩余时间是不是很充足,那么我就会去做一个计算啊,我用当前时间减开始的那个时间,这以前咱们是不是见过呀,它是计算获取锁的一个耗时,那我用剩余的时间减去耗时,得到的是不是就是现在剩的时间呀,那现在剩的时间是不是小于零啊,如果小于零,证明啊刚才获取锁已经把等待时间给耗尽了,那我们知道啊,等待时间耗尽了,代表是不是锁超时了,那因此只能`return false`代表失败,但是在`return false`之前,他干了一件事啊,他会先去把`acquiredLocks`啊,也就是已经获取到的这些锁给他干嘛呢?`unlock`释放掉,因为你已经失败了呀,你前面拿到的锁就不能不能再拿着了,因为你拿着别人是不是拿不了了,所以给他干脆释放掉就算了,那如果时间还很充足呢,哎那没关系,我是不是可以继续去拿下一把锁呀,那到这刚好for循环结束了,来往上哎,是不是刚好进行下一次for循环了,好,那下一次for循环来又干嘛,再拿到锁,再尝试获取锁好,再判断锁有没有成功,如果成功添加了这个已获取的锁里,然后再判断一下剩余时间,剩余时间还充足吗,哦充足充足,我再进行下一次for循环好,再来获取下一把锁,这样直到我把所有的锁都拿到为止,那么这样一来,我们的这个集合里是不是就拿到所有的锁了,然后等for循环结束,`return true`,是不是就结束了?当然这是锁成功啊。
那么有没有可能说我第一次来拿锁就失败了呢?上来第一个啊,便利的第一个`lock`的数直接失败了,这个值是个`false`,有这种可能性吧。好,那如果他是`false`,走的是什么,走的是`else`,他进了`else`以后,他会做什么呢?首先做个判断,判断一下`lock.size`,这个是锁的总的数量,对不对,比如说我们这里是三再去减去啊,已经获取的锁的数量,那因为你前面获取都失败了呀,所以这个是不是零啊,那3-0的是不是就是三,它会判断它等不等于这个东西,这个东西咱们以前看过这是什么呀,是不是锁失败的一个上限,失败的上限是几呢?我们之前看到过是零,对不对,那也就是说这个地方啊,因为我们一个都没拿到,这里是零,前面是三三减0=3等不等于零,不成立啊。什么时候成立,只有已经获取的锁的数量等于锁的总数量,这个减完是不是才是零,才能`break`跳出循环。那也换言之,只有你把所有的锁都拿到了,是不是才能结束,否则能不能结束啊,就不能,那不能就干嘛往下走,往下走了以后判断他是不是零啊,是零是零的话啊,假设说你已经拿到锁了,这里是不是把已经拿到锁给释放掉啊,然后判断一下`waitTime`是否是-1,`waitTime`,如果是-1,证明什么,证明你是不想做重试的,不想重试那一次失败了,直接失败结束了好。那如果你想重试呢,想重试好办呀,我先把你现在已经拿到锁的清空啊,不管你已经拿到几把了啊,你现在假如说你一把都没拿到,反正我也给你清空,然后把迭代器往前迭代,其实就是把这个指针是不是指的第一个,为什么你要重试啊,重试是不是要从头再来,从第一把锁开始,所以这里把指针重置,然后好结束啊,又开始for循环重来,这样呢你是不是可以重头再试了,要么呢我们把锁的锁都拿到结束,要么就是失败或重试,知道什么呢,就是这个`waitTime`小于零,超时为止,这就是啊整个我们获取锁的一个逻辑了,跟我们之前分析的是不是差不多好。
那等所有的锁都拿到了,最终肯定要`return true`嘛,对不对,但是在`return true`之前啊,他还有一段业务逻辑,我们可以看到在这儿他会判断啊,这个`leaseTime`是否等于-1,那这个又是干什么呢?这个`leaseTime`啊是锁的释放时间啊,我们之前是没传的,是-1,所以`if`不成立,是不是直接往下走了,但是假设说你传了`time`呢,那就意味着你需要自己指定锁的释放时间,在这种情况下,他会干什么事呢?我们进来,你会发现他会去便利已经拿到的每一把锁,然后给他执行`EXPIRE`命令,我们知道`EXPIRE`是不是设置锁的有效期的意思,那也就是说它会给每一个锁都重新设置一下有效期。为什么要做这件事呢?同学们,我们获取锁的时候,会有多个Redis所需要依次获取第一把获取的锁,在获取之后立即就开始倒计时了,而最后一把锁是刚开始倒计时,也就是说在这个集合内的多把锁啊,其中第一把他的剩余有效期,一定会比最后一把的剩余有效期要短一些,对不对,这样呢就有可能会出现一个问题,就是有些释放了,有些没释放的情况。那么为了避免这个问题的发生,他在这干了一件什么事情呢,等所有锁都拿完了,我再重新给每一把锁都配一下有效期,确保大家的有效期是一样的。那为什么只有在不等于-1的时候,才需要做这些事呢?同学们,当`leaseTime`等于-1的时候会触发什么呀?我们之前讲过是不是会触发看门狗机制啊,那有了看门狗机制,所有锁的有效期都会自动去续,因此需不需要你这里就处理就不需要了。所以说啊我们建议大家一般这个释放时间啊,就不要设置好,让他走看门狗,它们最终呢`return true`结束。
那么我们获取锁的逻辑啊,也就分析完毕了。那我们回到PPT啊,最后呢我们来做一个总结啊,在分布式锁这个章节,我们其实啊总共讲了三大类锁啊。那第一类呢就是不可重入的Redis分布锁,这种其实就是我们最早自定义的那种嘛,那它的原理是利用了`SETNX`的互斥性,然后利用`EX`设置过期时间,避免死锁。在释放锁的时候啊,去判断线程标识啊,避免误删。那它的缺陷啊就是不可重入啊,而且呢无法实现锁的自动续期机制啊,并且呢锁超时啊,可能会有这样的一个自动释放的一个风险啊。但是它的实现方式非常简单啊,在大多数场景下还是比较实用的啊。只不过呢对于有可重复性需求,重置需求,或者是安全性要求较高的业务场景,就不太适合了,此时我们可以选择可重入的Redisson锁。
它的原理是利用了哈希结构记录线程标识和重复次数啊,这样就可以实现可重入。再利用watch dog延续这个锁的一个剩余时间,不断的去重新续约,确保这个锁不会因为自动超时而释放,除非是服务宕机。最后呢,再利用这个信号量去控制这个锁的一个重试机制啊,利用发布订阅这样一个方案。那这种方案的话呢,对于CPU的利用率也比较高,不会无效的这种等待啊,所以呢它的性能啊各方面都还是不错的。但它的缺陷啊就是Redis宕机引起锁失效的问题啊,就是主从的一致性问题。那要解决这个问题呢,最后我们又给大家讲了Redis里的`MultiLock`,也就是连锁。那连锁机制呢它是利用多个独立的Redis节点啊,那这样一来节点之间没有主从关系就不会啊,因为主从一致导致锁失效了。但是在获取锁的时候,就不是说只获取一次了,而是要在所有节点都获取成功才算成功。你可以把`MultiLock`看成多个啊,可重入锁的一个集合。那么它的缺点呢就比较明显了,因为成本比较高。当然你可以建立三个独立节点,那最少得三个啊,啊当然大于三个,比如说五个以上会更好。而且呢你也可以给节点再去建立主从关系啊,这样可以提高可用性,只不过就是成本会更高一点,实现起来会更加复杂一点。但是呢是所有里边最安全的一种方案了。
好了到这呢分布式锁的所有知识啊,我们就讲完了。之前呢我们也用Redis的分布式锁,代替自定义分布式锁,实现了一人一单,测试呢也没有什么问题。那到这为止啊,整个秒杀相关的业务我们就做完了。但是啊正是因为我们加入了各种各样的锁,秒杀业务的性能呀也会受到巨大的影响。所以说啊,我们后续还需要对秒杀的业务,去做进一步的优化,从而提升它的性能。那具体该怎么去做呢?
实现异步秒杀
视频里的代码如下,没有换源,实际上还是存在多线程问题的
package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.time.LocalDateTime;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient redissonClient;@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券拿到信息;其实后续Redission可以直接用信号量来锁库存SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if (voucher.getStock() < 1) {//库存不足return Result.fail("库存不足!");}return createVoucherOrder(voucherId);}@Resourceprivate RedissonClient redissonClient;@Transactionalpublic Result createVoucherOrder(Long voucherId) {//6.一人一单Long userId = UserHolder.getUser().getId();//分布式锁创建锁对象RLock redisLock =redissonClient.getLock("lock:order:"+userId);//尝试获取锁boolean isLock =redisLock.tryLock();if(!isLock){//获取锁失败,直接返回失败或者重试return Result.fail("不允许重复下单");}try{//6.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//6.2.判断是否存在if (count > 0) {//用户已经购买过了return Result.fail("用户已经购买过一次!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//这里一定是相等的吧,应该是之前查到的,你现在现查,肯定相同.eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {//扣减失败return Result.fail("库存不足!");}//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2. 用户idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//8.返回订单idreturn Result.ok(orderId);}finally{//释放锁redisLock.unlock();}}}
他准备了1000个用户(在sql里面已经保存了)并且登录在redis中
先回顾一下咱们秒杀的业务流程
我们的秒杀业务能接收一个`voucherId`,就是当前这个用户正在抢购的优惠券的ID。而业务功能呢核心是两个:第一呢我们要去扣减这个优惠券的库存,而第二呢就是将用户抢购的优惠券信息写入订单,完成订单的创建。只不过因为是秒杀业务,所以呢他需要加入一些业务的限制啊。首先就是库存绝对不能出现超卖现象,必须判断库存啊是否充足;第二呢还要限制呀,一个用户对于一个优惠券只能下一单,也就是一人一单的功能啊。但因此整个业务啊就变得复杂了。
首先啊,要想判断优惠券的库存,你得先查询优惠券吧?那这个呢肯定是访问数据库的啊。查到了以后啊,去判断一下秒杀的时间,判断一下库存是否充足,如果不足,直接报错啊,避免超卖。那如果充足的情况下,我们再去尝试下单。那接下来呢其实就是对一人一单的这个判断了。那一人一单怎么去判断呢?啊,我们其实就是看一看,在数据库里有没有这个订单。但是啊,可能存在并发安全问题,所以在这我们利用了分布式锁,首先尝试去获取锁,获取成功再去做这个判断。而判断的时候,就是看一下当前用户有没有买过这个券啊,拿着用户ID和券的ID啊,去查询用户订单表,查完以后看一看存不存在。如果说呀已经存在了,那就证明他买过了,那肯定拒绝他呀。而如果没买过,那才可以继续。这个时候放心大胆的去扣减库存和创建订单,就可以了啊。最终业务结束,释放锁啊,整个流程也就完成了。
那大家可以看到啊,其实虽然整体来看业务非常的复杂,但是核心就是两点啊,哪两点呢?第一是对于购买资格的一个判断啊,就是这个用户他能不能买啊,能不能买依据什么依据是库存够不够,还有这个人买没买过。第二才是真正下单啊,就是学扣库存和创建订单。但是呢,因为那这里边呢有大量的数据库的操作啊,所以说呢整个业务它的性能并不是很好。那么我们做一个测试看一看啊。
嗯,在这里啊,因为我们要测试高的一个并发,所以说呢我们不能用一个用户或者两个用户去测啊。在这儿呢我们打开数据库看一下,那我提前准备了1000个用户,可以看到这里有1000条用户。而且呢这一新用户我提前已经让他们登录了啊,我们看一下Redis。在这里我提前准备好了这些用户的登录token啊,提前就创建好了。等一会儿呢,我们就会用这1000个token去发起请求啊。
好我们再回到数据库,在数据库里啊,秒杀的库存我已经提前改成了200,然后呢订单表里面也是空的啊。那接下来呢我们就去利用JMeter进行一个压测,看看目前这个接口的性能怎么样。打开JMeter,在这个地方啊,我准备了1000个请求。那我们怎么样去得到这1000个用户的token呢?大家应该知道啊,我们是把这个登录token放在请求头里的,这个请求头啊,之前我们是写死的,你发现现在我没有写死了啊。他从哪来的呢?这里写了个`${token}`的,这里是引用了一个名为token的变量,这个变量从哪来的呢?在这tokens,那么这个tokens里,大家可以看到它会去读取这个文件啊,这个文件在我们课件资料里叫`token4.txt`。打开看一下,好在这里啊,好,大家可以看到这个里边,就是我提前准备好了1000个token啊。
那现在呢我们把这个文件准备好,告诉了JMeter,那么JMeter就会去读取这里面的每一行数据啊,然后把它复制给一个变量,变量名就叫token。那这样一来呢,我们在请求头中就可以利用美元符大括号引用这个变量了。那这样他发起的请求啊,每一次请求就可以使用不同的token了啊。我们有1000个请求,然后呢是200个库存啊,我们来发起一下走。我们可以看到很快业务就结束了,在这个地方呢可以看到整个这个业务啊,它的一个情况。首先这里的最小值,最大值是我们业务的显示时间啊,可以看到最小的时候显示时间126ms,最大值800多毫秒。那为什么这里的波动这么大呢?哎,这就是因为我们这个并发利用JMeter模拟的,它不是真的一次性发了1000个请求啊,它会有一个时间一开始少,后来越来越多啊。所以说呢,他在随着这个并发的量越来越高的情况下,他的这个请求的响应时间会越来越长。因为并发越高,CPU呢就需要在多个线程之间进行来回的切换,所以这个时候呢会导致某一个请求,他处理的耗时就会增加。你可以看到平均耗时竟然达到了400多毫秒,所以说我们这个业务其实还是比较慢的啊。那这里的吞吐量是1000每秒,这里看好像还可以啊,其实随着并发越来越高,这个值只会越来越低啊。
好,这是我们现在做的一个测试。那接下来呢我们就要对它做优化了,那么具体该怎么样去做优化呢?我们回到PPT啊来去做一个分析啊。首先先回顾一下我们整个秒杀业务的流程啊,那我们前端发起请求到达我们的Index,我们的Index会把请求啊负载均衡到我们的Tomcat,而在他们干的内部啊,我们的业务流程是这样子的:查询优惠券啊,查的目的为了做库存的判断,库存判断如果没问题再去查订单,这个呢是为了做一人一单的检验。那如果这个卷还没有问题,就可以真正的去扣减库存,创建订单了。而在整个这个业务流程当中啊,它是串行执行的啊,也就是说一个执行完再往下执行,是这样子的。所以呢整个业务耗时啊,其实就是每一步的耗时之和。但是在整个这个流程里边,其中查询优惠券、查询订单啊,还有减库存、创建订单,这四步都是走的数据库。而我们知道啊,数据库的并发能力本身就是比较差的,更何况这里减库存和创建订单还是对数据库的写操作。另外呢,为了避免安全问题,我们这里还加了分布式锁。那整个业务的性能可想而知啊,所以呢我们整个业务的耗时就变得比较长。那并发的能力呢就比较弱了。
那现在我们该怎么样去优化它,提高我们这个业务的并发能力呢?哎,在这呢我给大家举一个例子啊,这个例子呢能给我们一个很好的提示啊。比如说我现在呢开了一个饭店啊,然后呢成本有限,我就请了一个人啊,一个小姐姐。那她既负责去接待顾客,还得去做饭啊。现在我们的业务流程是这样子的啊,有一个顾客来了啊,他首先呢去接待啊,你点餐啊,是你要吃什么呀,我得记录一下。然后呢收钱,那接下来他做到后处理去帮这个顾客做饭啊。那做好了以后呢,哎再给这个顾客好,流程结束了。这个流程呢就跟我们现在这个业务啊有点像啊,这个小姐姐上来要负责接待顾客点餐下单,然后呢去做饭,做完最后把这个餐给了顾客才算结束。你看他一个人负责了一条龙的完整服务,所以说呢他接待顾客的业务流程啊,就是整个流程的所有耗时之和。那因为啊做饭其实是比较耗时的,所以就导致了接待顾客的总的这个时间呀,就变长了。那处理一个顾客就要耗这么久的时间,那么他在单位时间内,能够接待的顾客的数量是不是就变少了,你的这个工作效率就变得极低。而且呢后边的顾客可能等不及人家就走了,这些人啊不是我们希望看到的。那该怎么解决呢,哎大家都能想到了,我是不是得再请一个人啊。哎,小姐姐在前面负责接待顾客,后边再请一个后厨来做饭。那这个时候的流程变成什么样子的呢?
好,顾客来了,那小姐姐呢首先去接待一下,然后点餐啊,付钱把这个信息记录一下,你要吃什么啊,交给后厨。后厨呢慢慢去做,到这小姐姐的工作已经结束了,来您旁边请啊,旁边等着来,下一位是不是还继续重复这个过程就行了。那当然了,顾客越来越多啊,我们怎么去分得清楚谁点了什么东西,谁先到谁后到呢?所以呢我们在点餐以后啊,其实要给顾客一个小票上面记录的诶,你是几号,然后你吃了什么,点了什么对吧?但这个信息不仅仅给顾客一份啊,给后厨也一份。后厨那里呢其实就会有一个点餐的列表,贴在墙上。那这个厨师呢就会按照点餐的顺序,依次去做这些东西,然后呢将来做完了以后叫号哎,是谁的,你来拿就完了。那大家来看,因为我们要把这个点餐收钱这一块业务啊,他耗时较短,分给了小姐姐去做。然后呢做饭的义务啊,他是叫九分给了另外一个人啊,后厨去做。因此呢小姐姐的工作效率是不是大大提高。那这个饭店啊它能够处理顾客的速度,是不是也提升了,单位时间内能够接待是不是更多的顾客。那同理啊,咱们这个业务是不是也可以参考这种模式啊,把它分成两部分,第一部分啊就是对于秒杀资格的判断,也就是说判断一下库存是否充足啊,判断一下一人一单这部分的耗时啊其实比较短,就相当于是小姐姐接待顾客的那部分。第二块呢是减库存下单,因为他们是对数据库的写操作,所以号是叫九,就相当于是后厨做饭的那一块。而我们现在要做的事情,就是把这两部分交给两个人去做,那就别说了,那java代码里怎么要交给两个人呢?唉我们知道执行代码的不是人啊,是线程。也就是说,我们只需要把这两块交给两个线程就行了。那请求进来以后,主线程要做的事情啊,就是去判断用户他的一个购买资格,然后如果他有购买资格,我们还可以开启一个独立的线程来处理耗时较久的减库存和下单操作,相当于是后厨。这样咱们这个业务的效率,是不是也就能大大提升了。
当然为了更进一步的提高咱们这个项目的性能,除了要把它分离成两块以外,我们还要尽可能的提高对于秒杀资格判断这部分业务的效率。那现在呢这部分对于秒杀资格判断呀,依然要去查询数据库,所以它的性能依然是会受到数据库的一个限制的。那我们知道相比于数据库来讲,什么东西的性能更好啊,没错就是Redis。所以说呢在这我们完全可以将优惠券信息、订单信息啊缓存在Redis当中,然后把对于秒杀资格的判断啊放到Redis里去做。主线程进来以后,首先呢就去找我们的Redis,完成对于秒杀资格的判断,这部分判断完啊,代表小姐姐的工作都完成了,其实已经可以结束了啊。那如果发现他有资格,我们再去执行后续的减库存下单操作就ok了。
但是就像刚才说的那样,我们要把这块分离要交给两个人去做,你不能说我调完Redis,再调他们盖后边这部分业务,那这样一来不就又变成串联成一条龙的执行了,那它的性能其实没有增加,反而降低了。所以说呢在这里我们不能出去调用它,而是干什么的,我们将开启独立线程去执行。那问题来了,我怎么知道该执行谁呢?给谁哪个用户去创建订单的?所以说我们之前讲过,是不是得有一个小票啊对吧?人家点餐完了得有小票,记住是谁啊,点了什么。那我们在这也是我们要记录什么呢?谁买了什么东西,并且呢还要返回给用户一个编号,将来的话,他是不是可以依据这个去完成付款的操作,后续的操作。哎,没错,所以在这里呢我们要做的第一件事情,其实就是记录一下啊,优惠券信息、用户信息,这里呢就是说谁买了什么,然后再去记录订单的ID,就相当于是那个叫号是吧,你是几号,然后把这封信息存储到一个队列里。那这个存储在一个队列以后,我们将来就可以开启一个独立的线程啊,去读取这个队列里面的信息,完成下单了,就相当于是后厨里边的那个什么了,哎,那个点餐的列表。点餐完成以后,用户可以得到一个参号,那将来呢可以用来叫号。那我们这里返回的则是一个订单的一方面啊,这个用户可以拿这个订单ID去付款,另一方面呢,他拿到ID就代表着他真的是抢单成功了。当然虽然此时啊这个真正的订单还没创建,但是我们将来会确保它一定会创建啊。
将来呢我们会一步的开启一个独立的线程,也就是我们这个后厨是吧,去读取这个队列中的优惠券信息、用户信息啊,完成下单操作。那这样同学们可以看到啊,我们其实接受用户请求以后的业务流程就变成什么了,进来直接判断啊,我们的秒杀资格,而且这个判断是在Redis里做的。做完以后啊,只需要把它保存到队列里,直接就结束了。那整个业务的流程是不是变短了,而且因为是基于Redis,所以性能可以得到极大的提升。整个业务的吞吐能力、并发能力是不是可以大大提高了。
不过呢这里就有一个难点的问题了,就是如何在Redis里完成对于秒杀库存的判断和一人一单的判断。好,那在这里呢我们一起来分析一下啊。要想在Redis里判断库存是否充足以及一人一单,我们肯定需要把优惠券的库存信息,以及有关的订单信息啊,给它缓存在Redis当中。那这里就会有一个问题了,我们应该选择什么样的数据结构,来保存这两个信息。因为是库存比较简单啊,库存就是一个数值嘛,所以说我们只需要一个普通的String结构就行了,他的Key就是优惠券的ID,值呢就是库存的值了。将来去做库存判断的时候,其实就是看一下这个值是不是大于零。如果大于零,那不就代表库存充足吗,我们就可以去做后续的业务了。但是呢大家一定要注意哦,当我们判断用户确实有购买资格之后啊,这个库存值一定要减一,相当于我们要在Redis里提前预减库存啊,否则的话这个值永远不变,那岂不是永远都是库存充足了,那这样呢库存判断就实现了啊。
然后我们再来说一下一人一单,要实现一人一单功能啊,我们就需要在Redis里去记录当前这个优惠券被哪些用户购买过。那以后再有用户来的时候,只需要判断它是否存在,存在证明他购买过,那就不能再买了。那这样功能是不是就实现了。不过呢大家也可以再思考一下啊,什么样的数据结构比较适合用来保存啊,购买过这个优惠券的用户的信息。大家可以把答案打在弹幕上。其实这里呢需要满足的就是两点啊,第一我们知道一个优惠券他库存有很多,所以呢将来购买的用户是不是也会有很多呀,所以要满足第一个特点,就是能够在一个Key里保存很多值,也就是一个列表。第二呢我们讲一人一单,那也就是说在这个优惠券里面保存的这些用户的ID是肯定不能重复的,他要保证唯一性。那什么样的列表具有唯一性呢?哎,这个时候啊答案是不是就很明显了,就是我们的Set集合。我们知道Set集合是可以确保元素的唯一性的,并且可以在一个Key里保存多个值。虽然将来当有用户来抢购的时候,我们只需要在这个Set里记录用户的ID,再有更多用户来啊,也是依次去记录就行了。当然如果有用户重复的来购买,比如说2号用户啊,我们以判断发现在这个集合里已经存在了,那肯定就不允许的购买,他代表他是重复购买。这样一人一单的功能是不是也就实现了。
那这个思路还是挺简单的吧,下面呢我们就一起来梳理一下整个流程啊。首先一上来呢,我们就要先去判断一下库存是否充足,就是看一下这个值是否大于零。如果说库存不足,我们肯定是返回一个错误的信息,比如说在这我们返回一啊,代表一个标识就是库存不足。然后呢业务其实就结束了。但如果库存充足呢,我们还需要去判断一人一单,其实就是看一下这个用户在这里面是否存在,是否下过单。那如果说他下过单啊,也就是存在,那这个时候代表什么是重复下单,同样我们是不是也结束返回一个二标识,是重复下单。那如果说这里不存在呢,那代表他没下过单,那代表有购买资格,我们是不是就可以去扣减库存了啊,这个地方扣减库存不是真正去数据库扣啊,而是把Redis这个值啊扣一下,减一就行了。我们讲过啊,是预减。接下来不要忘了,我们还需要把用户ID是不是保存到这个集合里,这样是不是可以作为下一次再来判断时的一个依据啊。所以说还要做一件事情,就是将用户的ID存入这个优惠券的在Set集合里。到这儿呢我们整个流程才算结束,我们就可以去返回结果了,比如说返回零。那这样来零代表的就是什么,哎,是有购买资格,一和二代表的是不是没有购买资格在这里啊,因为对Redis的判断啊,有很多个判断,业务流程比较长,而我们呢必须确保这一段流程执行时的一个原子性啊。那大家就能够想到了,用什么东西可以确保代码执行的原子性啊。哎没错,就是我们的Lua脚本啊。所以说呢我们将来啊,这部分内容直接用Lua脚本来实现
那么将来我们在Tomcat里面,我们进入到用户请求以后,要干的事情是什么?唉,上来呢就先去调用这个Lua的脚本,根据Lua脚本执行的结果是零一或者是二,来去做一个后续的判断。那如果说你不是零啊,那么代表就是失败的情况了,可能是一,可能是二对吧。那这个时候我们肯定就返回一个错误信息,就行了啊,然后就结束了。但如果说这里是零,那证明什么啊,如果是零,证明是不是你有购买资格。那有购买资格,我们就需要去按照之前说的,把这些信息首先保存在一个堆里面,什么信息呢,优惠券ID,用户ID,订单ID啊,这相当于是小票吗,谁购买了什么东西,还有你的编号是什么。保存下来,方便将来我们的异步线程去执行它,完成真正的下单。其次呢,我们还需要返回一个订单的ID,也就是那个号啊,给用户好了。那到这里呢,基于Redis的秒杀业务的流程就分析完毕了。
可以看到在这个版本当中,我们接收到用户请求以后,核心要做的事情啊,仅仅是判断一下用户有没有购买资格,没资格就报错,有资格呢就返回一个ID给他,而耗时较久的秒杀下单减库存等等,核心的这种数据库写操作,并没有在我们这个流程当中出现啊。更何况呀,我们这里判断用户购买资格,也是在Redis里通过脚本来执行的,可以说整个业务流程变得非常的短,而且呢执行的性能又非常的好。因此整个业务的耗时就非常非常短了,可以想象的是,我们这套实现方案当中,它的并发能力就会变得非常的高。
那什么时候再去做下单和减库存的逻辑呢?哎,我们之前也说过了,将来呢,我们只需要开启一个独立的线程来读取啊,提前保存好的这些用户信息啊,优惠券信息就可以完成异步的啊,数据库的这种写的操作了。而且呢在我们返回订单ID给用户的那一刻,其实秒杀业务已经结束,用户已经可以拿着这个ID去付款了。所以呢我们什么时候将优惠券信息,用户信息写入数据库里,完成下单减库存的操作,其实就没有那么重要了啊,时效性上要求就没那么高了。我们完全可以按照数据库能够承受的频率啊,去将数据写入数据库。当然如果你想提高写入数据库时的性能,我们不妨多开几个线程,甚至是把这种单个的写变成批量的写,从而提高一下这个异步操作的一个效率啊。
好吧,那到这为止呢,基于Redis啊这样的异步秒杀的业务流程,我们就分析完毕了。正是因为啊,我们将这种同步的写数据库操作,变成了异步操作啊,他一方面呢缩短了秒杀业务的流程,从而大大提高了秒杀业务的并发,另一方面呢还减轻了数据库的压力啊,可以说是一举多得啊,是一种非常好的方案啊。那下节课呢我们就去尝试着来实现一下,这套异步秒杀的方案。
改进秒杀业务,提提高并发性能
需求:
①新增秒杀优惠券的同时,将优惠券信息保存到Redis中
②基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
③如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
④开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
VoucherServicelmpl.java修改
前面加一个 @Resourceprivate StringRedisTemplate stringRedisTemplate;@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);//报保存秒杀库存到redis中//秒杀时间在前端已经判断了,不在时间内是不显示的stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());}
}
postman给他发一个这个 http://localhost:8081/voucher/seckill
{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食","payValue":8000,"actualValue":10000,"type": 1,"stock":100,"beginTime":"2024-08-24T12:13:14","endTime":"2025-09-17T11:11:11"
}
写一个seckill.lua脚本
--首先要判断的就是库存是否重组,得去读取redis当中的这个key(比如seckill:stock:9)的值
--1.参数列表
--1.1.优惠券id
local voucherId =ARGV[1]
--1.2.用户id
local userId =ARGV[2]--2.数据key
--2.1.库存key
local stockKey= 'seckill:stock:'..voucherId
--2.2.订单key
local orderKey= 'seckill:order:'..voucherId--3.脚本业务
--3.1.判断库存是否充足getstockKey
if (tonumber(redis.call('get',stockKey))<=0) then
--3.2,库存不足,返回1
return 1
end
--3.2.判断用户是否下单SISMEMBERorderKeyuserId
if(redis.call('sismember',orderKey,userId) == 1) then
--3.3.存在,说明是重复下单,返回2
return 2
end
--3.4.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd',orderKey,userId)
对voucherorderserviceimpl的seckillvoucher重构
@Resourceprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId =UserHolder.getUser().getId();//1.执行lua脚本Long result =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());//2. 判断结果是0int r=result.intValue();if(r!=0){//2.1 不是0就没有购买资格return Result.fail( r==1?"库存不足":"");}//2.2 为0 有购买资格 ,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//返回订单idreturn Result.ok(0);}
VoucherOrderServiceImpl.java修改如下
package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import static com.baomidou.mybatisplus.core.toolkit.Wrappers.query;@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue<>(1024*1024);//阻塞队列,阻塞队列特点:当一个线程尝试从队列中获取元素,没有元素,线程就会被阻塞,直到队列中有元素,线程才会被唤醒,并去获取元素private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {//1. 获取队列中的订单信息VoucherOrder voucherOrder= orderTasks.take();//2.创建订单handlerVoucherOrder(voucherOrder);}catch (Exception e) {log.error("处理订单异常",e);}}}}private void handlerVoucherOrder(VoucherOrder voucherOrder) {//1.获取用户Long userId = voucherOrder.getUserId();//2.创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//3.获取锁boolean isLock = lock.tryLock();//4.判断是否获取锁成功if (!isLock){///获取锁失败,返回错误或重试log.error("不允许重复下单");return;}try{proxy.createVoucherOrder(voucherOrder);} finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId =UserHolder.getUser().getId();//1.执行lua脚本Long result =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());//2. 判断结果是0int r=result.intValue();if(r!=0){//2.1 不是0就没有购买资格return Result.fail( r==1?"库存不足":"不能重复下单");}//2.2 为0 有购买资格 ,把下单信息保存到阻塞队列VoucherOrder voucherOrder = new VoucherOrder();//订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//2.4.用户idvoucherOrder.setUserId(userId);//2.5.代金券idvoucherOrder.setVoucherId(voucherId);//2.6.放入阻塞队列orderTasks.add(voucherOrder);//3.获取代理对象proxy= (IVoucherOrderService)AopContext.currentProxy();// proxy作为实例变量在多线程环境下可能被覆盖,导致数据不一致。//修复: 在需要时直接获取代理,避免使用实例变量://IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();//proxy.createVoucherOrder(voucherOrder);//返回订单idreturn Result.ok(orderId);}/*@Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠券拿到信息;其实后续Redission可以直接用信号量来锁库存SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀尚未开始!");}//3.判断秒杀是否已经结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {//尚未开始return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if (voucher.getStock() < 1) {//库存不足return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//创建锁对象//SimpleRedisLock lock=new SimpleRedisLock("ordere:" +userId,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:"+userId);//获取锁boolean isLock=lock.tryLock();//其实是3个参数,也可以选择无参if(!isLock){return Result.fail("不能重复下单");}try {//获取代理对象(事务)IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally {//释放锁lock.unlock();}//try finally 事务可以生效,因为没有捕获异常。如果catch捕获了异常,需要抛出RuntimeException类型异常,不然事务失效。}
*/@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {//我们的同步锁就是this,是当前对象//另外呢这个事务的范围其实是更新数据库的一个范围:也就是说做减库存操作和创建电子操作而不是整个操作//6.一人一单Long userId = UserHolder.getUser().getId();//6.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//6.2.判断是否存在if (count > 0) {//用户已经购买过了log.error("用户已经购买过一次!");return;}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1")//这里一定是相等的吧,应该是之前查到的,你现在现查,肯定相同.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {//扣减失败log.error("库存不足!");return;}/*//7.创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1. 订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2. 用户idvoucherOrder.setUserId(userId);// 7.3. 代金券idvoucherOrder.setVoucherId(voucherId);*/save(voucherOrder);//异步执行无需返回}}
lua表示在执行的时候也是一把锁;哪怕是多实例部署,这里都不需要加锁,因为Redis是独立于服务之外的,多实例可见的;但是你这里加锁,不还是通过redis,如果前面redis坏了,你这里大概率也不起作用;
报脚本错误应该是因为tonumber函数没办法转成数字,找了源码看了原因是说函数里的类型没办法转为数字,应该是因为Redis存储的键值中含有引号,需要去改一下Redistribution的默认序列化器;报错attempt to compare nil with number的,可能是优惠券信息没有保存到redis中,也就是第一步
IVoucherOrderService修改void createVoucherOrder(VoucherOrder voucherOrder);
秒杀业务的优化思路是什么?
1. 先利用Redis完成库存余量、一人一单判断,完成抢单业务
2. 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
内存限制问题
数据安全问题