RocketMQ实现基于可靠消息的最终一致性
RocketMQ实现基于可靠消息的最终一致性
文章目录
- RocketMQ实现基于可靠消息的最终一致性
- 一、RocketMQ应用场景
- **应用解耦**
- **流量削峰**
- **数据分发**
- 二、RocketMQ 基础概念
- 1. 核心组件
- 2. 消费模式
- 3. 消息可靠性
- 三、消息类型
- 按发送方式分
- 同步发送
- 异步发送
- 单向发送
- 按使用功能特点分
- **普通消息(订阅)**
- **顺序消息**
- **延时消息——订单超时库存归还**
- **事务消息**
- **RocketMQ实现基于可靠消息最终一致性**
- **业务逻辑**
- **订单服务**
- **库存服务**
一、RocketMQ应用场景
在当今分布式系统中,消息队列已经成为服务解耦、流量削峰、异步处理的重要组成部分。无论是订单系统、库存系统,还是支付、日志系统,消息队列都能发挥巨大的作用。
RocketMQ 是阿里开源的一个分布式消息中间件,具有高吞吐、低延迟、高可用、可扩展等优点,非常适合企业级应用。
应用解耦
系统的耦合性越高,容错性就越低。举例:用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统除了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
将原本的“直接调用”改为“发送消息”,由消息队列来转发处理流程。
过程:
- 用户提交订单,请求到“订单系统”。
- 订单系统在本地完成事务后,发送消息到消息队列,例如 Kafka、RabbitMQ、RocketMQ 等。
- 其他系统(库存、支付、物流)各自订阅对应的消息,当消息到来时再异步处理:
- 库存系统:接收到“订单创建”消息后扣减库存;
- 支付系统:接收到消息后创建支付请求;
- 物流系统:准备发货。
优点:
- 某个系统挂了,不影响主流程,下单流程仍可继续,消息会保存在队列中,系统恢复后可继续消费
- 新增业务只需订阅消息,不改动订单系统逻辑
- 通过 MQ 中转,系统之间不直接调用,降低耦合度
流量削峰
消息队列可将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提高系统的稳定性和用户体验。当系统负载超过阈值,使用消息队列将请求缓存起来(而不是直接组织用户请求),等待系统处理完毕后通知用户下单完毕。
数据分发
通过消息队列可以让数据在多个系统间流通,数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中获取数据即可。
二、RocketMQ 基础概念
1. 核心组件
- Producer:消息生产者,负责发送消息。
- Consumer:消息消费者,负责接收消息。
- Broker:消息中转站,存储和转发消息(邮局)。
- NameServer:注册中心,维护 Broker 路由信息(各个邮局的管理机构)。
- Topic:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic,一个消息的接收者可以订阅一个或者多个Topic消息
- Message Queue:相当于Topic的分区,用于并行发送和接受消息
2. 消费模式
- Push 模式:Broker 主动推送消息给 Consumer。
- Pull 模式:Consumer 主动轮询 Broker 获取消息。
3. 消息可靠性
RocketMQ 支持消息的持久化存储、消息确认机制、消息重试,保证消息在分布式环境下的可靠传递。
三、消息类型
按发送方式分
同步发送
-
同步发送,线程阻塞,投递completes阻塞结束
-
如果发送失败,会在默认的超时时间3秒内重试,最多重试2次
-
投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
-
SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值
-
retry的实现原理,只有ack的SendStatus=SEND_OK才会停止retry(发送同步消息且Ack为SEND_OK,只代表消息成功的写入了MQ当中,并不代表该消息成功被消费)
异步发送
- 当前线程一定要等待异步线程毁掉结束再关闭producer,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开
- 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry
- 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,比如用户视频上传后通知启动转码服务,转码完成后通知转码结果
单向发送
- 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
- 此方式发送消息的过程耗时非常短,一般在微妙级别
按使用功能特点分
普通消息(订阅)
生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景
顺序消息
哪条消息先进入,那条消息就先被消费。分为全局顺序消息和分区顺序消息
全局顺序消息:所有消息都在同一个队列中(相当于只有一个分区的分区顺序消息)(代价大,并发性能低)
分区顺序消息:相同业务标识的消息始终发送到同一个队列,从而保证该业务内消息有序(并发性好)
延时消息——订单超时库存归还
延时的机制是在服务端实现的,也及时Broker收到了消息,但是经过一段时间以后才发送
事务消息
RocketMQ 分布式事务是一种 最终一致性事务模型,流程分为三步:
- 发送预消息(half message):消息不会被消费。
- 执行本地事务:如创建订单、扣库存。
- 返回事务状态给 MQ:
- 成功 → MQ 提交消息,被消费者消费;
- 失败 → MQ 回滚消息,不再消费;
- 超时未回复(未知状态) → RocketMQ 调用 CheckLocalTransaction 查询事务状态。
RocketMQ实现基于可靠消息最终一致性
微服务间事务的问题:1.磁盘满了,服务宕机 2.代码出现异常,直接崩溃停止 3.网络拥塞或抖动
先扣减库存,再新建订单:
-
本地执行失败,则调用库存归还接口
-
非本地执行失败,而是代码异常程序崩溃,此时不知道本地执行情况
先创建订单,后扣减库存:
将库存服务加入到本地事务中,先在本地进行订单相关业务处理,最后扣减库存成功后才提交整体事务,否则回滚。
问题:
- 扣减库存请求发送出去,但是网络拥塞,则会多次发送请求重试,实际多次扣减库存。
- 本地订单服务向库存服务发送请求,发送后本地订单服务宕机,本地订单事务不会提交,但是库存成功扣减。或者网络拥塞造成实际库存服务扣减成功,但是本地服务以为扣减失败,本地事务进行回滚,但是库存还是扣减了。
可靠消息要确保:消息一定能被消费者成功消费
- 积分服务:积分没有上限,加积分一定可以成功
- 库存服务:库存不能为负数,所以扣减库存允许失败
项目中的最终解决方案:
RocketMQ 的事务消息用于处理“消息可靠发送 + 本地事务一致性”场景。发送事务消息后,可以:
- 执行本地事务
- 根据本地事务的执行结果来提交、回滚或挂起(未知)事务消息
- 若返回未知,RocketMQ 会定时回查消息的事务状态
业务逻辑
订单服务
新建订单方法:
使用RocketMQ初始化事务监听器和事务消息生产者,将订单信息发送出去,指定topic为库存归还(预先发送库存归还消息,如果本地事务执行成功则回滚消息,如果本地事务执行失败则提交消息)
发送消息后,RocketMQ会自动调用事务监听器的方法,执行本地事务
func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {// 初始化订单事务监听器orderListener := OrderListener{Ctx: ctx}// 创建事务消息生产者p, err := rocketmq.NewTransactionProducer(&orderListener,producer.WithNameServer([]string{"192.168.26.2:9876"}),)if err != nil {zap.S().Errorf("生成producer失败: %s", err.Error())return nil, err}// 启动事务消息if err = p.Start(); err != nil {zap.S().Errorf("启动producer失败: %s", err.Error())return nil, err}// 构造订单信息order := model.OrderInfo{User: req.UserId,OrderSn: GenerateOrderSn(req.UserId),Address: req.Address,SignerName: req.Name,SingerMobile: req.Mobile,Post: req.Post,}jsonString, _ := json.Marshal(order)// 发送事务消息(订单信息)(预先发送库存归还消息,如果本地事务执行成功则回滚消息,如果本地事务执行失败则提交消息)_, err = p.SendMessageInTransaction(context.Background(),primitive.NewMessage("order_reback", jsonString),)if err != nil {fmt.Printf("发送失败: %s\n", err)return nil, status.Error(codes.Internal, "发送消息失败")}// 判断事务监听器中的状态码是否为成功,如果不成功(本地事务执行失败)就返回错误if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)}return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
}
事务监听器:
实现事务监听接口(此接口有两个方法:执行本地事务和检查本地事务)的结构体(方法调用者),在其中加入自定义字段,存储接口中的方法结束后,需要返回的信息(直接将信息存储到方法调用者中的相应字段里,以便在新建订单方法中可以直接查看)
由于接口中的方法,返回的是primitive.LocalTransactionState(事务状态:提交或回滚或未知),所以在执行完本地事务后,其它信息字段不能直接返回,需要存储到方法调用者结构体中
type OrderListener struct {Code codes.Code // 返回的grpc中的codeDetail string // 返回的信息ID int32 // 订单IDOrderAmount float32 // 订单总金额Ctx context.Context
}
执行本地事务方法:
-
从购物车中获取到选中的商品
-
商品价格查询 - 访问商品服务 (跨微服务)
-
库存扣减 - 访问库存服务 (跨微服务)。扣减库存之前,如果出错则回滚消息撤销库存归还;扣减库存之后,如果出错,则提交消息进行库存归还
-
向 订单的基本信息表 和 订单的商品信息表 中插入数据
-
从购物车中删除已购买的记录
-
发送延时消息:每创建一个订单,都将该订单信息作为延时消息发送出去,指定topic为订单超时(本意:假设订单30分钟后超时,那么创建一个订单30分钟后,就该对此订单进行检查,判断此订单是否已支付,防止用户下单后一直不支付占用库存)
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {/*新建订单时的本地事务1. 从购物车中获取到选中的商品2. 商品价格查询 - 访问商品服务 (跨微服务)3. 库存扣减 - 访问库存服务 (跨微服务)4. 向 订单的基本信息表 和 订单的商品信息表 中插入数据5. 从购物车中删除已购买的记录*/var orderInfo model.OrderInfo// 将订单信息序列化到结构体中_ = json.Unmarshal(msg.Body, &orderInfo)parentSpan := opentracing.SpanFromContext(o.Ctx)var goodsIds []int32 // 购物车中选中的商品id集合(为了通过id集合获取商品信息)var shopCarts []model.ShoppingCart // 购物车中该用户选中的商品集合goodsNumsMap := make(map[int32]int32) // 购物车种该用过户选中的商品id和商品数量shopCartSpan := opentracing.GlobalTracer().StartSpan("select_shopcart", opentracing.ChildOf(parentSpan.Context()))// 1.去购物车表中查找,该用户选中的商品集合if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {// 如果没有选中结算的商品,则返回错误信息o.Code = codes.InvalidArgumento.Detail = "没有选中结算的商品"// 这时本地事务出错,但还没有进行库存扣减,所以不需要调库存归还的消息,所以回滚消息return primitive.RollbackMessageState}shopCartSpan.Finish()// 2.封装商品id集合,和 商品id:商品数量 的mapfor _, shopCart := range shopCarts {goodsIds = append(goodsIds, shopCart.Goods)goodsNumsMap[shopCart.Goods] = shopCart.Nums}queryGoodsSpan := opentracing.GlobalTracer().StartSpan("query_goods", opentracing.ChildOf(parentSpan.Context()))// 3.调用商品微服务,批量获取商品信息(根据商品id集合)goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})if err != nil {o.Code = codes.Internalo.Detail = "批量查询商品信息失败"// 这时本地事务出错,但还没有进行库存扣减,所以不需要调库存归还的消息,所以回滚消息return primitive.RollbackMessageState}queryGoodsSpan.Finish()var orderAmount float32 // 订单总金额var orderGoods []*model.OrderGoods // 订单商品详情集合var goodsInvInfo []*proto.GoodsInvInfo // 订单商品数量集合// 4.遍历商品信息集合,封装 订单总金额 和 订单商品详情集合 和 订单商品数量集合for _, good := range goods.Data {// 订单总金额(封装到订单详情中,用于向订单表中插入数据):商品单价*商品数量orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])// 订单中的商品详情集合(用于向订单商品表中插入数据)(这里没有填充Order订单号字段,后续填充)orderGoods = append(orderGoods, &model.OrderGoods{Goods: good.Id,GoodsName: good.Name,GoodsImage: good.GoodsFrontImage,GoodsPrice: good.ShopPrice,Nums: goodsNumsMap[good.Id],})// 订单中的商品数量集合(用于扣减库存)goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{GoodsId: good.Id,Num: goodsNumsMap[good.Id],})}queryInvSpan := opentracing.GlobalTracer().StartSpan("query_inv", opentracing.ChildOf(parentSpan.Context()))// 5.调用库存服务,扣减库存(根据订单号和商品数量详情集合)if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {// 对错误进行判断处理st, _ := status.FromError(err)// 如果是业务逻辑错误if st.Code() == codes.InvalidArgument || st.Code() == codes.Internal || st.Code() == codes.ResourceExhausted {o.Code = st.Code()o.Detail = "扣减库存失败"// 这时扣减库存服务出错,库存服务内部进行数据库事务回滚,所以还没有进行库存扣减,不需要调库存归还的消息,所以回滚消息return primitive.RollbackMessageState}// TODO如果是网络问题(实际调用成功,但是因为网络问题返回失败)}// 运行到这里,库存已经扣减成功。这里是分界线,分界线前执行失败则回滚事务消息,分界线后执行失败则提交事务消息queryInvSpan.Finish()// 6.向订单表中插入订单// 开启数据库事务tx := global.DB.Begin()orderInfo.OrderMount = orderAmountsaveOrderSpan := opentracing.GlobalTracer().StartSpan("save_order", opentracing.ChildOf(parentSpan.Context()))if result := tx.Save(&orderInfo); result.RowsAffected == 0 {// 数据库事务回滚tx.Rollback()o.Code = codes.Internalo.Detail = "创建订单失败"// 提交消息,进行库存归还return primitive.CommitMessageState}saveOrderSpan.Finish()o.OrderAmount = orderAmounto.ID = orderInfo.ID// 7.遍历订单中的商品详情集合(填充orderGoods中的Order订单号字段)for _, orderGood := range orderGoods {orderGood.Order = orderInfo.ID}saveOrderGoodsSpan := opentracing.GlobalTracer().StartSpan("save_order_goods", opentracing.ChildOf(parentSpan.Context()))// 8.向订单商品表中批量插入订单商品集合if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {tx.Rollback() // 回滚事务o.Code = codes.Internalo.Detail = "批量插入订单商品失败"// 提交消息,进行库存归还return primitive.CommitMessageState}saveOrderGoodsSpan.Finish()// 9.删除购物车中选中的商品deleteShopCartSpan := opentracing.GlobalTracer().StartSpan("delete_shopcart", opentracing.ChildOf(parentSpan.Context()))if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {tx.Rollback() // 回滚事务o.Code = codes.Internalo.Detail = "删除购物车记录失败"// 提交消息,进行库存归还return primitive.CommitMessageState}deleteShopCartSpan.Finish()// 10.发送延时消息p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.26.2:9876"}))if err != nil {panic("生成producer失败")}// 不要在一个进程中使用多个producer,不要随便调用shutdown因为会影响其他的producerif err = p.Start(); err != nil {panic("启动producer失败")}// 向order_timeout订单超时的topic中发送延时消息(订单信息)msg = primitive.NewMessage("order_timeout", msg.Body)msg.WithDelayTimeLevel(3) // 延时10秒(上线后设置为30分钟即level16)_, err = p.SendSync(context.Background(), msg)if err != nil {zap.S().Errorf("发送延时消息失败: %v\n", err)tx.Rollback() // 回滚事务o.Code = codes.Internalo.Detail = "发送延时消息失败"return primitive.CommitMessageState}// 11.提交事务// 先提交数据库事务tx.Commit()o.Code = codes.OK// 本地事务执行成功,回滚事务消息return primitive.RollbackMessageState
}
检查本地事务方法:
查找订单表中是否有此订单,如果订单表中没有此订单,说明本地事务执行失败,所以直接提交消息;如果存在此订单,说明本地事务执行成功,所以回滚消息(缺陷:订单表中没有此订单,不代表本地事务已经将库存扣减了,可能还没扣减前就已经出错了,这是提交消息会造成库存多扣减(没有保证幂等性))
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {var orderInfo model.OrderInfo_ = json.Unmarshal(msg.Body, &orderInfo)// 查找订单表if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {// 如果订单表中没有此订单,说明本地事务执行失败,所以直接提交消息// TODO缺陷:订单表中没有此订单,不代表本地事务已经将库存扣减了,可能还没扣减前就已经出错了,这是提交消息会造成库存多扣减(没有保证幂等性)return primitive.CommitMessageState // 不能说明这里就是库存已经扣减了}// 如果存在此订单,说明本地事务执行成功,所以回滚消息return primitive.RollbackMessageState
}
main函数:
创建一个消费者,订阅topic为订单超时,指定处理函数为OrderTimeout
// 创建一个RocketMQ消费者,处理订单超时的消息c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.26.2:9876"}),consumer.WithGroupName("mxshop-order"),)// 订阅了名为 "order_timeout" 的主题,使用了默认的消息选择器,指定了 handler.OrderTimeout 作为消息处理函数,if err := c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {fmt.Println("读取消息失败")}_ = c.Start()
OrderTimeout函数:
- 逐个订单进行处理,先查询订单的支付状态,如果是“交易成功”则无需处理
- 如果不是“交易成功”,说明订单没有支付,但是已经超时,所以进行库存归还
- 将订单状态修改为“交易结束”
- 更新订单表中此订单的订单状态
- 创建一个RocketMQ生产者,向topic库存归还发送普通消息
func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {// 遍历所有消息中的订单for i := range msgs {// 将订单信息反序列化var orderInfo model.OrderInfo_ = json.Unmarshal(msgs[i].Body, &orderInfo)fmt.Printf("获取到订单超时消息: %v\n", time.Now())// 查询订单的支付状态var order model.OrderInfoif result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {// 如果订单表中没有此订单,则无需操作return consumer.ConsumeSuccess, nil}// 如果订单表中有此订单if order.Status != "TRADE_SUCCESS" {// 如果订单的状态不是交易成功,说明订单没有支付,但是已经超时,所以进行库存归还tx := global.DB.Begin()// 修改订单的状态为交易结束order.Status = "TRADE_CLOSED"// 更新订单表中此订单的订单状态tx.Save(&order)// 创建一个生产者p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.26.2:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}// 发送消息:将此订单消息发送出去进行库存归还,指定topic为库存归还// 在inventory_srv/main.go中创建消费者,从此topic中消费消息,进行库存归还_, err = p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))if err != nil {// 回滚数据库事务tx.Rollback()fmt.Printf("发送失败: %s\n", err)return consumer.ConsumeRetryLater, nil}return consumer.ConsumeSuccess, nil}}// 如果订单的状态是交易成功,则不用库存归还return consumer.ConsumeSuccess, nil
}
库存服务
main函数:
创建一个消费者,订阅topic为库存归还,指定处理函数为AutoReback
// 创建一个消费者c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.26.2:9876"}),consumer.WithGroupName("mxshop-inventory"),)// 订阅了名为 "order_reback" 的主题,使用了默认的消息选择器,指定了 handler.AutoReback 作为消息处理函数if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {fmt.Println("读取消息失败")}_ = c.Start()
AutoReback函数:
逐个订单进行处理,将库存进行归还并将库存状态设置为2已归还
func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {type OrderInfo struct {OrderSn string // 订单号}// 遍历消息,需要库存归还的订单for i := range msgs {// 重复归还的问题,接口应该确保幂等性,不能因为消息的重复发送导致一个订单的库存归还多次,没有扣减的库存不归还// 解决方式:新建一张表,这张表记录了详细的订单扣减细节,以及归还细节var orderInfo OrderInfoerr := json.Unmarshal(msgs[i].Body, &orderInfo)if err != nil {zap.S().Errorf("解析json失败: %v\n", msgs[i].Body)return consumer.ConsumeSuccess, nil}// 开启数据库事务tx := global.DB.Begin()var sellDetail model.StockSellDetail// 查询StockSellDetail表中,此订单号并且库存状态为1已扣减的订单if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {// 如果未查询到,说明库存已归还return consumer.ConsumeSuccess, nil}// 如果查询到则逐个商品归还库存for _, orderGood := range sellDetail.Detail {// sql:update stocks set stocks = stocks + orderGood.Num where goods = orderGood.Goodsif result := tx.Model(&model.Inventory{}).Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("stocks+?", orderGood.Num)); result.RowsAffected == 0 {tx.Rollback()return consumer.ConsumeRetryLater, nil}}// 更新StockSellDetail表中,此订单号的库存状态为2已归还if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {tx.Rollback()return consumer.ConsumeRetryLater, nil}tx.Commit()return consumer.ConsumeSuccess, nil}return consumer.ConsumeSuccess, nil
}// 确认扣减库存
func (*InventoryServer) ConfirmSell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {client := goredislib.NewClient(&goredislib.Options{Addr: "192.168.26.2:6379",})pool := goredis.NewPool(client)rs := redsync.New(pool)tx := global.DB.Begin()for _, goodInfo := range req.GoodsInfo {var inv model.InventoryNew//if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods:goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {// tx.Rollback() //回滚之前的操作// return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")//}//for {mutex := rs.NewMutex(fmt.Sprintf("goods_%d", goodInfo.GoodsId))if err := mutex.Lock(); err != nil {return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")}if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {tx.Rollback() //回滚之前的操作return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")}//判断库存是否充足if inv.Stocks < goodInfo.Num {tx.Rollback() //回滚之前的操作return nil, status.Errorf(codes.ResourceExhausted, "库存不足")}//扣减, 会出现数据不一致的问题 - 锁,分布式锁inv.Stocks -= goodInfo.Num // 对库存进行扣减inv.Freeze -= goodInfo.Num // 对冻结库存进行恢复tx.Save(&inv)if ok, err := mutex.Unlock(); !ok || err != nil {return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")}}tx.Commit() // 需要自己手动提交操作//m.Unlock() //释放锁return &emptypb.Empty{}, nil
}