SpringCloud-Alibaba之分布式事务Seata
SpringCloud-Alibaba之分布式事务Seata
目录
一、Seata介绍
二、使用Seata实现分布式事务控制
2.1 订单微服务实现下单接口
2.2 商品微服务实现扣减库存接口
2.3 暴露出分布式事务问题
2.3.1 正常测试
2.3.2 异常测试
2.4 Seata解决分布式事务问题
2.4.1 下载Seata
2.4.2 修改配置文件
2.4.3 初始化seata在nacos的配置
2.4.4 启动seata服务
2.4.5 添加undo_log表
2.4.6 微服务集成seata事务控制
2.4.7 总结
一、Seata介绍
Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。它把一个分布式事务理解成一个包含了若干分支事务的全局事务。如下所示:
全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。
Seata主要由三个重要组件组成:
- TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。也就是Seata独立运行的服务端。
- TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
- RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。
Seata的执行流程如下:
- A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
- A服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
- A服务执行分支事务,向数据库做操作
- A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
- B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
- B服务执行分支事务,向数据库做操作
- 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
- TC协调其管辖之下的所有分支事务,决定提交还是回滚
Seata实现2PC与传统2PC的差别:
- 架构层次不同,传统2PC方案的RM实际上是在数据库层,即RM本质上就是数据库本身。而Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。
- 保持资源锁时间不同,传统的2PC需要将资源锁保持到第二阶段完成后才释放。而Seata的做法是第一阶段就将本地事务提交了,缩短了保持资源锁的时间。
二、使用Seata实现分布式事务控制
下面示例通过Seata中间件实现分布式事务,模拟电商中的下单和减库存的过程。
我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣减库存。如下:
2.1 订单微服务实现下单接口
controller层:
package cn.jack.controller;
import cn.jack.domain.Order;
import cn.jack.service.impl.OrderServiceImpl5;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class OrderController5 {
@Autowired
private OrderServiceImpl5 orderServiceImpl5;
/**
* 下单
* @param pid 商品id
* @return
*/
@RequestMapping("/order/prod/{pid}")
public Order order(@PathVariable("pid") Long pid) {
return this.orderServiceImpl5.createOrder(pid);
}
}
service层:
package cn.jack.service.impl;
import cn.jack.dao.OrderDao;
import cn.jack.domain.Order;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderServiceImpl5 {
@Autowired
private OrderDao orderDao;
@Autowired
private ProductService productService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
public Order createOrder(Long pid) {
log.info("收到下单请求,准备查询商品信息。pid={}", pid);
// TODO 通过Feign调用商品微服务,查询商品信息
Product product = this.productService.findByPid(pid);
log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
// 进行容错判断
if (product.getPid() == -100) {
Order order = new Order();
order.setOid(-100L);
order.setPname("下单失败");
return order;
}
// TODO 生成订单信息保存
Order order = new Order();
order.setNumber(1);
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setUid(1);
order.setUsername("陈家宝");
this.orderDao.save(order);
log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
// TODO 调用商品微服务,扣减库存
this.productService.reduceInventory(pid, order.getNumber());
// TODO 下单成功之后,mq发送订单内容消息
rocketMQTemplate.convertAndSend("jack-topic", order);
return order;
}
}
2.2 商品微服务实现扣减库存接口
controller层:
package cn.jack.controller;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProductController {
@Autowired
private ProductService productService;
@RequestMapping("/product/{pid}")
public Product product(@PathVariable("pid") Integer pid) {
log.info("开始查询商品信息。pid={}", pid);
Product product = productService.findByPid(pid);
log.info("商品信息查询成功:{}", JSON.toJSONString(product));
return product;
}
/**
* 扣减商品库存
* @param pid 商品ID
* @param number 扣除数量
*/
@RequestMapping("/product/reduceInventory")
public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
this.productService.reduceInventory(pid, number);
}
}
Service层:
package cn.jack.service.impl;
import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductDao productDao;
@Override
public Product findByPid(Integer pid) {
return this.productDao.findById(pid).get();
}
@Override
public void reduceInventory(Integer pid, Integer number) {
// TODO 根据商品id,查询商品信息
Product product = this.productDao.findById(pid).get();
// TODO 省略校验,直接扣减库存
product.setStock(product.getStock() - number);
this.productDao.save(product);
}
}
2.3 暴露出分布式事务问题
2.3.1 正常测试
访问商品下单接口,生成订单记录,商品库存扣除正常。
2.3.2 异常测试
模拟商品库存扣除出现异常,此时订单信息保存成功,而商品库存没有扣除。出现分布式事务问题。
商品微服务service层修改如下:
package cn.jack.service.impl;
import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductDao productDao;
@Override
public Product findByPid(Integer pid) {
return this.productDao.findById(pid).get();
}
@Override
public void reduceInventory(Integer pid, Integer number) {
// TODO 根据商品id,查询商品信息
Product product = this.productDao.findById(pid).get();
// TODO 模拟异常
int i = 1 / 0;
// TODO 省略校验,直接扣减库存
product.setStock(product.getStock() - number);
this.productDao.save(product);
}
}
访问下单链接,进行测试:
2.4 Seata解决分布式事务问题
seata官方demo地址:http://seata.io/zh-cn/docs/user/quickstart.html
客户端集成:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md
2.4.1 下载Seata
下载链接:https://github.com/seata/seata/releases/tag/v1.3.0,根据需要进行下载,这里我下载zip版本。
2.4.2 修改配置文件
registry.conf该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上。将下载到的压缩包进行解压,进入到conf目录,修改配置文件:
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "serverAddr" # seata-server
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP" # SEATA_GROUP
namespace = ""
cluster = "default"
username = ""
password = ""
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
}
注意:我微服务中引入的seata依赖spring-cloud-starter-alibaba-seata版本是2.1.1.RELEASE,这里配置文件中的registry.nacos.application必须是serverAddr,group必须是DEFAULT_GROUP,因为该版本的seata客户端获取注册在nacos的seata服务时,写死的参数。所以我们需要把seata注册在nacos的默认分组下,服务名称为serverAddr。
2.4.3 初始化seata在nacos的配置
通过看seata提供的README.md得知,使用nacos作为注册、配置中心。需要将一些数据初始化到nacos上。
config.txt文件地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt
初始化脚本地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh
为了后续查看方便,贴上两个文件的内容。
config.txt
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroup_mapping.service-product=default
service.vgroup_mapping.service-order=default
service.vgroup_mapping.my_test_tx_group=default
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:9000
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
注意:
我这里让seata监听9000端口,默认的是8091。修改配置service.default.grouplist=127.0.0.1:9000
有多个微服务,就需要添加多少条此配置service.vgroup_mapping.微服务名称=default。例如,我有个微服务名称为service-product,则需要加条配置:service.vgroup_mapping.service-product=default。后续会在微服务的bootstrap.yml中指定事务服务分组名称为微服务名称,tx-service-group: ${
spring.application.name}
初始化脚本nacos-config.sh
#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done
if [[ -z ${
host} ]]; then
host=localhost
fi
if [[ -z ${
port} ]]; then
port=8848
fi
if [[ -z ${
group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${
tenant} ]]; then
tenant=""
fi
if [[ -z ${
username} ]]; then
username=""
fi
if [[ -z ${
password} ]]; then
password=""
fi
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
(( failCount++ ))
fi
}
count=0
#for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
# 在windows环境下读取路径失败,我这里是直接写死了config.txt路径
for line in $(cat C:/Develop/seata-1.3.0/conf/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${
line%%=*}
value=${
line#*=}
addConfig "${key}" "${value}"
done
echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="
if [[ ${
failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
read -n 1
注意:此脚本在windows环境下,读取config.txt文件路径失败,我这里是直接写死路径。
执行以下命令进行初始化:
# 将seata的配置写入nacos中
# 注意:这里要保证nacos已经正常运行
cd conf
nacos-config.sh 127.0.0.1
正常输出结果如图所示:
在nacos中就可以看到初始化过来的配置:
2.4.4 启动seata服务
cd bin
seata-server.bat -p 9000
启动完成,在nacos的服务列表可以看到seata-server的服务。
2.4.5 添加undo_log表
在我们的数据库中加入一张undo_log表,这是seata记录事务日志要用到的表。表结构如下:
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.4.6 微服务集成seata事务控制
第一步:添加依赖
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
第二步:添加代理数据源
Seata是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy 的Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。
package cn.jack.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
第三步:在微服务的resources下添加Seata的配置文件registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "serverAddr" # 不起作用,源码中写死了:serverAddr
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP" # 不起作用,源码中写死了:DEFAULT_GROUP
namespace = ""
cluster = "default"
username = ""
password = ""
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
}
第四步:bootstrap.yml中添加seata配置
spring:
application:
name: service-product
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848 # Nacos配置中心的地址
file-extension: yaml # 配置的格式
shared-dataids: all-service.yaml # 要引入的配置
refreshable-dataids: all-service.yaml # 动态刷新时也刷新引入的配置
alibaba:
seata:
tx-service-group: ${
spring.application.name} # 和服务名称相同
profiles:
active: test # 环境标识
第五步:下单接口服务层开启全局事务
package cn.jack.service.impl;
import cn.jack.dao.OrderDao;
import cn.jack.domain.Order;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderServiceImpl5 {
@Autowired
private OrderDao orderDao;
@Autowired
private ProductService productService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GlobalTransactional // 全局事务控制
public Order createOrder(Long pid) {
log.info("收到下单请求,准备查询商品信息。pid={}", pid);
// TODO 通过Feign调用商品微服务,查询商品信息
Product product = this.productService.findByPid(pid);
log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
// 进行容错判断
if (product.getPid() == -100) {
Order order = new Order();
order.setOid(-100L);
order.setPname("下单失败");
return order;
}
// TODO 生成订单信息保存
Order order = new Order();
order.setNumber(1);
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setUid(1);
order.setUsername("陈家宝");
this.orderDao.save(order);
log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
// TODO 调用商品微服务,扣减库存
this.productService.reduceInventory(pid, order.getNumber());
// TODO 下单成功之后,mq发送订单内容消息
rocketMQTemplate.convertAndSend("jack-topic", order);
return order;
}
}
第六步:测试
再次下单测试。扣减库存失败后,商品订单数据回滚,达到了全局事务的一致性。
2.4.7 总结
Seata运行流程分析
服务之间调用的问题
服务之间的调用使用了Feign,如果配置了容错类(fallback)或者容错工厂(fallbackFactory),则全局事务“不生效”。即商品微服务中扣减库存出现异常后,订单微服务的订单不会回滚。
可以理解为Seata决定全局事务是否回滚的方式,就是看开启全局事务的方法(加了@GlobalTransactional注解的方法)是否抛出异常,如果该方法抛了异常,则回滚所有分支事务。而如果Feign调用商品微服务(ProductService)配置了容错类的话,虽然调用的商品微服务方法抛出了异常,但是进入了容错的方法,也就是说开启全局事务的方法(此处为下订单的方法createOrder)并没有异常,所以订单不会回滚。
2.2 商品微服务实现扣减库存接口
controller层:
package cn.jack.controller;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@Slf4j
public class ProductController {
@Autowired
private ProductService productService;
@RequestMapping("/product/{pid}")
public Product product(@PathVariable("pid") Integer pid) {
log.info("开始查询商品信息。pid={}", pid);
Product product = productService.findByPid(pid);
log.info("商品信息查询成功:{}", JSON.toJSONString(product));
return product;
}
/**
* 扣减商品库存
* @param pid 商品ID
* @param number 扣除数量
*/
@RequestMapping("/product/reduceInventory")
public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("number") Integer number) {
this.productService.reduceInventory(pid, number);
}
}
Service层:
package cn.jack.service.impl;
import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductDao productDao;
@Override
public Product findByPid(Integer pid) {
return this.productDao.findById(pid).get();
}
@Override
public void reduceInventory(Integer pid, Integer number) {
// TODO 根据商品id,查询商品信息
Product product = this.productDao.findById(pid).get();
// TODO 省略校验,直接扣减库存
product.setStock(product.getStock() - number);
this.productDao.save(product);
}
}
2.3 暴露出分布式事务问题
2.3.1 正常测试
访问商品下单接口,生成订单记录,商品库存扣除正常。
2.3.2 异常测试
模拟商品库存扣除出现异常,此时订单信息保存成功,而商品库存没有扣除。出现分布式事务问题。
商品微服务service层修改如下:
package cn.jack.service.impl;
import cn.jack.dao.ProductDao;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductDao productDao;
@Override
public Product findByPid(Integer pid) {
return this.productDao.findById(pid).get();
}
@Override
public void reduceInventory(Integer pid, Integer number) {
// TODO 根据商品id,查询商品信息
Product product = this.productDao.findById(pid).get();
// TODO 模拟异常
int i = 1 / 0;
// TODO 省略校验,直接扣减库存
product.setStock(product.getStock() - number);
this.productDao.save(product);
}
}
访问下单链接,进行测试:
2.4 Seata解决分布式事务问题
seata官方demo地址:http://seata.io/zh-cn/docs/user/quickstart.html
客户端集成:https://github.com/seata/seata-samples/blob/master/doc/quick-integration-with-spring-cloud.md
2.4.1 下载Seata
下载链接:https://github.com/seata/seata/releases/tag/v1.3.0,根据需要进行下载,这里我下载zip版本。
2.4.2 修改配置文件
registry.conf该配置用于指定 TC 的注册中心和配置文件,默认都是 file; 如果使用其他的注册中心,要求 Seata-Server 也注册到该配置中心上。将下载到的压缩包进行解压,进入到conf目录,修改配置文件:
registry.conf
registry {
file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos”
nacos {
application = "serverAddr" # seata-server
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP" # SEATA_GROUP
namespace = ""
cluster = "default"
username = ""
password = ""
}
}config {
file、nacos 、apollo、zk、consul、etcd3
type = “nacos”
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
}注意:我微服务中引入的seata依赖spring-cloud-starter-alibaba-seata版本是2.1.1.RELEASE,这里配置文件中的registry.nacos.application必须是serverAddr,group必须是DEFAULT_GROUP,因为该版本的seata客户端获取注册在nacos的seata服务时,写死的参数。所以我们需要把seata注册在nacos的默认分组下,服务名称为serverAddr。
2.4.3 初始化seata在nacos的配置
通过看seata提供的README.md得知,使用nacos作为注册、配置中心。需要将一些数据初始化到nacos上。
config.txt文件地址:https://github.com/seata/seata/blob/develop/script/config-center/config.txt
初始化脚本地址:https://github.com/seata/seata/blob/develop/script/config-center/nacos/nacos-config.sh
为了后续查看方便,贴上两个文件的内容。
config.txt
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroup_mapping.service-product=default
service.vgroup_mapping.service-order=default
service.vgroup_mapping.my_test_tx_group=default
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:9000
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
注意:
- 我这里让seata监听9000端口,默认的是8091。修改配置service.default.grouplist=127.0.0.1:9000
- 有多个微服务,就需要添加多少条此配置service.vgroup_mapping.微服务名称=default。例如,我有个微服务名称为service-product,则需要加条配置:service.vgroup_mapping.service-product=default。后续会在微服务的bootstrap.yml中指定事务服务分组名称为微服务名称,tx-service-group: ${spring.application.name}
初始化脚本nacos-config.sh
#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done
if [[ -z ${host} ]]; then
host=localhost
fi
if [[ -z ${port} ]]; then
port=8848
fi
if [[ -z ${group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
tenant=""
fi
if [[ -z ${username} ]]; then
username=""
fi
if [[ -z ${password} ]]; then
password=""
fi
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$1&group=$group&content=$2&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
(( failCount++ ))
fi
}
count=0
#for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
# 在windows环境下读取路径失败,我这里是直接写死了config.txt路径
for line in $(cat C:/Develop/seata-1.3.0/conf/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${line%%=*}
value=${line#*=}
addConfig "${key}" "${value}"
done
echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="
if [[ ${failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
read -n 1
注意:此脚本在windows环境下,读取config.txt文件路径失败,我这里是直接写死路径。
执行以下命令进行初始化:
# 将seata的配置写入nacos中
# 注意:这里要保证nacos已经正常运行
cd conf
nacos-config.sh 127.0.0.1
正常输出结果如图所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BOr0QIWK-1654484848475)(https://gitee.com/xiaoxiangyuan/mark\_down\_pic/raw/master/2021/images/20220516175319.png)\]
在nacos中就可以看到初始化过来的配置:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-K5Oj1Yg7-1654484848476)(https://gitee.com/xiaoxiangyuan/mark\_down\_pic/raw/master/2021/images/20220516175319.png)\]
2.4.4 启动seata服务
cd bin
seata-server.bat -p 9000
启动完成,在nacos的服务列表可以看到seata-server的服务。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1EejM3iu-1654484848476)(/Users/apple/Downloads/%E6%88%AA%E5%9B%BE/iShot_2022-06-06_10.55.10.png)][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ayI6F725-1654484848476)(/Users/apple/Downloads/%E6%88%AA%E5%9B%BE/iShot_2022-06-06_10.55.27.png)]
2.4.5 添加undo_log表
在我们的数据库中加入一张undo_log表,这是seata记录事务日志要用到的表。表结构如下:
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2.4.6 微服务集成seata事务控制
第一步:添加依赖
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
第二步:添加代理数据源
Seata是通过代理数据源实现事务分支的,所以需要配置io.seata.rm.datasource.DataSourceProxy 的Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务。
package cn.jack.config;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
第三步:在微服务的resources下添加Seata的配置文件registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "serverAddr" # 不起作用,源码中写死了:serverAddr
serverAddr = "127.0.0.1:8848"
group = "DEFAULT_GROUP" # 不起作用,源码中写死了:DEFAULT_GROUP
namespace = ""
cluster = "default"
username = ""
password = ""
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
}
第四步:bootstrap.yml中添加seata配置
spring:
application:
name: service-product
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848 # Nacos配置中心的地址
file-extension: yaml # 配置的格式
shared-dataids: all-service.yaml # 要引入的配置
refreshable-dataids: all-service.yaml # 动态刷新时也刷新引入的配置
alibaba:
seata:
tx-service-group: ${spring.application.name} # 和服务名称相同
profiles:
active: test # 环境标识
第五步:下单接口服务层开启全局事务
package cn.jack.service.impl;
import cn.jack.dao.OrderDao;
import cn.jack.domain.Order;
import cn.jack.domain.Product;
import cn.jack.service.ProductService;
import com.alibaba.fastjson.JSON;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class OrderServiceImpl5 {
@Autowired
private OrderDao orderDao;
@Autowired
private ProductService productService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GlobalTransactional // 全局事务控制
public Order createOrder(Long pid) {
log.info("收到下单请求,准备查询商品信息。pid={}", pid);
// TODO 通过Feign调用商品微服务,查询商品信息
Product product = this.productService.findByPid(pid);
log.info("商品信息查询成功。内容为:{}", JSON.toJSONString(product));
// 进行容错判断
if (product.getPid() == -100) {
Order order = new Order();
order.setOid(-100L);
order.setPname("下单失败");
return order;
}
// TODO 生成订单信息保存
Order order = new Order();
order.setNumber(1);
order.setPid(pid);
order.setPname(product.getPname());
order.setPprice(product.getPprice());
order.setUid(1);
order.setUsername("陈家宝");
this.orderDao.save(order);
log.info("订单信息保存成功。内容为:{}", JSON.toJSONString(order));
// TODO 调用商品微服务,扣减库存
this.productService.reduceInventory(pid, order.getNumber());
// TODO 下单成功之后,mq发送订单内容消息
rocketMQTemplate.convertAndSend("jack-topic", order);
return order;
}
}
第六步:测试
再次下单测试。扣减库存失败后,商品订单数据回滚,达到了全局事务的一致性。
2.4.7 总结
Seata运行流程分析
要点说明:
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的
目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有
undo_log。
2、在第一阶段undo_Jog中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就己经将分
支事务提交,也就释放了锁资源。
3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个
分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各
各参与者只需要删除undo._log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过XID 和 Branch ID 找到相应的回滚日
志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操
作。服务之间调用的问题
服务之间的调用使用了Feign,如果配置了容错类(fallback)或者容错工厂(fallbackFactory),则全局事务“不生效”。即商品微服务中扣减库存出现异常后,订单微服务的订单不会回滚。
可以理解为Seata决定全局事务是否回滚的方式,就是看开启全局事务的方法(加了@GlobalTransactional注解的方法)是否抛出异常,如果该方法抛了异常,则回滚所有分支事务。而如果Feign调用商品微服务(ProductService)配置了容错类的话,虽然调用的商品微服务方法抛出了异常,但是进入了容错的方法,也就是说开启全局事务的方法(此处为下订单的方法createOrder)并没有异常,所以订单不会回滚。
还没有评论,来说两句吧...