《分布式事务系列教程-第五章-Seata分布式事务解决方案》
《分布式事务系列教程-第五章-Seata分布式事务解决方案》
一、seata解决方案
Seata是一个开源的分布式事务解决方案,由阿里中间件团队研发,原名Fescar,后更名为Seata。Seata致力于提供高性能和易于使用的分布式事务服务,为用户提供AT、TCC、SAGA和XA四种事务模式,打造一站式的分布式事务解决方案。
- seata官网:http://seata.io
- github地址:https://github.com/seata/seata
seata也是基于2PC的一种分布式事务解决方案,在传统2PC的基础上进行了一些优化:通过引入事务协调器(TC)来监控各个分支事务的执行状态,以此判断全局事务是提交还是回滚;通过引入undo日志表来解决传统2PC长时间锁表的问题。
- 阶段1:在同一本地事务中提交业务数据并且记录日志到undo表,然后释放本地锁和连接资源。
阶段2:
- 对于提交情况,异步快速地完成工作。
- 对于回滚情况,根据阶段1中记录的undo日志数据进行反向补偿。
1.1 seata执行流程:
1)当RM启动时,首先注册到TC中。
2)TM向TC申请开启全局事务,全局事务开启后生成用于标识此次全局事务的id(XID)。每个全局事务ID各不相同。
4)首先第一个RM向TC开启分支事务,当进行微服务调用时,XID也会随之传递下来,此时的RM事务已经真正提交
5)最终TM根据各分支事务执行情况,来请求TC提交或者回滚全局事务
6)TC根据XID对应的全局事务下的分支事务来提交或回滚。
TM和全局事务的发起方在一起;
1.2 搭建seata工程
1.2.1 启动seata协调器
启动事务协调器TC
seata-server.bat -p 10101 -m file
-p:指定启动的端口
-m:指定启动的模式,file代表以文件方式存储信息
1.2.2 搭建父工程
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Parent (aggregator) POM of the demo project.
  NOTE(review): the XML tags of this listing were lost; the structure below is rebuilt
  from the surviving values. Confirm against the original project:
    * the element that carried the stand-alone "2.1.0.RELEASE" value (assumed to be a property);
    * whether commons-lang / druid-spring-boot-starter sat under <dependencyManagement>
      (assumed here) or directly under <dependencies>.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lscl</groupId>
    <artifactId>seata_transaction</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>eureka_server</module>
        <module>store</module>
        <module>orders</module>
    </modules>
    <properties>
        <spring-cloud-alibaba.version>2.1.0.RELEASE</spring-cloud-alibaba.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>commons-lang</groupId>
                <artifactId>commons-lang</artifactId>
                <version>2.6</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid-spring-boot-starter</artifactId>
                <version>1.1.16</version>
            </dependency>
            <!-- BOM imports: let the child modules omit versions for Spring / Spring Cloud artifacts -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.1.0.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.1.3.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Greenwich.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
1.2.3 搭建EurekaServer
1)pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- POM of the eureka_server module; tags rebuilt after the listing lost its XML markup. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seata_transaction</artifactId>
        <groupId>com.lscl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>eureka_server</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
    </dependencies>
</project>
2)Application.yml
server:
  port: 10102
eureka:
  client:
    register-with-eureka: false # whether to register this service with the Eureka server
    fetch-registry: false # whether to fetch other service instances from the Eureka server
    service-url: # Eureka registration address
      # no whitespace inside ${...}: the placeholder key must be exactly "server.port"
      defaultZone: http://127.0.0.1:${server.port}/eureka
3)启动类
package com.lscl.eureka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

/**
 * Boot class of the Eureka registry used by the demo services.
 */
@SpringBootApplication
@EnableEurekaServer // run this application as a Eureka server
public class EurekaApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args so that overrides such as --server.port=... are honoured.
        SpringApplication.run(EurekaApplication.class, args);
    }
}
1.2.4 搭建库存微服
1)pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seata_transaction</artifactId>
<groupId>com.lscl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>store</artifactId>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
</project>
2)registry.conf:
registry {
# file 、redis、zookeeper、eureka、nacos 、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk
type = "file"
file {
name = "file.conf"
}
}
3)file.conf:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
}
## transaction log store
store {
## store mode: file、db
mode = "file"
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
}
service {
  # vgroup -> rgroup mapping.
  # The key must carry THIS service's transaction group. With no explicit tx-service-group
  # configured, the group is "<spring.application.name>-fescar-service-group"; this
  # application is named "store".
  vgroup_mapping.store-fescar-service-group = "default"
  # only support single node
  default.grouplist = "127.0.0.1:10101" # address of the Seata server (TC)
  # degrade current not support
  enableDegrade = false
  # disable
  disable = false
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}
4)application.yml:
server:
  port: 10104
  servlet:
    context-path: /store
spring:
  application:
    name: store # service name
  datasource:
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/store?serverTimezone=GMT%2b8
    username: root
    password: admin
  jpa:
    properties:
      hibernate:
        format_sql: true # pretty-print SQL
        show_sql: true # log SQL
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:10102/eureka
5)启动类:
package com.lscl.store;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
/**
 * Boot class of the store (inventory) microservice.
 *
 * <p>Besides starting Spring Boot it declares two data-source beans: the raw Druid pool
 * bound to {@code spring.datasource.*}, and a Seata {@link DataSourceProxy} around it that
 * is marked {@code @Primary}.
 */
@EnableEurekaClient
@SpringBootApplication
public class StoreApplication {

    // Injected by Spring through the constructor; not read anywhere in this class.
    private final ApplicationContext applicationContext;

    public StoreApplication(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args so that overrides such as --server.port=... are honoured.
        SpringApplication.run(StoreApplication.class, args);
    }

    /**
     * Creates the underlying Druid connection pool.
     *
     * @return a Druid data source whose properties are bound from {@code spring.datasource.*}
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * Wraps the Druid pool in Seata's data-source proxy.
     *
     * @param dataSource the Druid pool declared by {@link #druidDataSource()}
     * @return the proxying data source; {@code @Primary} makes it the default injection candidate
     */
    @Primary
    @Bean
    public DataSource dataSource(DruidDataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }
}
6)entity.java:
package com.lscl.store.entity;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
/**
 * JPA entity mapped to the {@code t_store} table: an identifier plus a count.
 */
@Entity
@Table(name = "t_store")
public class Store implements Serializable {

    /** Primary key. */
    @Id
    private String id;

    /** Value of the {@code count} attribute. */
    private Integer count;

    /** @return the primary key */
    public String getId() {
        return this.id;
    }

    /** @param newId the primary key to set */
    public void setId(String newId) {
        this.id = newId;
    }

    /** @return the current count */
    public Integer getCount() {
        return this.count;
    }

    /** @param newCount the count to set */
    public void setCount(Integer newCount) {
        this.count = newCount;
    }
}
7)dao.java:
package com.lscl.store.dao;
import com.lscl.store.entity.Store;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
/**
 * Spring Data JPA repository for {@link Store} entities (String primary key).
 */
@Repository
public interface StoreDao extends JpaRepository<Store, String> {

    /**
     * Decrements {@code count} by one. The JPQL statement has no WHERE clause,
     * so it applies to every {@code Store} row.
     */
    @Modifying
    @Query("update Store s set s.count=s.count-1")
    void updateCount();
}
8)service.java:
package com.lscl.store.service;
import com.lscl.store.dao.StoreDao;
import io.seata.core.context.RootContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Business logic of the store (inventory) microservice.
 */
@Service
public class StoreService {

    @Autowired
    private StoreDao storeDao;

    /**
     * Decrements the stock count and optionally simulates a failure.
     *
     * @param flag test switch; the value 200 makes the method throw after the update was issued
     * @throws RuntimeException with message {@code "store...error"} when {@code flag} is 200
     */
    @Transactional
    public void updateCount(Integer flag) {
        // Label the XID with this service's name so it can be told apart from the
        // line printed by the orders service.
        System.out.println("store_xid: " + RootContext.getXID());

        // stock - 1
        storeDao.updateCount();

        if (flag == 200) { // simulate a failure
            throw new RuntimeException("store...error");
        }
    }
}
9)controller.java:
package com.lscl.store.controller;
import com.lscl.store.service.StoreService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint of the store (inventory) microservice.
 */
@RestController
public class StoreController {

    @Autowired
    private StoreService storeService;

    /**
     * Delegates to {@link StoreService#updateCount(Integer)}.
     *
     * @param flag test switch taken from the URL path
     * @return the literal {@code "store...success"} when the service call returns normally
     */
    @RequestMapping("/updateCount/{flag}")
    public String updateCount(@PathVariable("flag") Integer flag) {
        // The variable name is given explicitly so binding does not depend on
        // parameter-name metadata being compiled into the class.
        storeService.updateCount(flag);
        return "store...success";
    }
}
1.2.5 搭建订单微服
1)pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seata_transaction</artifactId>
<groupId>com.lscl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>orders</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
</project>
2)registry.conf:
registry {
# file 、redis、zookeeper、eureka、nacos 、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk
type = "file"
file {
name = "file.conf"
}
}
3)file.conf:
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
#thread factory for netty
thread-factory {
boss-thread-prefix = "NettyBoss"
worker-thread-prefix = "NettyServerNIOWorker"
server-executor-thread-prefix = "NettyServerBizHandler"
share-boss-worker = false
client-selector-thread-prefix = "NettyClientSelector"
client-selector-thread-size = 1
client-worker-thread-prefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
boss-thread-size = 1
#auto default pin or 8
worker-thread-size = 8
}
}
## transaction log store
store {
## store mode: file、db
mode = "file"
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
}
service {
  # vgroup -> rgroup mapping.
  # The key must carry THIS service's transaction group. With no explicit tx-service-group
  # configured, the group is "<spring.application.name>-fescar-service-group"; this
  # application is named "orders".
  vgroup_mapping.orders-fescar-service-group = "default"
  # only support single node
  default.grouplist = "127.0.0.1:10101"
  # degrade current not support
  enableDegrade = false
  # disable
  disable = false
}
client {
async.commit.buffer.limit = 10000
lock {
retry.internal = 10
retry.times = 30
}
}
4)application.yml:
server:
  port: 10103
  servlet:
    context-path: /orders
spring:
  application:
    name: orders # service name
  datasource:
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/orders?serverTimezone=GMT%2b8
    username: root
    password: admin
  jpa:
    properties:
      hibernate:
        format_sql: true # pretty-print SQL
        show_sql: true # log SQL
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:10102/eureka
feign: # enable the circuit breaker for Feign clients
  hystrix:
    enabled: true
5)启动类:
package com.lscl.orders;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
/**
 * Boot class of the orders microservice.
 *
 * <p>Besides starting Spring Boot it declares two data-source beans: the raw Druid pool
 * bound to {@code spring.datasource.*}, and a Seata {@link DataSourceProxy} around it that
 * is marked {@code @Primary}.
 */
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class OrdersApplication {

    // Injected by Spring through the constructor; not read anywhere in this class.
    private final ApplicationContext applicationContext;

    public OrdersApplication(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        // Forward args so that overrides such as --server.port=... are honoured.
        SpringApplication.run(OrdersApplication.class, args);
    }

    /**
     * Creates the underlying Druid connection pool.
     *
     * @return a Druid data source whose properties are bound from {@code spring.datasource.*}
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * Wraps the Druid pool in Seata's data-source proxy.
     *
     * @param dataSource the Druid pool declared by {@link #druidDataSource()}
     * @return the proxying data source; {@code @Primary} makes it the default injection candidate
     */
    @Primary
    @Bean
    public DataSourceProxy dataSource(DruidDataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }
}
6)Client:
package com.lscl.orders.client;
import com.lscl.orders.client.impl.StoreClientImpl;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
/**
 * Feign client for the service registered under the name {@code store}.
 * {@link StoreClientImpl} is configured as its fallback.
 */
@FeignClient(value = "store", fallback = StoreClientImpl.class)
public interface StoreClient {

    /**
     * Calls the store service's {@code /store/updateCount/{flag}} endpoint.
     *
     * @param flag test switch passed through as the path variable
     * @return the response body of the remote call
     */
    @RequestMapping("/store/updateCount/{flag}")
    String updateCount(@PathVariable("flag") Integer flag);
}
7)ClientImpl:
package com.lscl.orders.client.impl;
import com.lscl.orders.client.StoreClient;
import org.springframework.stereotype.Component;
/**
 * Fallback implementation of {@link StoreClient}; it always answers with a fixed marker string.
 */
@Component
public class StoreClientImpl implements StoreClient {

    /** Marker value returned by the fallback. */
    private static final String FALLBACK_REPLY = "no";

    /**
     * @param flag ignored by the fallback
     * @return always {@code "no"}
     */
    @Override
    public String updateCount(Integer flag) {
        return FALLBACK_REPLY;
    }
}
8)entity:
package com.lscl.orders.entity;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
/**
 * JPA entity mapped to the {@code t_orders} table: an identifier plus a count.
 */
@Entity
@Table(name = "t_orders")
public class Orders implements Serializable {

    /** Primary key. */
    @Id
    private String id;

    /** Value of the {@code count} attribute. */
    private Integer count;

    /** @return the primary key */
    public String getId() {
        return this.id;
    }

    /** @param newId the primary key to set */
    public void setId(String newId) {
        this.id = newId;
    }

    /** @return the current count */
    public Integer getCount() {
        return this.count;
    }

    /** @param newCount the count to set */
    public void setCount(Integer newCount) {
        this.count = newCount;
    }
}
9)dao:
package com.lscl.orders.dao;
import com.lscl.orders.entity.Orders;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
/**
 * Spring Data JPA repository for {@link Orders} entities (String primary key).
 */
@Repository
public interface OrdersDao extends JpaRepository<Orders, String> {

    /**
     * Increments {@code count} by one. The JPQL statement has no WHERE clause,
     * so it applies to every {@code Orders} row.
     */
    @Modifying
    @Query("update Orders o set o.count=o.count+1")
    void updateCount();
}
10)service:
package com.lscl.orders.service;
import com.lscl.orders.client.StoreClient;
import com.lscl.orders.dao.OrdersDao;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
 * Business logic of the orders microservice; {@link #updateCount(Integer)} is the method
 * annotated to open the global transaction.
 */
@Service
public class OrdersService {

    @Autowired
    private OrdersDao ordersDao;

    @Autowired
    private StoreClient storeClient;

    /**
     * Increments the order count, asks the store service to decrement the stock, and
     * raises an exception when the remote call fell back or when a failure is simulated.
     *
     * @param flag test switch; forwarded to the store service, and the value 100 makes
     *             this method throw after both updates were issued
     * @throws RuntimeException {@code "store...error"} when the store client answered
     *                          {@code "no"}; {@code "orders...error"} when {@code flag} is 100
     */
    @Transactional
    @GlobalTransactional // open the global transaction
    public void updateCount(Integer flag) {
        final String xid = RootContext.getXID();
        System.out.println("orders_xid: " + xid);

        // orders + 1
        ordersDao.updateCount();

        // stock - 1 (remote call)
        final String storeReply = storeClient.updateCount(flag);
        final boolean storeFellBack = "no".equals(storeReply);
        if (storeFellBack) {
            throw new RuntimeException("store...error");
        }
        System.out.println(storeReply);

        final boolean simulateFailure = (flag == 100);
        if (simulateFailure) {
            throw new RuntimeException("orders...error");
        }
    }
}
11)controller:
package com.lscl.orders.controller;
import com.lscl.orders.service.OrdersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint of the orders microservice.
 */
@RestController
public class OrdersController {

    @Autowired
    private OrdersService ordersService;

    /**
     * Delegates to {@link OrdersService#updateCount(Integer)}.
     *
     * @param flag test switch taken from the URL path
     * @return the literal {@code "orders...success"} when the service call returns normally
     */
    @RequestMapping("/updateCount/{flag}")
    public String updateCount(@PathVariable("flag") Integer flag) {
        // The variable name is given explicitly so binding does not depend on
        // parameter-name metadata being compiled into the class.
        ordersService.updateCount(flag);
        return "orders...success";
    }
}
1.2.6 创建undo_log表
根据2PC协议,我们知道在第一阶段时,执行完业务sql之后,会把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
在第二阶段时如果是回滚则根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句
如果是提交则分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
在订单、库存两个数据库中分别创建对应的undo_log日志表。
-- Rollback-log table; per the surrounding tutorial one copy is created in each of the
-- orders and store databases.
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
-- presumably the id of the branch transaction; confirm against the Seata documentation
`branch_id` bigint(20) NOT NULL,
-- presumably the global transaction id (XID); confirm against the Seata documentation
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
-- binary payload of the rollback record
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
-- at most one row per (xid, branch_id) pair
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
全部RM执行成功流程:
某个RM执行失败流程:
1.3 总结
seata也是基于2PC的一种分布式解决方案。seata方案解决了XA方案在准备阶段时,RM锁的问题。在高并发情况下,应当避免XA方案。
xa方案依赖于数据库对XA协议的支持,seata则是通过业务逻辑的处理(记录undo_log表),来达到数据的回滚,底层不依赖于数据库。
还没有评论,来说两句吧...