SpringCloud-Alibaba之分布式事务Seata

我不是女神ヾ 2023-10-01 21:23 86阅读 0赞

SpringCloud-Alibaba之分布式事务Seata

目录

一、Seata介绍

二、使用Seata实现分布式事务控制

2.1 订单微服务实现下单接口

2.2 商品微服务实现扣减库存接口

2.3 暴露出分布式事务问题

2.3.1 正常测试

2.3.2 异常测试

2.4 Seata解决分布式事务问题

2.4.1 下载Seata

2.4.2 修改配置文件

2.4.3 初始化seata在nacos的配置

2.4.4 启动seata服务

2.4.5 添加undo_log表

2.4.6 微服务集成seata事务控制

2.4.7 总结


一、Seata介绍

Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。它把一个分布式事务理解成一个包含了若干分支事务的全局事务。如下所示:

img

全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。

Seata主要由三个重要组件组成:

  • TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。也就是Seata独立运行的服务端。
  • TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
  • RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。

img

Seata的执行流程如下:

  1. A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
  2. A服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
  3. A服务执行分支事务,向数据库做操作
  4. A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
  5. B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
  6. B服务执行分支事务,向数据库做操作
  7. 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
  8. TC协调其管辖之下的所有分支事务,决定提交还是回滚

Seata实现2PC与传统2PC的差别:

  1. 架构层次不同,传统2PC方案的RM实际上是在数据库层,即RM本质上就是数据库本身。而Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。
  2. 保持资源锁时间不同,传统的2PC需要将资源锁保持到第二阶段完成后才释放。而Seata的做法是第一阶段就将本地事务提交了,缩短了保持资源锁的时间。

二、使用Seata实现分布式事务控制

下面示例通过Seata中间件实现分布式事务,模拟电商中的下单和减库存的过程。

我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣减库存。如下:

img

2.1 订单微服务实现下单接口

controller层:

  1. package cn.jack.controller;
  2. import cn.jack.domain.Order;
  3. import cn.jack.service.impl.OrderServiceImpl5;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.PathVariable;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @RestController
  10. @Slf4j
  11. public class OrderController5 {
  12. @Autowired
  13. private OrderServiceImpl5 orderServiceImpl5;
  14. /**
  15. * 下单
  16. * @param pid 商品id
  17. * @return
  18. */
  19. @RequestMapping("/order/prod/{pid}")
  20. public Order order(@PathVariable("pid") Long pid) {
  21. return this.orderServiceImpl5.createOrder(pid);
  22. }
  23. }

service层:

  1. package cn.jack.service.impl;
  2. import cn.jack.dao.OrderDao;
  3. import cn.jack.domain.Order;
  4. import cn.jack.domain.Product;
  5. import cn.jack.service.ProductService;
  6. import com.alibaba.fastjson.JSON;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Service;
  11. @Service
  12. @Slf4j
  13. public class OrderServiceImpl5 {
  14. @Autowired
  15. private OrderDao orderDao;
  16. @Autowired
  17. private ProductService productService;
  18. @Autowired
  19. private RocketMQTemplate rocketMQTemplate;
  20. public Order createOrder(Long pid) {
  21. log.info("收到下单请求,准备查询商品信息。pid={}", pid);
  22. // TODO 通过Feign调用商品微服务,查询商品信息
  23. Product product = this.productService.findByPid(pid);
  24. log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
  25. // 进行容错判断
  26. if (product.getPid() == -100) {
  27. Order order = new Order();
  28. order.setOid(-100L);
  29. order.setPname("下单失败");
  30. return order;
  31. }
  32. // TODO 生成订单信息保存
  33. Order order = new Order();
  34. order.setNumber(1);
  35. order.setPid(pid);
  36. order.setPname(product.getPname());
  37. order.setPprice(product.getPprice());
  38. order.setUid(1);
  39. order.setUsername("陈家宝");
  40. this.orderDao.save(order);
  41. log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
  42. // TODO 调用商品微服务,扣减库存
  43. this.productService.reduceInventory(pid, order.getNumber());
  44. // TODO 下单成功之后,mq发送订单内容消息
  45. rocketMQTemplate.convertAndSend("jack-topic", order);
  46. return order;
  47. }
  48. }
  49. 2.2 商品微服务实现扣减库存接口
  50. controller层:
  51. package cn.jack.controller;
  52. import cn.jack.domain.Product;
  53. import cn.jack.service.ProductService;
  54. import com.alibaba.fastjson.JSON;
  55. import lombok.extern.slf4j.Slf4j;
  56. import org.springframework.beans.factory.annotation.Autowired;
  57. import org.springframework.web.bind.annotation.PathVariable;
  58. import org.springframework.web.bind.annotation.RequestMapping;
  59. import org.springframework.web.bind.annotation.RequestParam;
  60. import org.springframework.web.bind.annotation.RestController;
  61. @RestController
  62. @Slf4j
  63. public class ProductController {
  64. @Autowired
  65. private ProductService productService;
  66. @RequestMapping("/product/{pid}")
  67. public Product product(@PathVariable("pid") Integer pid) {
  68. log.info("开始查询商品信息。pid={}", pid);
  69. Product product = productService.findByPid(pid);
  70. log.info("商品信息查询成功:{}", JSON.toJSONString(product));
  71. return product;
  72. }
  73. /**
  74. * 扣减商品库存
  75. * @param pid 商品ID
  76. * @param number 扣除数量
  77. */
  78. @RequestMapping("/product/reduceInventory")
  79. public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
  80. this.productService.reduceInventory(pid, number);
  81. }
  82. }
  83. Service层:
  84. package cn.jack.service.impl;
  85. import cn.jack.dao.ProductDao;
  86. import cn.jack.domain.Product;
  87. import cn.jack.service.ProductService;
  88. import org.springframework.beans.factory.annotation.Autowired;
  89. import org.springframework.stereotype.Service;
  90. @Service
  91. public class ProductServiceImpl implements ProductService {
  92. @Autowired
  93. private ProductDao productDao;
  94. @Override
  95. public Product findByPid(Integer pid) {
  96. return this.productDao.findById(pid).get();
  97. }
  98. @Override
  99. public void reduceInventory(Integer pid, Integer number) {
  100. // TODO 根据商品id,查询商品信息
  101. Product product = this.productDao.findById(pid).get();
  102. // TODO 省略校验,直接扣减库存
  103. product.setStock(product.getStock() - number);
  104. this.productDao.save(product);
  105. }
  106. }
  107. 2.3 暴露出分布式事务问题
  108. 2.3.1 正常测试
  109. 访问商品下单接口,生成订单记录,商品库存扣除正常。
  110. 2.3.2 异常测试
  111. 模拟商品库存扣除出现异常,此时订单信息保存成功,而商品库存没有扣除。出现分布式事务问题。
  112. 商品微服务service层修改如下:
  113. package cn.jack.service.impl;
  114. import cn.jack.dao.ProductDao;
  115. import cn.jack.domain.Product;
  116. import cn.jack.service.ProductService;
  117. import org.springframework.beans.factory.annotation.Autowired;
  118. import org.springframework.stereotype.Service;
  119. @Service
  120. public class ProductServiceImpl implements ProductService {
  121. @Autowired
  122. private ProductDao productDao;
  123. @Override
  124. public Product findByPid(Integer pid) {
  125. return this.productDao.findById(pid).get();
  126. }
  127. @Override
  128. public void reduceInventory(Integer pid, Integer number) {
  129. // TODO 根据商品id,查询商品信息
  130. Product product = this.productDao.findById(pid).get();
  131. // TODO 模拟异常
  132. int i = 1 / 0;
  133. // TODO 省略校验,直接扣减库存
  134. product.setStock(product.getStock() - number);
  135. this.productDao.save(product);
  136. }
  137. }
  138. 访问下单链接,进行测试:
  139. 2.4 Seata解决分布式事务问题
  140. seata官方demo地址:http://seata.io/zh-cn/docs/user/quickstart.html
  141. 客户端集成:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md
  142. 2.4.1 下载Seata
  143. 下载链接:https://github.com/seata/seata/releases/tag/v1.3.0,根据需要进行下载,这里我下载zip版本。
  144. 2.4.2 修改配置文件
  145. registry.conf该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上。将下载到的压缩包进行解压,进入到conf目录,修改配置文件:
  146. registry.conf
  147. registry {
  148. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  149. type = "nacos"
  150. nacos {
  151. application = "serverAddr" # seata-server
  152. serverAddr = "127.0.0.1:8848"
  153. group = "DEFAULT_GROUP" # SEATA_GROUP
  154. namespace = ""
  155. cluster = "default"
  156. username = ""
  157. password = ""
  158. }
  159. }
  160. config {
  161. # file、nacos 、apollo、zk、consul、etcd3
  162. type = "nacos"
  163. nacos {
  164. serverAddr = "127.0.0.1:8848"
  165. namespace = ""
  166. group = "SEATA_GROUP"
  167. username = ""
  168. password = ""
  169. }
  170. }
  171. 注意:我微服务中引入的seata依赖spring-cloud-starter-alibaba-seata版本是2.1.1.RELEASE,这里配置文件中的registry.nacos.application必须是serverAddrgroup必须是DEFAULT_GROUP,因为该版本的seata客户端获取注册在nacosseata服务时,写死的参数。所以我们需要把seata注册在nacos的默认分组下,服务名称为serverAddr
  172. 2.4.3 初始化seatanacos的配置
  173. 通过看seata提供的README.md得知,使用nacos作为注册、配置中心。需要将一些数据初始化到nacos上。
  174. config.txt文件地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt
  175. 初始化脚本地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh
  176. 为了后续查看方便,贴上两个文件的内容。
  177. config.txt
  178. transport.type=TCP
  179. transport.server=NIO
  180. transport.heartbeat=true
  181. transport.enableClientBatchSendRequest=false
  182. transport.threadFactory.bossThreadPrefix=NettyBoss
  183. transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
  184. transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
  185. transport.threadFactory.shareBossWorker=false
  186. transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
  187. transport.threadFactory.clientSelectorThreadSize=1
  188. transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
  189. transport.threadFactory.bossThreadSize=1
  190. transport.threadFactory.workerThreadSize=default
  191. transport.shutdown.wait=3
  192. service.vgroup_mapping.service-product=default
  193. service.vgroup_mapping.service-order=default
  194. service.vgroup_mapping.my_test_tx_group=default
  195. service.vgroupMapping.my_test_tx_group=default
  196. service.default.grouplist=127.0.0.1:9000
  197. service.enableDegrade=false
  198. service.disableGlobalTransaction=false
  199. client.rm.asyncCommitBufferLimit=10000
  200. client.rm.lock.retryInterval=10
  201. client.rm.lock.retryTimes=30
  202. client.rm.lock.retryPolicyBranchRollbackOnConflict=true
  203. client.rm.reportRetryCount=5
  204. client.rm.tableMetaCheckEnable=false
  205. client.rm.sqlParserType=druid
  206. client.rm.reportSuccessEnable=false
  207. client.rm.sagaBranchRegisterEnable=false
  208. client.tm.commitRetryCount=5
  209. client.tm.rollbackRetryCount=5
  210. client.tm.defaultGlobalTransactionTimeout=60000
  211. client.tm.degradeCheck=false
  212. client.tm.degradeCheckAllowTimes=10
  213. client.tm.degradeCheckPeriod=2000
  214. store.mode=file
  215. store.file.dir=file_store/data
  216. store.file.maxBranchSessionSize=16384
  217. store.file.maxGlobalSessionSize=512
  218. store.file.fileWriteBufferCacheSize=16384
  219. store.file.flushDiskMode=async
  220. store.file.sessionReloadReadSize=100
  221. store.db.datasource=druid
  222. store.db.dbType=mysql
  223. store.db.driverClassName=com.mysql.jdbc.Driver
  224. store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
  225. store.db.user=username
  226. store.db.password=password
  227. store.db.minConn=5
  228. store.db.maxConn=30
  229. store.db.globalTable=global_table
  230. store.db.branchTable=branch_table
  231. store.db.queryLimit=100
  232. store.db.lockTable=lock_table
  233. store.db.maxWait=5000
  234. store.redis.host=127.0.0.1
  235. store.redis.port=6379
  236. store.redis.maxConn=10
  237. store.redis.minConn=1
  238. store.redis.database=0
  239. store.redis.password=null
  240. store.redis.queryLimit=100
  241. server.recovery.committingRetryPeriod=1000
  242. server.recovery.asynCommittingRetryPeriod=1000
  243. server.recovery.rollbackingRetryPeriod=1000
  244. server.recovery.timeoutRetryPeriod=1000
  245. server.maxCommitRetryTimeout=-1
  246. server.maxRollbackRetryTimeout=-1
  247. server.rollbackRetryTimeoutUnlockEnable=false
  248. client.undo.dataValidation=true
  249. client.undo.logSerialization=jackson
  250. client.undo.onlyCareUpdateColumns=true
  251. server.undo.logSaveDays=7
  252. server.undo.logDeletePeriod=86400000
  253. client.undo.logTable=undo_log
  254. client.log.exceptionRate=100
  255. transport.serialization=seata
  256. transport.compressor=none
  257. metrics.enabled=false
  258. metrics.registryType=compact
  259. metrics.exporterList=prometheus
  260. metrics.exporterPrometheusPort=9898
  261. 注意:
  262. 我这里让seata监听9000端口,默认的是8091。修改配置service.default.grouplist=127.0.0.1:9000
  263. 有多个微服务,就需要添加多少条此配置service.vgroup_mapping.微服务名称=default。例如,我有个微服务名称为service-product,则需要加条配置:service.vgroup_mapping.service-product=default。后续会在微服务的bootstrap.yml中指定事务服务分组名称为微服务名称,tx-service-group: ${
  264. spring.application.name}
  265. 初始化脚本nacos-config.sh
  266. #!/usr/bin/env bash
  267. # Copyright 1999-2019 Seata.io Group.
  268. #
  269. # Licensed under the Apache License, Version 2.0 (the "License");
  270. # you may not use this file except in compliance with the License.
  271. # You may obtain a copy of the License at、
  272. #
  273. # http://www.apache.org/licenses/LICENSE-2.0
  274. #
  275. # Unless required by applicable law or agreed to in writing, software
  276. # distributed under the License is distributed on an "AS IS" BASIS,
  277. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  278. # See the License for the specific language governing permissions and
  279. # limitations under the License.
  280. while getopts ":h:p:g:t:u:w:" opt
  281. do
  282. case $opt in
  283. h)
  284. host=$OPTARG
  285. ;;
  286. p)
  287. port=$OPTARG
  288. ;;
  289. g)
  290. group=$OPTARG
  291. ;;
  292. t)
  293. tenant=$OPTARG
  294. ;;
  295. u)
  296. username=$OPTARG
  297. ;;
  298. w)
  299. password=$OPTARG
  300. ;;
  301. ?)
  302. echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
  303. exit 1
  304. ;;
  305. esac
  306. done
  307. if [[ -z ${
  308. host} ]]; then
  309. host=localhost
  310. fi
  311. if [[ -z ${
  312. port} ]]; then
  313. port=8848
  314. fi
  315. if [[ -z ${
  316. group} ]]; then
  317. group="SEATA_GROUP"
  318. fi
  319. if [[ -z ${
  320. tenant} ]]; then
  321. tenant=""
  322. fi
  323. if [[ -z ${
  324. username} ]]; then
  325. username=""
  326. fi
  327. if [[ -z ${
  328. password} ]]; then
  329. password=""
  330. fi
  331. nacosAddr=$host:$port
  332. contentType="content-type:application/json;charset=UTF-8"
  333. echo "set nacosAddr=$nacosAddr"
  334. echo "set group=$group"
  335. failCount=0
  336. tempLog=$(mktemp -u)
  337. function addConfig() {
  338. curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  339. if [[ -z $(cat "${tempLog}") ]]; then
  340. echo " Please check the cluster status. "
  341. exit 1
  342. fi
  343. if [[ $(cat "${tempLog}") =~ "true" ]]; then
  344. echo "Set $1=$2 successfully "
  345. else
  346. echo "Set $1=$2 failure "
  347. (( failCount++ ))
  348. fi
  349. }
  350. count=0
  351. #for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
  352. # 在windows环境下读取路径失败,我这里是直接写死了config.txt路径
  353. for line in $(cat C:/Develop/seata-1.3.0/conf/config.txt | sed s/[[:space:]]//g); do
  354. (( count++ ))
  355. key=${
  356. line%%=*}
  357. value=${
  358. line#*=}
  359. addConfig "${key}" "${value}"
  360. done
  361. echo "========================================================================="
  362. echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
  363. echo "========================================================================="
  364. if [[ ${
  365. failCount} -eq 0 ]]; then
  366. echo " Init nacos config finished, please start seata-server. "
  367. else
  368. echo " init nacos config fail. "
  369. fi
  370. read -n 1
  371. 注意:此脚本在windows环境下,读取config.txt文件路径失败,我这里是直接写死路径。
  372. 执行以下命令进行初始化:
  373. # 将seata的配置写入nacos中
  374. # 注意:这里要保证nacos已经正常运行
  375. cd conf
  376. nacos-config.sh 127.0.0.1
  377. 正常输出结果如图所示:
  378. nacos中就可以看到初始化过来的配置:
  379. 2.4.4 启动seata服务
  380. cd bin
  381. seata-server.bat -p 9000
  382. 启动完成,在nacos的服务列表可以看到seata-server的服务。
  383. 2.4.5 添加undo_log
  384. 在我们的数据库中加入一张undo_log表,这是seata记录事务日志要用到的表。表结构如下:
  385. -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
  386. CREATE TABLE `undo_log` (
  387. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  388. `branch_id` bigint(20) NOT NULL,
  389. `xid` varchar(100) NOT NULL,
  390. `context` varchar(128) NOT NULL,
  391. `rollback_info` longblob NOT NULL,
  392. `log_status` int(11) NOT NULL,
  393. `log_created` datetime NOT NULL,
  394. `log_modified` datetime NOT NULL,
  395. `ext` varchar(100) DEFAULT NULL,
  396. PRIMARY KEY (`id`),
  397. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  398. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
  399. 2.4.6 微服务集成seata事务控制
  400. 第一步:添加依赖
  401. <!--seata-->
  402. <dependency>
  403. <groupId>com.alibaba.cloud</groupId>
  404. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  405. </dependency>
  406. 第二步:添加代理数据源
  407. Seata是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。
  408. package cn.jack.config;
  409. import com.alibaba.druid.pool.DruidDataSource;
  410. import io.seata.rm.datasource.DataSourceProxy;
  411. import org.springframework.boot.context.properties.ConfigurationProperties;
  412. import org.springframework.context.annotation.Bean;
  413. import org.springframework.context.annotation.Configuration;
  414. import org.springframework.context.annotation.Primary;
  415. @Configuration
  416. public class DataSourceProxyConfig {
  417. @Bean
  418. @ConfigurationProperties(prefix = "spring.datasource")
  419. public DruidDataSource druidDataSource() {
  420. return new DruidDataSource();
  421. }
  422. @Primary
  423. @Bean
  424. public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
  425. return new DataSourceProxy(druidDataSource);
  426. }
  427. }
  428. 第三步:在微服务的resources下添加Seata的配置文件registry.conf
  429. registry {
  430. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  431. type = "nacos"
  432. nacos {
  433. application = "serverAddr" # 不起作用,源码中写死了:serverAddr
  434. serverAddr = "127.0.0.1:8848"
  435. group = "DEFAULT_GROUP" # 不起作用,源码中写死了:DEFAULT_GROUP
  436. namespace = ""
  437. cluster = "default"
  438. username = ""
  439. password = ""
  440. }
  441. }
  442. config {
  443. # file、nacos 、apollo、zk、consul、etcd3
  444. type = "nacos"
  445. nacos {
  446. serverAddr = "127.0.0.1:8848"
  447. namespace = ""
  448. group = "SEATA_GROUP"
  449. username = ""
  450. password = ""
  451. }
  452. }
  453. 第四步:bootstrap.yml中添加seata配置
  454. spring:
  455. application:
  456. name: service-product
  457. cloud:
  458. nacos:
  459. config:
  460. server-addr: 127.0.0.1:8848 # Nacos配置中心的地址
  461. file-extension: yaml # 配置的格式
  462. shared-dataids: all-service.yaml # 要引入的配置
  463. refreshable-dataids: all-service.yaml # 动态刷新时也刷新引入的配置
  464. alibaba:
  465. seata:
  466. tx-service-group: ${
  467. spring.application.name} # 和服务名称相同
  468. profiles:
  469. active: test # 环境标识
  470. 第五步:下单接口服务层开启全局事务
  471. package cn.jack.service.impl;
  472. import cn.jack.dao.OrderDao;
  473. import cn.jack.domain.Order;
  474. import cn.jack.domain.Product;
  475. import cn.jack.service.ProductService;
  476. import com.alibaba.fastjson.JSON;
  477. import io.seata.spring.annotation.GlobalTransactional;
  478. import lombok.extern.slf4j.Slf4j;
  479. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  480. import org.springframework.beans.factory.annotation.Autowired;
  481. import org.springframework.stereotype.Service;
  482. @Service
  483. @Slf4j
  484. public class OrderServiceImpl5 {
  485. @Autowired
  486. private OrderDao orderDao;
  487. @Autowired
  488. private ProductService productService;
  489. @Autowired
  490. private RocketMQTemplate rocketMQTemplate;
  491. @GlobalTransactional // 全局事务控制
  492. public Order createOrder(Long pid) {
  493. log.info("收到下单请求,准备查询商品信息。pid={}", pid);
  494. // TODO 通过Feign调用商品微服务,查询商品信息
  495. Product product = this.productService.findByPid(pid);
  496. log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
  497. // 进行容错判断
  498. if (product.getPid() == -100) {
  499. Order order = new Order();
  500. order.setOid(-100L);
  501. order.setPname("下单失败");
  502. return order;
  503. }
  504. // TODO 生成订单信息保存
  505. Order order = new Order();
  506. order.setNumber(1);
  507. order.setPid(pid);
  508. order.setPname(product.getPname());
  509. order.setPprice(product.getPprice());
  510. order.setUid(1);
  511. order.setUsername("陈家宝");
  512. this.orderDao.save(order);
  513. log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
  514. // TODO 调用商品微服务,扣减库存
  515. this.productService.reduceInventory(pid, order.getNumber());
  516. // TODO 下单成功之后,mq发送订单内容消息
  517. rocketMQTemplate.convertAndSend("jack-topic", order);
  518. return order;
  519. }
  520. }
  521. 第六步:测试
  522. 再次下单测试。扣减库存失败后,商品订单数据回滚,达到了全局事务的一致性。
  523. 2.4.7 总结
  524. Seata运行流程分析
  525. 服务之间调用的问题
  526. 服务之间的调用使用了Feign,如果配置了容错类(fallback)或者容错工厂(fallbackFactory),则全局事务“不生效”。即商品微服务中扣减库存出现异常后,订单微服务的订单不会回滚。
  527. 可以理解为Seata决定全局事务是否回滚的方式,就是看开启全局事务的方法(加了@GlobalTransactional注解的方法)是否抛出异常,如果该方法抛了异常,则回滚所有分支事务。而如果Feign调用商品微服务(ProductService)配置了容错类的话,虽然调用的商品微服务方法抛出了异常,但是进入了容错的方法,也就是说开启全局事务的方法(此处为下订单的方法createOrder)并没有异常,所以订单不会回滚。

2.2 商品微服务实现扣减库存接口

controller层:

  1. package cn.jack.controller;
  2. import cn.jack.domain.Product;
  3. import cn.jack.service.ProductService;
  4. import com.alibaba.fastjson.JSON;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RequestParam;
  10. import org.springframework.web.bind.annotation.RestController;
  11. @RestController
  12. @Slf4j
  13. public class ProductController {
  14. @Autowired
  15. private ProductService productService;
  16. @RequestMapping("/product/{pid}")
  17. public Product product(@PathVariable("pid") Integer pid) {
  18. log.info("开始查询商品信息。pid={}", pid);
  19. Product product = productService.findByPid(pid);
  20. log.info("商品信息查询成功:{}", JSON.toJSONString(product));
  21. return product;
  22. }
  23. /**
  24. * 扣减商品库存
  25. * @param pid 商品ID
  26. * @param number 扣除数量
  27. */
  28. @RequestMapping("/product/reduceInventory")
  29. public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
  30. this.productService.reduceInventory(pid, number);
  31. }
  32. }

Service层:

  1. package cn.jack.service.impl;
  2. import cn.jack.dao.ProductDao;
  3. import cn.jack.domain.Product;
  4. import cn.jack.service.ProductService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class ProductServiceImpl implements ProductService {
  9. @Autowired
  10. private ProductDao productDao;
  11. @Override
  12. public Product findByPid(Integer pid) {
  13. return this.productDao.findById(pid).get();
  14. }
  15. @Override
  16. public void reduceInventory(Integer pid, Integer number) {
  17. // TODO 根据商品id,查询商品信息
  18. Product product = this.productDao.findById(pid).get();
  19. // TODO 省略校验,直接扣减库存
  20. product.setStock(product.getStock() - number);
  21. this.productDao.save(product);
  22. }
  23. }

2.3 暴露出分布式事务问题

2.3.1 正常测试

访问商品下单接口,生成订单记录,商品库存扣除正常。

img

2.3.2 异常测试

模拟商品库存扣除出现异常,此时订单信息保存成功,而商品库存没有扣除。出现分布式事务问题。

商品微服务service层修改如下:

  1. package cn.jack.service.impl;
  2. import cn.jack.dao.ProductDao;
  3. import cn.jack.domain.Product;
  4. import cn.jack.service.ProductService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class ProductServiceImpl implements ProductService {
  9. @Autowired
  10. private ProductDao productDao;
  11. @Override
  12. public Product findByPid(Integer pid) {
  13. return this.productDao.findById(pid).get();
  14. }
  15. @Override
  16. public void reduceInventory(Integer pid, Integer number) {
  17. // TODO 根据商品id,查询商品信息
  18. Product product = this.productDao.findById(pid).get();
  19. // TODO 模拟异常
  20. int i = 1 / 0;
  21. // TODO 省略校验,直接扣减库存
  22. product.setStock(product.getStock() - number);
  23. this.productDao.save(product);
  24. }
  25. }

访问下单链接,进行测试:

img

2.4 Seata解决分布式事务问题

seata官方demo地址:http://seata.io/zh-cn/docs/user/quickstart.html

客户端集成:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md

2.4.1 下载Seata

下载链接:https://github.com/seata/seata/releases/tag/v1.3.0,根据需要进行下载,这里我下载zip版本。

img

2.4.2 修改配置文件

registry.conf该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上。将下载到的压缩包进行解压,进入到conf目录,修改配置文件:

  • registry.conf

    registry {

    file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

    type = “nacos”

    nacos {

    1. application = "serverAddr" # seata-server
    2. serverAddr = "127.0.0.1:8848"
    3. group = "DEFAULT_GROUP" # SEATA_GROUP
    4. namespace = ""
    5. cluster = "default"
    6. username = ""
    7. password = ""

    }
    }

    config {

    file、nacos 、apollo、zk、consul、etcd3

    type = “nacos”

    nacos {

    1. serverAddr = "127.0.0.1:8848"
    2. namespace = ""
    3. group = "SEATA_GROUP"
    4. username = ""
    5. password = ""

    }
    }

注意:我微服务中引入的seata依赖spring-cloud-starter-alibaba-seata版本是2.1.1.RELEASE,这里配置文件中的registry.nacos.application必须是serverAddr,group必须是DEFAULT_GROUP,因为该版本的seata客户端获取注册在nacos的seata服务时,写死的参数。所以我们需要把seata注册在nacos的默认分组下,服务名称为serverAddr。

2.4.3 初始化seata在nacos的配置

通过看seata提供的README.md得知,使用nacos作为注册、配置中心。需要将一些数据初始化到nacos上。

config.txt文件地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt

初始化脚本地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh

为了后续查看方便,贴上两个文件的内容。

config.txt

  1. transport.type=TCP
  2. transport.server=NIO
  3. transport.heartbeat=true
  4. transport.enableClientBatchSendRequest=false
  5. transport.threadFactory.bossThreadPrefix=NettyBoss
  6. transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
  7. transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
  8. transport.threadFactory.shareBossWorker=false
  9. transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
  10. transport.threadFactory.clientSelectorThreadSize=1
  11. transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
  12. transport.threadFactory.bossThreadSize=1
  13. transport.threadFactory.workerThreadSize=default
  14. transport.shutdown.wait=3
  15. service.vgroup_mapping.service-product=default
  16. service.vgroup_mapping.service-order=default
  17. service.vgroup_mapping.my_test_tx_group=default
  18. service.vgroupMapping.my_test_tx_group=default
  19. service.default.grouplist=127.0.0.1:9000
  20. service.enableDegrade=false
  21. service.disableGlobalTransaction=false
  22. client.rm.asyncCommitBufferLimit=10000
  23. client.rm.lock.retryInterval=10
  24. client.rm.lock.retryTimes=30
  25. client.rm.lock.retryPolicyBranchRollbackOnConflict=true
  26. client.rm.reportRetryCount=5
  27. client.rm.tableMetaCheckEnable=false
  28. client.rm.sqlParserType=druid
  29. client.rm.reportSuccessEnable=false
  30. client.rm.sagaBranchRegisterEnable=false
  31. client.tm.commitRetryCount=5
  32. client.tm.rollbackRetryCount=5
  33. client.tm.defaultGlobalTransactionTimeout=60000
  34. client.tm.degradeCheck=false
  35. client.tm.degradeCheckAllowTimes=10
  36. client.tm.degradeCheckPeriod=2000
  37. store.mode=file
  38. store.file.dir=file_store/data
  39. store.file.maxBranchSessionSize=16384
  40. store.file.maxGlobalSessionSize=512
  41. store.file.fileWriteBufferCacheSize=16384
  42. store.file.flushDiskMode=async
  43. store.file.sessionReloadReadSize=100
  44. store.db.datasource=druid
  45. store.db.dbType=mysql
  46. store.db.driverClassName=com.mysql.jdbc.Driver
  47. store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
  48. store.db.user=username
  49. store.db.password=password
  50. store.db.minConn=5
  51. store.db.maxConn=30
  52. store.db.globalTable=global_table
  53. store.db.branchTable=branch_table
  54. store.db.queryLimit=100
  55. store.db.lockTable=lock_table
  56. store.db.maxWait=5000
  57. store.redis.host=127.0.0.1
  58. store.redis.port=6379
  59. store.redis.maxConn=10
  60. store.redis.minConn=1
  61. store.redis.database=0
  62. store.redis.password=null
  63. store.redis.queryLimit=100
  64. server.recovery.committingRetryPeriod=1000
  65. server.recovery.asynCommittingRetryPeriod=1000
  66. server.recovery.rollbackingRetryPeriod=1000
  67. server.recovery.timeoutRetryPeriod=1000
  68. server.maxCommitRetryTimeout=-1
  69. server.maxRollbackRetryTimeout=-1
  70. server.rollbackRetryTimeoutUnlockEnable=false
  71. client.undo.dataValidation=true
  72. client.undo.logSerialization=jackson
  73. client.undo.onlyCareUpdateColumns=true
  74. server.undo.logSaveDays=7
  75. server.undo.logDeletePeriod=86400000
  76. client.undo.logTable=undo_log
  77. client.log.exceptionRate=100
  78. transport.serialization=seata
  79. transport.compressor=none
  80. metrics.enabled=false
  81. metrics.registryType=compact
  82. metrics.exporterList=prometheus
  83. metrics.exporterPrometheusPort=9898

注意:

  1. 我这里让seata监听9000端口,默认的是8091。修改配置service.default.grouplist=127.0.0.1:9000
  2. 有多个微服务,就需要添加多少条此配置service.vgroup_mapping.微服务名称=default。例如,我有个微服务名称为service-product,则需要加条配置:service.vgroup_mapping.service-product=default。后续会在微服务的bootstrap.yml中指定事务服务分组名称为微服务名称,tx-service-group: ${spring.application.name}

初始化脚本nacos-config.sh

  1. #!/usr/bin/env bash
  2. # Copyright 1999-2019 Seata.io Group.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at、
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. while getopts ":h:p:g:t:u:w:" opt
  16. do
  17. case $opt in
  18. h)
  19. host=$OPTARG
  20. ;;
  21. p)
  22. port=$OPTARG
  23. ;;
  24. g)
  25. group=$OPTARG
  26. ;;
  27. t)
  28. tenant=$OPTARG
  29. ;;
  30. u)
  31. username=$OPTARG
  32. ;;
  33. w)
  34. password=$OPTARG
  35. ;;
  36. ?)
  37. echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
  38. exit 1
  39. ;;
  40. esac
  41. done
  42. if [[ -z ${host} ]]; then
  43. host=localhost
  44. fi
  45. if [[ -z ${port} ]]; then
  46. port=8848
  47. fi
  48. if [[ -z ${group} ]]; then
  49. group="SEATA_GROUP"
  50. fi
  51. if [[ -z ${tenant} ]]; then
  52. tenant=""
  53. fi
  54. if [[ -z ${username} ]]; then
  55. username=""
  56. fi
  57. if [[ -z ${password} ]]; then
  58. password=""
  59. fi
  60. nacosAddr=$host:$port
  61. contentType="content-type:application/json;charset=UTF-8"
  62. echo "set nacosAddr=$nacosAddr"
  63. echo "set group=$group"
  64. failCount=0
  65. tempLog=$(mktemp -u)
  66. function addConfig() {
  67. curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  68. if [[ -z $(cat "${tempLog}") ]]; then
  69. echo " Please check the cluster status. "
  70. exit 1
  71. fi
  72. if [[ $(cat "${tempLog}") =~ "true" ]]; then
  73. echo "Set $1=$2 successfully "
  74. else
  75. echo "Set $1=$2 failure "
  76. (( failCount++ ))
  77. fi
  78. }
  79. count=0
  80. #for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
  81. # 在windows环境下读取路径失败,我这里是直接写死了config.txt路径
  82. for line in $(cat C:/Develop/seata-1.3.0/conf/config.txt | sed s/[[:space:]]//g); do
  83. (( count++ ))
  84. key=${line%%=*}
  85. value=${line#*=}
  86. addConfig "${key}" "${value}"
  87. done
  88. echo "========================================================================="
  89. echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
  90. echo "========================================================================="
  91. if [[ ${failCount} -eq 0 ]]; then
  92. echo " Init nacos config finished, please start seata-server. "
  93. else
  94. echo " init nacos config fail. "
  95. fi
  96. read -n 1

注意:此脚本在windows环境下,读取config.txt文件路径失败,我这里是直接写死路径。

执行以下命令进行初始化:

  1. # 将seata的配置写入nacos中
  2. # 注意:这里要保证nacos已经正常运行
  3. cd conf
  4. nacos-config.sh 127.0.0.1

正常输出结果如图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BOr0QIWK-1654484848475)(https://gitee.com/xiaoxiangyuan/mark\_down\_pic/raw/master/2021/images/20220516175319.png)\]

在nacos中就可以看到初始化过来的配置:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K5Oj1Yg7-1654484848476)(https://gitee.com/xiaoxiangyuan/mark\_down\_pic/raw/master/2021/images/20220516175319.png)\]

2.4.4 启动seata服务

  1. cd bin
  2. seata-server.bat -p 9000

启动完成,在nacos的服务列表可以看到seata-server的服务。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1EejM3iu-1654484848476)(/Users/apple/Downloads/%E6%88%AA%E5%9B%BE/iShot_2022-06-06_10.55.10.png)][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ayI6F725-1654484848476)(/Users/apple/Downloads/%E6%88%AA%E5%9B%BE/iShot_2022-06-06_10.55.27.png)]

2.4.5 添加undo_log表

在我们的数据库中加入一张undo_log表,这是seata记录事务日志要用到的表。表结构如下:

  1. -- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
  2. CREATE TABLE `undo_log` (
  3. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  4. `branch_id` bigint(20) NOT NULL,
  5. `xid` varchar(100) NOT NULL,
  6. `context` varchar(128) NOT NULL,
  7. `rollback_info` longblob NOT NULL,
  8. `log_status` int(11) NOT NULL,
  9. `log_created` datetime NOT NULL,
  10. `log_modified` datetime NOT NULL,
  11. `ext` varchar(100) DEFAULT NULL,
  12. PRIMARY KEY (`id`),
  13. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  14. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2.4.6 微服务集成seata事务控制

第一步:添加依赖

  1. <!--seata-->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  5. </dependency>

第二步:添加代理数据源

Seata是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy 的Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。

  1. package cn.jack.config;
  2. import com.alibaba.druid.pool.DruidDataSource;
  3. import io.seata.rm.datasource.DataSourceProxy;
  4. import org.springframework.boot.context.properties.ConfigurationProperties;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.context.annotation.Primary;
  8. @Configuration
  9. public class DataSourceProxyConfig {
  10. @Bean
  11. @ConfigurationProperties(prefix = "spring.datasource")
  12. public DruidDataSource druidDataSource() {
  13. return new DruidDataSource();
  14. }
  15. @Primary
  16. @Bean
  17. public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
  18. return new DataSourceProxy(druidDataSource);
  19. }
  20. }

第三步:在微服务的resources下添加Seata的配置文件registry.conf

  1. registry {
  2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  3. type = "nacos"
  4. nacos {
  5. application = "serverAddr" # 不起作用,源码中写死了:serverAddr
  6. serverAddr = "127.0.0.1:8848"
  7. group = "DEFAULT_GROUP" # 不起作用,源码中写死了:DEFAULT_GROUP
  8. namespace = ""
  9. cluster = "default"
  10. username = ""
  11. password = ""
  12. }
  13. }
  14. config {
  15. # file、nacos 、apollo、zk、consul、etcd3
  16. type = "nacos"
  17. nacos {
  18. serverAddr = "127.0.0.1:8848"
  19. namespace = ""
  20. group = "SEATA_GROUP"
  21. username = ""
  22. password = ""
  23. }
  24. }

第四步:bootstrap.yml中添加seata配置

  1. spring:
  2. application:
  3. name: service-product
  4. cloud:
  5. nacos:
  6. config:
  7. server-addr: 127.0.0.1:8848 # Nacos配置中心的地址
  8. file-extension: yaml # 配置的格式
  9. shared-dataids: all-service.yaml # 要引入的配置
  10. refreshable-dataids: all-service.yaml # 动态刷新时也刷新引入的配置
  11. alibaba:
  12. seata:
  13. tx-service-group: ${spring.application.name} # 和服务名称相同
  14. profiles:
  15. active: test # 环境标识

第五步:下单接口服务层开启全局事务

  1. package cn.jack.service.impl;
  2. import cn.jack.dao.OrderDao;
  3. import cn.jack.domain.Order;
  4. import cn.jack.domain.Product;
  5. import cn.jack.service.ProductService;
  6. import com.alibaba.fastjson.JSON;
  7. import io.seata.spring.annotation.GlobalTransactional;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Service;
  12. @Service
  13. @Slf4j
  14. public class OrderServiceImpl5 {
  15. @Autowired
  16. private OrderDao orderDao;
  17. @Autowired
  18. private ProductService productService;
  19. @Autowired
  20. private RocketMQTemplate rocketMQTemplate;
  21. @GlobalTransactional // 全局事务控制
  22. public Order createOrder(Long pid) {
  23. log.info("收到下单请求,准备查询商品信息。pid={}", pid);
  24. // TODO 通过Feign调用商品微服务,查询商品信息
  25. Product product = this.productService.findByPid(pid);
  26. log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
  27. // 进行容错判断
  28. if (product.getPid() == -100) {
  29. Order order = new Order();
  30. order.setOid(-100L);
  31. order.setPname("下单失败");
  32. return order;
  33. }
  34. // TODO 生成订单信息保存
  35. Order order = new Order();
  36. order.setNumber(1);
  37. order.setPid(pid);
  38. order.setPname(product.getPname());
  39. order.setPprice(product.getPprice());
  40. order.setUid(1);
  41. order.setUsername("陈家宝");
  42. this.orderDao.save(order);
  43. log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
  44. // TODO 调用商品微服务,扣减库存
  45. this.productService.reduceInventory(pid, order.getNumber());
  46. // TODO 下单成功之后,mq发送订单内容消息
  47. rocketMQTemplate.convertAndSend("jack-topic", order);
  48. return order;
  49. }
  50. }

第六步:测试

再次下单测试。扣减库存失败后,商品订单数据回滚,达到了全局事务的一致性。

2.4.7 总结

  • Seata运行流程分析
    在这里插入图片描述

    要点说明:
    1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的
    目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有
    undo_log。
    2、在第一阶段undo_Jog中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就己经将分
    支事务提交,也就释放了锁资源。
    3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个
    分支事务将自己的Branch ID分支事务ID与XID关联。
    4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各
    各参与者只需要删除undo._log即可,并且可以异步执行,第二阶段很快可以完成。
    5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过XID 和 Branch ID 找到相应的回滚日
    志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操
    作。

  • 服务之间调用的问题

服务之间的调用使用了Feign,如果配置了容错类(fallback)或者容错工厂(fallbackFactory),则全局事务“不生效”。即商品微服务中扣减库存出现异常后,订单微服务的订单不会回滚。

可以理解为Seata决定全局事务是否回滚的方式,就是看开启全局事务的方法(加了@GlobalTransactional注解的方法)是否抛出异常,如果该方法抛了异常,则回滚所有分支事务。而如果Feign调用商品微服务(ProductService)配置了容错类的话,虽然调用的商品微服务方法抛出了异常,但是进入了容错的方法,也就是说开启全局事务的方法(此处为下订单的方法createOrder)并没有异常,所以订单不会回滚。

发表评论

表情:
评论列表 (有 0 条评论,86人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Seata--分布式事务

    1、Seata 分布式事务基础 1.1事务 事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。简