SpringCloud Alibaba篇 - Seata【分布式事务】
⑤ Seata【分布式事务】
5.1 分布式事务的由来
实例
单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,
业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证, 但是全局的数据一致性问题没法保证。
总结:一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题。
5.2 Seata简介
Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务.
官网:http://seata.io/zh-cn/
分布式处理事务过程的 ID+三组件模型
- Transaction ID XID - 全局唯一的事务ID
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
图解
- TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一 的XID;
- XID 在微服务调用链路的上下文中传播;
- RM向TC注册分支事务,将其纳入XID对应全局事务的管辖;
- TM向TC发起针对XID的全局提交或回滚决议;
- TC调度XID下管辖的全部分支事务完成提交或回滚请求。
Seata的简单使用(怎么玩)
在spring的时候是通过注解@Transactional 管理本地事务
5.3 Seata-server的下载与安装
下载地址: https://github.com/seata/seata/releases
- 先从官网中下载好Seata-server并解压到文件夹
先备份file.conf,然后修改file.conf
修改service模块,1.0以后没有这个选项,就默认是my_test_tx_group分组
vgroup_mapping.my_test_tx_group = “fsp_tx_group”
修改store模块
store {
store mode: file、db
mode = “db”
…database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc
//127.0.0.1:3306/seata"
user = "root"
password = "cvzhanshi"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
- 备份file.example.conf,然后修改file.example.conf,修改内容和file.conf一样
- 备份registry.conf,修改文件
设置注册进nacos中
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
创建数据库和表
创建seata数据库
create database seata character set utf8;
use seata;创建seata数据库需要的表(三张表)
CREATE TABLE IF NOT EXISTS
global_table
(`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;CREATE TABLE IF NOT EXISTS
branch_table
(`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;CREATE TABLE IF NOT EXISTS
lock_table
(`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;启动nacos和seata测试
seate在nacos中注册成功,说明安装完成
5.4 订单/库存/账户业务数据库准备
业务说明:
这里我们会创建三个服务,一个订单服务, 一个库存服务, 一个账户服务。
当用户下单时,会在订单服务中创建一个订单 ,然后通过远程调用库存服务来扣减下单商品的库存,
再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。该操作跨越三三个数据库,有两次远程调用,很明显会有分布式事务问题。
创建三个数据库
create database seata_order; —订单
create database seata_storage; —库存
create database seata_account; —账户在每个库下建表
seata_order库
CREATE TABLE t_order(
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`product_id` BIGINT(11)DEFAULT NULL COMMENT '产品id',
`count` INT(11) DEFAULT NULL COMMENT '数量',
`money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
`status` INT(1) DEFAULT NULL COMMENT '订单状态: 0:创建中; 1:已完结'
)ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
select * from t_order;
--回滚日志
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
seata_storage库
CREATE TABLE t_storage(
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`total` INT(11) DEFAULT NULL COMMENT '总库存',
`used` INT(11) DEFAULT NULL COMMENT '已用库存',
`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
)ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO t_storage(`id`,`product_id`,`total`,`used`,`residue`)VALUES('1','1','100','0','100');
--回滚日志
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
seata_account库
CREATE TABLE t_account(
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
)ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO t_account(`id`,`user_id`,`total`,`used`,`residue`)VALUES('1','1','1000','0','1000');
--回滚日志
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
结果:
5.5 订单/库存/账户业务微服务准备
业务需求:下订单->减库存->扣余额->改(订单)状态
5.5.1 订单模块
- 新建模块seata-order-service2001
导入依赖
com.alibaba.cloud
spring-cloud-starter-alibaba-nacos-discovery
com.alibaba.cloud
spring-cloud-starter-alibaba-seata
io.seata
seata-all
io.seata
seata-spring-boot-starter
io.seata
seata-all
1.2.0
io.seata
seata-spring-boot-starter
1.2.0
org.springframework.cloud
spring-cloud-starter-openfeign
org.mybatis.spring.boot
mybatis-spring-boot-starter
com.alibaba
druid-spring-boot-starter
mysql
mysql-connector-java
org.springframework.boot
spring-boot-starter-jdbc
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-actuator
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
编写配置文件
server:
port: 2001spring:
application:name: seata-order-service
cloud:
alibaba:
seata:
# 自定义事务组名称需要与seata-server中的对应
tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group
#service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级
service:
vgroupMapping:
#要和tx-service-group的值一致
my_test_tx_group: default
grouplist:
# seata seaver的 地址配置,此处可以集群配置是个数组
default: 192.168.1.104:8091
nacos:
discovery:
server-addr: localhost:8848 #nacos
datasource:
# 当前数据源操作类型
type: com.alibaba.druid.pool.DruidDataSource
# mysql驱动类
driver-class-name: com.mysql.jdbc.Driver
url: jdbc
//localhost:3306/seata_order
username: root
password: lian0911
feign:
hystrix:enabled: false
logging:
level:io:
seata: info
mybatis:
mapperLocations: classpath:mapper/.xml编写 file.conf 文件
内容在解压的seata中从conf中的README.md中有链接
transport {
# tcp, unix-domain-socket
type = "TCP"
#NIO, NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
tableMetaCheckerInterval = 60000
reportSuccessEnable = false
sagaBranchRegisterEnable = false
sagaJsonParser = jackson
sagaRetryPersistModeUpdate = false
sagaCompensatePersistModeUpdate = false
tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
defaultGlobalTransactionTimeout = 60000
degradeCheck = false
degradeCheckPeriod = 2000
degradeCheckAllowTimes = 10
interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
compress {
enable = true
# allow zip, gzip, deflater, 7z, lz4, bzip2, default is zip
type = zip
# if rollback info size > threshold, then will be compress
# allow k m g t
threshold = 64k
}
}
loadBalance {
type = "RandomLoadBalance"
virtualNodes = 10
}
}
log {
exceptionRate = 100
}
- 编写 registry.conf
内容和file.conf一样查找
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
password = ""
timeout = "0"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
custom {
name = ""
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seata.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
apolloAccesskeySecret = ""
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
nodePath = "/seata/seata.properties"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
custom {
name = ""
}
}
- domain包实体类编写
Order
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Long id;
private Long userId;
private Long productId;
private Integer count;
private BigDecimal money;
private Integer status; // 订单状态 0:创建中 1:已完结
}
CommonResult
/**
* @author cVzhanshi
* @create 2021-07-12 21:30
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
private Integer code;
private String message;
private T data;
public CommonResult(Integer code, String message) {
this(code, message, null);
}
}
- dao层的编写
OrderDao
/**
* @author cVzhanshi
* @create 2021-07-12 21:33
*/
@Mapper
@Repository
public interface OrderDao {
//1 新建订单
void create(Order order);
//2 修改订单状态,从0改为1
void update(@Param("userId") Long userId, @Param("status") Integer status);
}
对应的映射文件classpath:mapper/OrderMapper.xml
<mapper namespace="cn.cvzhanshi.springcloud.alibaba.dao.OrderDao">
<resultMap id="BaseResultMap" type="cn.cvzhanshi.springcloud.alibaba.domain.Order">
<id column="id" property="id" jdbcType="BIGINT" />
<result column="user_id" property="userId" jdbcType="BIGINT" />
<result column="product_id" property="productId" jdbcType="BIGINT" />
<result column="count" property="count" jdbcType="INTEGER" />
<result column="money" property="money" jdbcType="DECIMAL" />
<result column="status" property="status" jdbcType="INTEGER" />
</resultMap>
<insert id="create" parameterType="cn.cvzhanshi.springcloud.alibaba.domain.Order"
useGeneratedKeys="true" keyProperty="id">
insert into t_order(`user_id`, `product_id`, `count`, `money`, `status`)
values(#{userId}, #{productId}, #{count}, #{money}, 0);
</insert>
<update id="update" parameterType="cn.cvzhanshi.springcloud.alibaba.domain.Order">
update t_order set `status` = 1
where `user_id` = #{userId} and `status` = #{status};
</update>
</mapper>
- service层的编写
AccountService
/**
* @author cVzhanshi
* @create 2021-07-12 21:57
*/
@Service
@FeignClient(value = "seata-account-service")
public interface AccountService {
@PostMapping(value = "/account/decrease")
CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
StorageService
/**
* @author cVzhanshi
* @create 2021-07-12 21:56
*/
@Service
@FeignClient(value = "seata-storage-service")
public interface StorageService {
//减库存
@PostMapping(value = "/storage/decrease")
CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
OrderService
/**
* @author cVzhanshi
* @create 2021-07-12 21:57
*/
public interface OrderService {
void create(Order order);
}
OrderServiceImpl
/**
* @author cVzhanshi
* @create 2021-07-12 22:04
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderDao orderDao;
@Autowired
private AccountService accountService;
@Autowired
private StorageService storageService;
@Override
public void create(Order order) {
//1. 新建订单
log.info("-------> 开始新建订单");
orderDao.create(order);
//2. 扣减库存
log.info("-------> 订单微服务开始调用库存,做扣减count");
storageService.decrease(order.getProductId(), order.getCount());
log.info("-------> 订单微服务开始调用库存,做扣减完成");
//3. 扣减账号余额
log.info("-------> 订单微服务开始调用账号,做扣减money");
accountService.decrease(order.getUserId(), order.getMoney());
log.info("-------> 订单微服务开始调用账号,做扣减完成");
//4、修改状态
log.info("-------> 修改订单状态");
orderDao.update(order.getUserId(), 0);
log.info("-------> 修改订单状态完成");
log.info("-------> 新建订单完成");
}
}
controller层的编写
/**
- @author cVzhanshi
@create 2021-07-12 22:56
*/
@RestController
public class OrderController {@Autowired
private OrderService orderService;@GetMapping(“/order/create”)
public CommonResult create(Order order){orderService.create(order);
return new CommonResult(200, "订单创建成功!");
}
}
config配置类的编写
MybatisConfig
/**
* @author cVzhanshi
* @create 2021-07-12 22:58
*/
@MapperScan("cn.cvzhanshi.springcloud.alibaba.dao")
@Configuration
public class MybatisConfig {
}
DataSourceProxyConfig
/**
* @author cVzhanshi
* @create 2021-07-12 22:58
*/
//使用Seata对数据源进行代理
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapperLocations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
bean.setMapperLocations(resolver.getResources(mapperLocations));
return bean.getObject();
}
}
主启动类
/**
- @author cVzhanshi
@create 2021-07-12 23:02
*/
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) //取消数据源的自动创建
@EnableDiscoveryClient
@EnableFeignClients
public class SeataOrderMain2001 {public static void main(String[] args) {
SpringApplication.run(SeataOrderMain2001.class,args);
}
}
启动测试
- 出现错误
- 出错原因
seata 版本问题, spring-cloud-alibaba 依赖下的seata-spring-boot-starter 版本是1.1.0,而我用的seata是 1.2.0 版本 。
1.2.0 版本SeataAutoConfiguration.class 没有seataDataSourceBeanPostProcessor bean ,而是用SeataAutoDataSourceProxyCreator代替
- 解决方法
去掉spring-cloud-starter-alibaba-seata 下seata-spring-boot-starter 依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<artifactId>spring-boot-starter</artifactId>
<groupId>org.springframework.boot</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.2.0</version>
</dependency>
- 问题解决,继续测试
5.5.2 库存模块
- 新建模块seata-storage-service2002
- 导入依赖和2001一样直接拷贝
编写配置文件
server:
port: 2002spring:
application:name: seata-storage-service
cloud:
alibaba:
seata:
# 自定义事务组名称需要与seata-server中的对应
tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group
#service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级
service:
vgroupMapping:
#要和tx-service-group的值一致
my_test_tx_group: default
grouplist:
# seata seaver的 地址配置,此处可以集群配置是个数组
default: 192.168.1.104:8091
nacos:
discovery:
server-addr: localhost:8848 #nacos
datasource:
# 当前数据源操作类型
type: com.alibaba.druid.pool.DruidDataSource
# mysql驱动类
driver-class-name: com.mysql.jdbc.Driver
url: jdbc
//localhost:3306/seata_storage
username: root
password: lian0911
feign:
hystrix:enabled: false
logging:
level:io:
seata: info
mybatis:
mapperLocations: classpath:mapper/*.xmlfile.conf 和registry.conf 与2001一样,直接拷贝
- domain层实体类
CommonResult
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
private Integer code;
private String message;
private T data;
public CommonResult(Integer code, String message) {
this(code, message, null);
}
}
Storage
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Storage {
private Long id;
//产品id
private Long productId;
//总库存
private Integer total;
//已用库存
private Integer used;
//剩余库存
private Integer residue;
}
- dao层的编写
StorageDao
/**
* @author cVzhanshi
* @create 2021-07-13 0:17
*/
@Mapper
@Repository
public interface StorageDao {
void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
对应的映射文件classpath:mapper/StorageMapper.xml
<mapper namespace="com.angenin.springcloud.dao.StorageDao">
<resultMap id="BaseResultMap" type="com.angenin.springcloud.domain.Storage">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="product_id" property="productId" jdbcType="BIGINT"/>
<result column="total" property="total" jdbcType="INTEGER"/>
<result column="used" property="used" jdbcType="INTEGER"/>
<result column="residue" property="residue" jdbcType="INTEGER"/>
</resultMap>
<update id="decrease">
update t_storage
set used = used + #{count}, residue = residue - #{count}
where product_id= #{productId};
</update>
</mapper>
- service层的编写
StorageService
/**
* @author cVzhanshi
* @create 2021-07-13 0:22
*/
public interface StorageService {
/**
* 扣减库存
* @param productId
* @param count
*/
void decrease(Long productId, Integer count);
}
StorageServiceImpl
/**
* @author cVzhanshi
* @create 2021-07-13 0:23
*/
@Service
public class StorageServiceImpl implements StorageService {
private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
@Resource
private StorageDao storageDao;
@Override
public void decrease(Long productId, Integer count) {
LOGGER.info("----> StorageService中扣减库存");
storageDao.decrease(productId, count);
LOGGER.info("----> StorageService中扣减库存完成");
}
}
- controller层的编写
StorageController
@RestController
public class StorageController {
@Resource
private StorageService storageService;
@RequestMapping("/storage/decrease")
public CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count){
storageService.decrease(productId, count);
return new CommonResult(200, "扣减库存成功!");
}
}
- 配置类 MyBatisConfig 和 DataSourceProxyConfig 与2001一样,直接copy
主启动类
/**
- @author cVzhanshi
- @create 2021-07-13 0:29
*/
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableFeignClients
@EnableDiscoveryClient
public class SeataStorageMain2002 {
public static void main(String[] args) {
SpringApplication.run(SeataStorageMain2002.class,args);
}
}
- 启动测试
5.5.3 账户模块
- 新建模块seata-account-service2003
- 导入依赖和2001一样直接拷贝
编写配置文件
server:
port: 2003spring:
application:name: seata-account-service
cloud:
alibaba:
seata:
# 自定义事务组名称需要与seata-server中的对应
tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group
#service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级
service:
vgroupMapping:
#要和tx-service-group的值一致
my_test_tx_group: default
grouplist:
# seata seaver的 地址配置,此处可以集群配置是个数组
default: 192.168.1.104:8091
nacos:
discovery:
server-addr: localhost:8848 #nacos
datasource:
# 当前数据源操作类型
type: com.alibaba.druid.pool.DruidDataSource
# mysql驱动类
driver-class-name: com.mysql.jdbc.Driver
url: jdbc
//localhost:3306/seata_account
username: root
password: lian0911
feign:
hystrix:enabled: false
logging:
level:io:
seata: info
mybatis:
mapperLocations: classpath:mapper/*.xmlfile.conf 和registry.conf 与2001一样,直接拷贝
- domain层实体类
CommonResult
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CommonResult<T> {
private Integer code;
private String message;
private T data;
public CommonResult(Integer code, String message) {
this(code, message, null);
}
}
Account
/**
* @author cVzhanshi
* @create 2021-07-13 0:37
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
private Long id;
/**
* 用户id
*/
private Long userId;
/**
* 总额度
*/
private Integer total;
/**
* 已用额度
*/
private Integer used;
/**
* 剩余额度
*/
private Integer residue;
}
- dao 层的编写
AccountDao
@Mapper
@Repository
public interface AccountDao {
void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
对应的映射文件classpath:mapper/ AccountMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.cvzhanshi.springcloud.alibaba.dao.AccountDao">
<resultMap id="BaseResultMap" type="cn.cvzhanshi.springcloud.alibaba.domain.Account">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="user_id" property="userId" jdbcType="BIGINT"/>
<result column="total" property="total" jdbcType="DECIMAL"/>
<result column="used" property="used" jdbcType="DECIMAL"/>
<result column="residue" property="residue" jdbcType="DECIMAL"/>
</resultMap>
<update id="decrease">
update t_account
set used = used + #{money}, residue = residue - #{money}
where user_id = #{userId};
</update>
</mapper>
- service层的编写
AccountService
public interface AccountService {
void decrease(Long userId, BigDecimal money);
}
AccountServiceImpl
/**
* @author cVzhanshi
* @create 2021-07-13 0:43
*/
@Service
public class AccountServiceImpl implements AccountService {
private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);
@Resource
private AccountDao accountDao;
@Override
public void decrease(Long userId, BigDecimal money) {
LOGGER.info("---> AccountService中扣减账户余额");
accountDao.decrease(userId, money);
LOGGER.info("---> AccountService中扣减账户余额完成");
}
}
controller层的编写
/**
- @author cVzhanshi
- @create 2021-07-13 0:44
*/
@RestController
public class AccountController {
@Autowired
private AccountService accountService;
@RequestMapping("/account/decrease")
public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){
accountService.decrease(userId, money);
return new CommonResult(200, "扣减库存成功!");
}
}
主启动类
/**
- @author cVzhanshi
- @create 2021-07-13 0:46
*/
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableFeignClients
@EnableDiscoveryClient
public class SeataAccountMain2003 {
public static void main(String[] args) {
SpringApplication.run(SeataAccountMain2003.class,args);
}
}
- 启动测试
环境准备完毕
5.5.4 Test
- 正常下单
- 启动nacos、seata、2001、2002、2003
- 访问 http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10
- 查看数据库中的数据变化
order表
account表
storage表
输入日志
- 超时异常
- 停止2003
在2003的AccountServiceImpl里的decrease中添加异常代码
@Override
public void decrease(Long userId, BigDecimal money) {LOGGER.info(“—-> AccountService中扣减账户余额”);
//模拟超时异常,暂停20秒
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
accountDao.decrease(userId, money);
LOGGER.info(“—-> AccountService中扣减账户余额完成”);
}重新启动2003
- 继续访问
http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10
- 查看数据库
storage表
account表
order表
故障原因:当库存和账户金额扣减后,订单状态并没有设置为已经完成,没有从零改为1
而且由于feign的重试机制,账户余额还有可能被多次扣减
- 超市异常,添加 @GlobalTransactional 注解
- 停止2001
在2001的OderServiceImpl里的create方法上加上 @GlobalTransactional
@Override
//name随便命名,只要不重复即可
//rollbackFor = Exception.class表示出现所有异常都回滚
//rollbackFor表示哪些需要回滚
//noRollbackFor表示哪些不需要回滚
@GlobalTransactional(name = “test-create-order”, rollbackFor = Exception.class)
public void create(Order order) {
}
重启2001。
- 刷新页面
http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10
- 查看数据库
order表
account表
storage表
order表没有插入,相应的账户表和库存表都没有改变,回滚成功
5.6 Seata的再理解
5.6.1 图解TC/TM/RM三组件
- 官方图解
- 自身理解
5.6.2 分布式事务的执行流程
- TM开启分布式事务(TM 向TC注册全局事务记录) ;
- 按业务场景,编排数据库、服务等事务内资源(RM向TC汇报资源准备状态) ;
- TM结束分布式事务,事务一阶段结束(TM通知TC提交/回滚分布式事务) ;
- TC汇总事务信息,决定分布式事务是提交还是回滚;
- TC通知所有RM提交/回滚资源,事务二阶段结束。
模式分类
AT 模式
提供无侵入自动补偿的事务模式,目前已支持 MySQL、 Oracle 、PostgreSQL和 TiDB的AT模式,H2 开发中
TCC 模式
支持 TCC 模式并可与 AT 混用,灵活度更高
SAGA 模式
为长事务提供有效的解决方案
AT模式详解
整体机制 两阶段提交协议的演变:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
在一阶段, Seata 会拦截“业务SQL”,
1、解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成”before image”,
2、执行“业务SQL”更新业务数据,在业务数据更新之后,
3、其保存成”after image” ,最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
二阶段:
- 提交异步化,非常快速地完成。
二阶段如是顺利提交的话,
因为“业务SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉, 完成数据清理即可。- 回滚通过一阶段的回滚日志进行反向补偿
二阶段回滚: .
二阶段如果是回滚的话, Seata就需要回滚一阶段已经执行的“业务SQL” ,还原业务数据。
回滚方式便是用”before image”还原业务数据;但在还原前要首先要校验脏写,对比”数据库当前业务数据”和”after image”,如果两份数据完全一致就说明没有脏写, 可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
简略流程图
还没有评论,来说两句吧...