2025-alibaba-分布式事务组件-Seata
分布式事务
实现思路:两阶段提交协议(2PC)
2PC存在的问题
- 同步阻塞问题
2PC 中的参与者是阻塞的。在第一阶段收到请求后就会预先锁定资源,一直到 commit 后才会释放。
- 单点故障
由于协调者的重要性,一旦协调者TM发生故障,参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。
- 数据不一致
若协调者第二阶段发送提交请求时崩溃,可能部分参与者收到commit请求提交了事务,而另一部分参与者未收到commit请求而放弃事务,从而造成数据不一致的问题。
.Seata是什么
Seata的生命周期
1、TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。XID会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。
2、RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。
5、TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。
4、TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。
原理 Seata AT模式的设计思路
Seata AT模式的核心是对业务无侵入,是一种改进后的两阶段提交,其设计思路如下:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 二阶段:
-
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
一阶段
业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。核心在于对业务sql进行解析,转换成undolog,并同时入库,这是怎么做的呢?
二阶段
- 分布式事务操作成功,则TC通知RM异步删除undolog
- 分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。
seata 快速安装启动
下载地址
快速启动 | Apache Seata
sql 脚本
创建数据库 并运行脚本文件 创建表结构如下
修改seata的配置文件 指定nacos的注册和配置中心
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
# 配置注册中心 和 服务发现中心 ---- 注意、注意、注意 group: SEATA_GROUP
seata:
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: xulk
group: SEATA_GROUP
data-id: seataServer.properties
username: nacos
password: nacos
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos 指定nacos为配置中心
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace: xulk
cluster: default
username: nacos
password: nacos
store:
# support: file 、 db 、 redis
mode: file
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login
nacos 配置文件来源
需要修改的地方
注意 如果数据库是 8.0 驱动需要修改 com.mysql.cj.jdbc.Driver
store.mode=db
store.lock.mode=db
store.session.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
开始启动
启动成功
登录 http://127.0.0.1:7091 用户名 / 密码 : seata / seata
nacos 注册成功
配置事务分组, 要与client配置的事务分组一致
- 事务分组:seata的资源逻辑,可以按微服务的需要,在应用程序(客户端)对自行定义事务分组,每组取一个名字。
- 集群:seata-server服务端一个或多个节点组成的集群cluster。 应用程序(客户端)使用时需要指定事务逻辑分组与Seata服务端集群的映射关系。
springboot 项目开始集成seata
使用AT模式
微服务导入seata依赖
<!-- seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>
微服务对应数据库中添加undo_log表(仅AT模式) 每个数据库都要添加 日志表
CREATE TABLE `undo_log` (
`branch_id` bigint NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`),
KEY `ix_log_created` (`log_created`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='AT transaction mode undo table';
微服务application.yml中添加seata配置
server:
port: 8020
spring:
application:
name: order-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
username: nacos
password: nacos
namespace: xulk
group: SEATA_GROUP
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_at_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
username: root
password: root
initial-size: 10
max-active: 100
min-idle: 10
max-wait: 60000
pool-prepared-statements: true
max-pool-prepared-statement-per-connection-size: 20
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
test-while-idle: true
test-on-borrow: false
test-on-return: false
stat-view-servlet:
enabled: true
url-pattern: /druid/*
filter:
stat:
log-slow-sql: true
slow-sql-millis: 1000
merge-sql: false
wall:
config:
multi-statement-allow: true
logging:
level:
com.tuling: debug
seata:
application-id: ${spring.application.name}
# seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应
tx-service-group: default_tx_group
registry:
# 指定nacos作为注册中心
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace: xulk
group: SEATA_GROUP
username: nacos
password: nacos
config:
# 指定nacos作为配置中心
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: xulk
group: SEATA_GROUP
data-id: seataServer.properties
username: nacos
password: nacos
#暴露actuator端点
management:
endpoints:
web:
exposure:
include: '*'
feign:
sentinel:
enabled: true
controller层
@RestController
@RequestMapping("/order")
@Slf4j
public class OrderController {
@Autowired
private OrderService orderService;
// /order/createOrder
@PostMapping("/createOrder")
public ResultVo createOrder(@RequestBody OrderVo orderVo) throws Exception {
log.info("收到下单请求,用户:{}, 商品编号:{}", orderVo.getUserId(), orderVo.getCommodityCode());
Order order = orderService.saveOrder(orderVo);
return ResultVo.ok().put("order",order);
}
}
测试的 service
要保证哪个接口的事务就在哪个接口上添加 全局事务的注解
@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
import com.tuling.datasource.entity.Order;
import com.tuling.datasource.entity.OrderStatus;
import com.tuling.datasource.mapper.OrderMapper;
import com.tuling.order.feign.AccountFeignService;
import com.tuling.order.feign.StorageFeignService;
import com.tuling.order.service.OrderService;
import com.tuling.order.vo.OrderVo;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountFeignService accountFeignService;
@Autowired
private StorageFeignService storageFeignService;
@Override
// @Transactional
@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
public Order saveOrder(OrderVo orderVo) {
log.info("=============用户下单=================");
log.info("当前 XID: {}", RootContext.getXID());
// 保存订单
Order order = new Order();
order.setUserId(orderVo.getUserId());
order.setCommodityCode(orderVo.getCommodityCode());
order.setCount(orderVo.getCount());
order.setMoney(orderVo.getMoney());
order.setStatus(OrderStatus.INIT.getValue());
Integer saveOrderRecord = orderMapper.insert(order);
log.info("保存订单{}", saveOrderRecord > 0 ? "成功" : "失败");
//扣减库存
storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount());
//扣减余额
Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney());
// if(!debit){
// // 解决 feign整合sentinel降级导致Seata失效的处理
// throw new RuntimeException("账户服务异常降级了");
// }
//更新订单
Integer updateOrderRecord = orderMapper.updateOrderStatus(order.getId(),OrderStatus.SUCCESS.getValue());
log.info("更新订单id:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败");
return order;
}
}
feign接口
//feign 接口1
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Repository;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name="storage-service",path="/storage")
@Repository
public interface StorageFeignService {
@RequestMapping(path = "/deduct")
Boolean deduct(@RequestParam("commodityCode") String commodityCode,@RequestParam("count") Integer count);
}
//feign 接口2
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Repository;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "account-service",path = "/account")
@Repository
public interface AccountFeignService {
@RequestMapping("/debit")
Boolean debit(@RequestParam("userId") String userId,@RequestParam("money") int money);
}
通过feign远程调用的 Controller 接口
// Controller 接口 2
import com.tuling.account.service.AccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/account")
public class AccountController {
@Autowired
private AccountService accountService;
@RequestMapping("/debit")
public Boolean debit(String userId, int money) throws Exception {
// 用户账户扣款
accountService.debit(userId, money);
return true;
}
}
// Controller 接口 2
import com.tuling.storage.service.StorageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/storage")
public class StorageController {
@Autowired
private StorageService storageService;
@RequestMapping(path = "/deduct")
public Boolean deduct(String commodityCode, Integer count) {
// 扣减库存
storageService.deduct(commodityCode, count);
return true;
}
}
通过feign远程调用的 Controller 接口 调用的 service层
扣减库存 接口需要添加本地事务 @Transactional
import com.tuling.datasource.entity.Storage;
import com.tuling.datasource.mapper.StorageMapper;
import com.tuling.storage.service.StorageService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Transactional
@Override
public void deduct(String commodityCode, int count){
log.info("=============扣减库存=================");
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
checkStock(commodityCode,count);
log.info("开始扣减 {} 库存", commodityCode);
Integer record = storageMapper.reduceStorage(commodityCode,count);
log.info("扣减 {} 库存结果:{}", commodityCode, record > 0 ? "操作成功" : "扣减库存失败");
}
private void checkStock(String commodityCode, int count){
log.info("检查 {} 库存", commodityCode);
Storage storage = storageMapper.findByCommodityCode(commodityCode);
if (storage.getCount() < count) {
log.warn("{} 库存不足,当前库存:{}", commodityCode, count);
throw new RuntimeException("库存不足");
}
}
}
扣减用户金额 接口需要添加本地事务 @Transactional
import com.tuling.account.service.AccountService;
import com.tuling.datasource.entity.Account;
import com.tuling.datasource.mapper.AccountMapper;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
private static final String ERROR_USER_ID = "1002";
@Autowired
private AccountMapper accountMapper;
/**
* 扣减用户金额
* @param userId
* @param money
*/
@Transactional
@Override
public void debit(String userId, int money){
log.info("=============用户账户扣款=================");
log.info("当前 XID: {}", RootContext.getXID());
checkBalance(userId, money);
log.info("开始扣减用户 {} 余额", userId);
Integer record = accountMapper.reduceBalance(userId,money);
// if (ERROR_USER_ID.equals(userId)) {
// // 模拟异常
// throw new RuntimeException("account branch exception");
// }
log.info("扣减用户 {} 余额结果:{}", userId, record > 0 ? "操作成功" : "扣减余额失败");
}
private void checkBalance(String userId, int money){
log.info("检查用户 {} 余额", userId);
Account account = accountMapper.selectByUserId(userId);
if (account.getMoney() < money) {
log.warn("用户 {} 余额不足,当前余额:{}", userId, account.getMoney());
throw new RuntimeException("余额不足");
}
}
}
开始测试 请求地址
请求地址 localhost:8020/order/createOrder
请求参数
{
"userId": "1001",
"commodityCode": "2001",
"count": 2,
"money": 70
}
进入订单的断点
订单表插入数据
订单库的 undo_log 表 插入回滚的数据
此时库存还没扣除
放过断点 进入 扣除库存的接口,还没扣除余额(未触发异常信息)
此时seata 锁住了 订单和库存
事务信息
此时已经扣除库存
此时放过断点 由于余额不足,触发异常信息
库存回滚
订单回滚
订单 undo_log 表异步删除
注意、注意、注意 这里有个坑
如果调用远程服务的时候使用熔断降级的功能,由于降级是通过 try catch 的方式实现的,进入降级的方法后,就相当于进入catch模块了,此时全局事务就不能捕捉到异常信息 就无法回滚了,可以通过手动判断的方式,如果出现异常后 手动判断是否降级异常等信息,然后手动在抛出异常
//扣减余额
Boolean debit= accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney());
if(!debit){
// 解决 feign整合sentinel降级导致Seata失效的处理
throw new RuntimeException("账户服务异常降级了");
}
数据库脚本
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_tbl
-- ----------------------------
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`money` int(0) NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `user_id`(`user_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of account_tbl
-- ----------------------------
INSERT INTO `account_tbl` VALUES (1, '1001', 60);
INSERT INTO `account_tbl` VALUES (2, '1002', 50);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE,
INDEX `ix_log_created`(`log_created`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for order_tbl
-- ----------------------------
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`count` int(0) NULL DEFAULT 0,
`money` int(0) NULL DEFAULT 0,
`status` int(0) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of order_tbl
-- ----------------------------
INSERT INTO `order_tbl` VALUES (4, '1001', '2001', 2, 10, 1);
INSERT INTO `order_tbl` VALUES (7, '1001', '2001', 2, 10, 1);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE,
INDEX `ix_log_created`(`log_created`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for storage_tbl
-- ----------------------------
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(0) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
`count` int(0) NULL DEFAULT 0,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `commodity_code`(`commodity_code`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of storage_tbl
-- ----------------------------
INSERT INTO `storage_tbl` VALUES (1, '2001', 994);
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(0) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(0) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE,
INDEX `ix_log_created`(`log_created`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;