SpringCloud Alibaba篇 - Seata【分布式事务】

逃离我推掉我的手 2023-10-03 11:03 28阅读 0赞

⑤ Seata【分布式事务】

5.1 分布式事务的由来

实例

单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,
业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证, 但是全局的数据一致性问题没法保证。

在这里插入图片描述

总结一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题

5.2 Seata简介

Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务.

官网:http://seata.io/zh-cn/

分布式处理事务过程的 ID+三组件模型

  • Transaction ID XID - 全局唯一的事务ID
  • TC (Transaction Coordinator) - 事务协调者

    维护全局和分支事务的状态,驱动全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器

    定义全局事务的范围:开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - 资源管理器

    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

图解

在这里插入图片描述

  1. TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一 的XID;
  2. XID 在微服务调用链路的上下文中传播;
  3. RM向TC注册分支事务,将其纳入XID对应全局事务的管辖;
  4. TM向TC发起针对XID的全局提交或回滚决议;
  5. TC调度XID下管辖的全部分支事务完成提交或回滚请求。

Seata的简单使用(怎么玩)

在spring的时候是通过注解@Transactional 管理本地事务

在这里插入图片描述

5.3 Seata-server的下载与安装

下载地址: https://github.com/seata/seata/releases

  • 先从官网中下载好Seata-server并解压到文件夹
  • 先备份file.conf,然后修改file.conf

    • 修改service模块,1.0以后没有这个选项,就默认是my_test_tx_group分组

      vgroup_mapping.my_test_tx_group = “fsp_tx_group”

    • 修改store模块

      store {

      store mode: file、db

      mode = “db”

      database store

      db {

      1. ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
      2. datasource = "dbcp"
      3. ## mysql/oracle/h2/oceanbase etc.
      4. db-type = "mysql"
      5. driver-class-name = "com.mysql.jdbc.Driver"
      6. url = "jdbc:mysql://127.0.0.1:3306/seata"
      7. user = "root"
      8. password = "cvzhanshi"
      9. min-conn = 1
      10. max-conn = 3
      11. global.table = "global_table"
      12. branch.table = "branch_table"
      13. lock-table = "lock_table"
      14. query-limit = 100

      }
      }

  • 备份file.example.conf,然后修改file.example.conf,修改内容和file.conf一样
  • 备份registry.conf,修改文件

设置注册进nacos中

  1. type = "nacos"
  2. nacos {
  3. serverAddr = "localhost:8848"
  4. namespace = ""
  5. cluster = "default"
  6. }
  • 创建数据库和表

    创建seata数据库

    create database seata character set utf8;
    use seata;

    创建seata数据库需要的表(三张表)

    CREATE TABLE IF NOT EXISTS global_table
    (

    1. `xid` VARCHAR(128) NOT NULL,
    2. `transaction_id` BIGINT,
    3. `status` TINYINT NOT NULL,
    4. `application_id` VARCHAR(32),
    5. `transaction_service_group` VARCHAR(32),
    6. `transaction_name` VARCHAR(128),
    7. `timeout` INT,
    8. `begin_time` BIGINT,
    9. `application_data` VARCHAR(2000),
    10. `gmt_create` DATETIME,
    11. `gmt_modified` DATETIME,
    12. PRIMARY KEY (`xid`),
    13. KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    14. KEY `idx_transaction_id` (`transaction_id`)

    ) ENGINE = InnoDB
    DEFAULT CHARSET = utf8;

    CREATE TABLE IF NOT EXISTS branch_table
    (

    1. `branch_id` BIGINT NOT NULL,
    2. `xid` VARCHAR(128) NOT NULL,
    3. `transaction_id` BIGINT,
    4. `resource_group_id` VARCHAR(32),
    5. `resource_id` VARCHAR(256),
    6. `branch_type` VARCHAR(8),
    7. `status` TINYINT,
    8. `client_id` VARCHAR(64),
    9. `application_data` VARCHAR(2000),
    10. `gmt_create` DATETIME(6),
    11. `gmt_modified` DATETIME(6),
    12. PRIMARY KEY (`branch_id`),
    13. KEY `idx_xid` (`xid`)

    ) ENGINE = InnoDB
    DEFAULT CHARSET = utf8;

    CREATE TABLE IF NOT EXISTS lock_table
    (

    1. `row_key` VARCHAR(128) NOT NULL,
    2. `xid` VARCHAR(96),
    3. `transaction_id` BIGINT,
    4. `branch_id` BIGINT NOT NULL,
    5. `resource_id` VARCHAR(256),
    6. `table_name` VARCHAR(32),
    7. `pk` VARCHAR(36),
    8. `gmt_create` DATETIME,
    9. `gmt_modified` DATETIME,
    10. PRIMARY KEY (`row_key`),
    11. KEY `idx_branch_id` (`branch_id`)

    ) ENGINE = InnoDB
    DEFAULT CHARSET = utf8;

  • 启动nacos和seata测试

在这里插入图片描述

在这里插入图片描述

seate在nacos中注册成功,说明安装完成

5.4 订单/库存/账户业务数据库准备

业务说明

这里我们会创建三个服务,一个订单服务, 一个库存服务, 一个账户服务。

当用户下单时,会在订单服务中创建一个订单 ,然后通过远程调用库存服务来扣减下单商品的库存,
再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。

该操作跨越三三个数据库,有两次远程调用,很明显会有分布式事务问题。

  • 创建三个数据库

    create database seata_order; —订单
    create database seata_storage; —库存
    create database seata_account; —账户

  • 在每个库下建表

seata_order

  1. CREATE TABLE t_order(
  2. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  3. `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  4. `product_id` BIGINT(11)DEFAULT NULL COMMENT '产品id',
  5. `count` INT(11) DEFAULT NULL COMMENT '数量',
  6. `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
  7. `status` INT(1) DEFAULT NULL COMMENT '订单状态: 0:创建中; 1:已完结'
  8. )ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
  9. select * from t_order;
  10. --回滚日志
  11. CREATE TABLE `undo_log` (
  12. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  13. `branch_id` bigint(20) NOT NULL,
  14. `xid` varchar(100) NOT NULL,
  15. `context` varchar(128) NOT NULL,
  16. `rollback_info` longblob NOT NULL,
  17. `log_status` int(11) NOT NULL,
  18. `log_created` datetime NOT NULL,
  19. `log_modified` datetime NOT NULL,
  20. `ext` varchar(100) DEFAULT NULL,
  21. PRIMARY KEY (`id`),
  22. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  23. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

seata_storage

  1. CREATE TABLE t_storage(
  2. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  3. `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
  4. `total` INT(11) DEFAULT NULL COMMENT '总库存',
  5. `used` INT(11) DEFAULT NULL COMMENT '已用库存',
  6. `residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
  7. )ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
  8. INSERT INTO t_storage(`id`,`product_id`,`total`,`used`,`residue`)VALUES('1','1','100','0','100');
  9. --回滚日志
  10. CREATE TABLE `undo_log` (
  11. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  12. `branch_id` bigint(20) NOT NULL,
  13. `xid` varchar(100) NOT NULL,
  14. `context` varchar(128) NOT NULL,
  15. `rollback_info` longblob NOT NULL,
  16. `log_status` int(11) NOT NULL,
  17. `log_created` datetime NOT NULL,
  18. `log_modified` datetime NOT NULL,
  19. `ext` varchar(100) DEFAULT NULL,
  20. PRIMARY KEY (`id`),
  21. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  22. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

seata_account

  1. CREATE TABLE t_account(
  2. `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
  3. `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
  4. `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
  5. `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
  6. `residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
  7. )ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
  8. INSERT INTO t_account(`id`,`user_id`,`total`,`used`,`residue`)VALUES('1','1','1000','0','1000');
  9. --回滚日志
  10. CREATE TABLE `undo_log` (
  11. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  12. `branch_id` bigint(20) NOT NULL,
  13. `xid` varchar(100) NOT NULL,
  14. `context` varchar(128) NOT NULL,
  15. `rollback_info` longblob NOT NULL,
  16. `log_status` int(11) NOT NULL,
  17. `log_created` datetime NOT NULL,
  18. `log_modified` datetime NOT NULL,
  19. `ext` varchar(100) DEFAULT NULL,
  20. PRIMARY KEY (`id`),
  21. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
  22. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

结果:

在这里插入图片描述

5.5 订单/库存/账户业务微服务准备

业务需求:下订单->减库存->扣余额->改(订单)状态

5.5.1 订单模块

  • 新建模块seata-order-service2001
  • 导入依赖




    com.alibaba.cloud
    spring-cloud-starter-alibaba-nacos-discovery



    com.alibaba.cloud
    spring-cloud-starter-alibaba-seata


    io.seata
    seata-all


    io.seata
    seata-spring-boot-starter




    io.seata
    seata-all
    1.2.0


    io.seata
    seata-spring-boot-starter
    1.2.0


    org.springframework.cloud
    spring-cloud-starter-openfeign


    org.mybatis.spring.boot
    mybatis-spring-boot-starter



    com.alibaba
    druid-spring-boot-starter


    mysql
    mysql-connector-java



    org.springframework.boot
    spring-boot-starter-jdbc


    org.springframework.boot
    spring-boot-starter-web


    org.springframework.boot
    spring-boot-starter-actuator


    org.projectlombok
    lombok
    true


    org.springframework.boot
    spring-boot-starter-test
    test

  • 编写配置文件

    server:
    port: 2001

    spring:
    application:

    1. name: seata-order-service

    cloud:

    1. alibaba:
    2. seata:
    3. # 自定义事务组名称需要与seata-server中的对应
    4. tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group
    5. #service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级
    6. service:
    7. vgroupMapping:
    8. #要和tx-service-group的值一致
    9. my_test_tx_group: default
    10. grouplist:
    11. # seata seaver的 地址配置,此处可以集群配置是个数组
    12. default: 192.168.1.104:8091
    13. nacos:
    14. discovery:
    15. server-addr: localhost:8848 #nacos

    datasource:

    1. # 当前数据源操作类型
    2. type: com.alibaba.druid.pool.DruidDataSource
    3. # mysql驱动类
    4. driver-class-name: com.mysql.jdbc.Driver
    5. url: jdbc:mysql://localhost:3306/seata_order
    6. username: root
    7. password: lian0911

    feign:
    hystrix:

    1. enabled: false

    logging:
    level:

    1. io:
    2. seata: info

    mybatis:
    mapperLocations: classpath:mapper/.xml

  • 编写 file.conf 文件

内容在解压的seata中从conf中的README.md中有链接

  1. transport {
  2. # tcp, unix-domain-socket
  3. type = "TCP"
  4. #NIO, NATIVE
  5. server = "NIO"
  6. #enable heartbeat
  7. heartbeat = true
  8. # the client batch send request enable
  9. enableClientBatchSendRequest = true
  10. #thread factory for netty
  11. threadFactory {
  12. bossThreadPrefix = "NettyBoss"
  13. workerThreadPrefix = "NettyServerNIOWorker"
  14. serverExecutorThread-prefix = "NettyServerBizHandler"
  15. shareBossWorker = false
  16. clientSelectorThreadPrefix = "NettyClientSelector"
  17. clientSelectorThreadSize = 1
  18. clientWorkerThreadPrefix = "NettyClientWorkerThread"
  19. # netty boss thread size
  20. bossThreadSize = 1
  21. #auto default pin or 8
  22. workerThreadSize = "default"
  23. }
  24. shutdown {
  25. # when destroy server, wait seconds
  26. wait = 3
  27. }
  28. serialization = "seata"
  29. compressor = "none"
  30. }
  31. service {
  32. #transaction service group mapping
  33. vgroupMapping.my_test_tx_group = "default"
  34. #only support when registry.type=file, please don't set multiple addresses
  35. default.grouplist = "127.0.0.1:8091"
  36. #degrade, current not support
  37. enableDegrade = false
  38. #disable seata
  39. disableGlobalTransaction = false
  40. }
  41. client {
  42. rm {
  43. asyncCommitBufferLimit = 10000
  44. lock {
  45. retryInterval = 10
  46. retryTimes = 30
  47. retryPolicyBranchRollbackOnConflict = true
  48. }
  49. reportRetryCount = 5
  50. tableMetaCheckEnable = false
  51. tableMetaCheckerInterval = 60000
  52. reportSuccessEnable = false
  53. sagaBranchRegisterEnable = false
  54. sagaJsonParser = jackson
  55. sagaRetryPersistModeUpdate = false
  56. sagaCompensatePersistModeUpdate = false
  57. tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
  58. }
  59. tm {
  60. commitRetryCount = 5
  61. rollbackRetryCount = 5
  62. defaultGlobalTransactionTimeout = 60000
  63. degradeCheck = false
  64. degradeCheckPeriod = 2000
  65. degradeCheckAllowTimes = 10
  66. interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
  67. }
  68. undo {
  69. dataValidation = true
  70. onlyCareUpdateColumns = true
  71. logSerialization = "jackson"
  72. logTable = "undo_log"
  73. compress {
  74. enable = true
  75. # allow zip, gzip, deflater, 7z, lz4, bzip2, default is zip
  76. type = zip
  77. # if rollback info size > threshold, then will be compress
  78. # allow k m g t
  79. threshold = 64k
  80. }
  81. }
  82. loadBalance {
  83. type = "RandomLoadBalance"
  84. virtualNodes = 10
  85. }
  86. }
  87. log {
  88. exceptionRate = 100
  89. }
  • 编写 registry.conf

内容和file.conf一样查找

  1. registry {
  2. # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
  3. type = "nacos"
  4. nacos {
  5. application = "seata-server"
  6. serverAddr = "127.0.0.1:8848"
  7. group = "SEATA_GROUP"
  8. namespace = ""
  9. username = ""
  10. password = ""
  11. }
  12. eureka {
  13. serviceUrl = "http://localhost:8761/eureka"
  14. weight = "1"
  15. }
  16. redis {
  17. serverAddr = "localhost:6379"
  18. db = "0"
  19. password = ""
  20. timeout = "0"
  21. }
  22. zk {
  23. serverAddr = "127.0.0.1:2181"
  24. sessionTimeout = 6000
  25. connectTimeout = 2000
  26. username = ""
  27. password = ""
  28. }
  29. consul {
  30. serverAddr = "127.0.0.1:8500"
  31. aclToken = ""
  32. }
  33. etcd3 {
  34. serverAddr = "http://localhost:2379"
  35. }
  36. sofa {
  37. serverAddr = "127.0.0.1:9603"
  38. region = "DEFAULT_ZONE"
  39. datacenter = "DefaultDataCenter"
  40. group = "SEATA_GROUP"
  41. addressWaitTime = "3000"
  42. }
  43. file {
  44. name = "file.conf"
  45. }
  46. custom {
  47. name = ""
  48. }
  49. }
  50. config {
  51. # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom
  52. type = "file"
  53. nacos {
  54. serverAddr = "127.0.0.1:8848"
  55. namespace = ""
  56. group = "SEATA_GROUP"
  57. username = ""
  58. password = ""
  59. dataId = "seata.properties"
  60. }
  61. consul {
  62. serverAddr = "127.0.0.1:8500"
  63. aclToken = ""
  64. }
  65. apollo {
  66. appId = "seata-server"
  67. apolloMeta = "http://192.168.1.204:8801"
  68. namespace = "application"
  69. apolloAccesskeySecret = ""
  70. }
  71. zk {
  72. serverAddr = "127.0.0.1:2181"
  73. sessionTimeout = 6000
  74. connectTimeout = 2000
  75. username = ""
  76. password = ""
  77. nodePath = "/seata/seata.properties"
  78. }
  79. etcd3 {
  80. serverAddr = "http://localhost:2379"
  81. }
  82. file {
  83. name = "file.conf"
  84. }
  85. custom {
  86. name = ""
  87. }
  88. }
  • domain包实体类编写

Order

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class Order {
  5. private Long id;
  6. private Long userId;
  7. private Long productId;
  8. private Integer count;
  9. private BigDecimal money;
  10. private Integer status; // 订单状态 0:创建中 1:已完结
  11. }

CommonResult

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 21:30
  4. */
  5. @Data
  6. @AllArgsConstructor
  7. @NoArgsConstructor
  8. public class CommonResult<T> {
  9. private Integer code;
  10. private String message;
  11. private T data;
  12. public CommonResult(Integer code, String message) {
  13. this(code, message, null);
  14. }
  15. }
  • dao层的编写

OrderDao

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 21:33
  4. */
  5. @Mapper
  6. @Repository
  7. public interface OrderDao {
  8. //1 新建订单
  9. void create(Order order);
  10. //2 修改订单状态,从0改为1
  11. void update(@Param("userId") Long userId, @Param("status") Integer status);
  12. }

对应的映射文件classpath:mapper/OrderMapper.xml

  1. <mapper namespace="cn.cvzhanshi.springcloud.alibaba.dao.OrderDao">
  2. <resultMap id="BaseResultMap" type="cn.cvzhanshi.springcloud.alibaba.domain.Order">
  3. <id column="id" property="id" jdbcType="BIGINT" />
  4. <result column="user_id" property="userId" jdbcType="BIGINT" />
  5. <result column="product_id" property="productId" jdbcType="BIGINT" />
  6. <result column="count" property="count" jdbcType="INTEGER" />
  7. <result column="money" property="money" jdbcType="DECIMAL" />
  8. <result column="status" property="status" jdbcType="INTEGER" />
  9. </resultMap>
  10. <insert id="create" parameterType="cn.cvzhanshi.springcloud.alibaba.domain.Order"
  11. useGeneratedKeys="true" keyProperty="id">
  12. insert into t_order(`user_id`, `product_id`, `count`, `money`, `status`)
  13. values(#{userId}, #{productId}, #{count}, #{money}, 0);
  14. </insert>
  15. <update id="update" parameterType="cn.cvzhanshi.springcloud.alibaba.domain.Order">
  16. update t_order set `status` = 1
  17. where `user_id` = #{userId} and `status` = #{status};
  18. </update>
  19. </mapper>
  • service层的编写

AccountService

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 21:57
  4. */
  5. @Service
  6. @FeignClient(value = "seata-account-service")
  7. public interface AccountService {
  8. @PostMapping(value = "/account/decrease")
  9. CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
  10. }

StorageService

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 21:56
  4. */
  5. @Service
  6. @FeignClient(value = "seata-storage-service")
  7. public interface StorageService {
  8. //减库存
  9. @PostMapping(value = "/storage/decrease")
  10. CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
  11. }

OrderService

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 21:57
  4. */
  5. public interface OrderService {
  6. void create(Order order);
  7. }

OrderServiceImpl

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 22:04
  4. */
  5. @Slf4j
  6. @Service
  7. public class OrderServiceImpl implements OrderService {
  8. @Autowired
  9. private OrderDao orderDao;
  10. @Autowired
  11. private AccountService accountService;
  12. @Autowired
  13. private StorageService storageService;
  14. @Override
  15. public void create(Order order) {
  16. //1. 新建订单
  17. log.info("-------> 开始新建订单");
  18. orderDao.create(order);
  19. //2. 扣减库存
  20. log.info("-------> 订单微服务开始调用库存,做扣减count");
  21. storageService.decrease(order.getProductId(), order.getCount());
  22. log.info("-------> 订单微服务开始调用库存,做扣减完成");
  23. //3. 扣减账号余额
  24. log.info("-------> 订单微服务开始调用账号,做扣减money");
  25. accountService.decrease(order.getUserId(), order.getMoney());
  26. log.info("-------> 订单微服务开始调用账号,做扣减完成");
  27. //4、修改状态
  28. log.info("-------> 修改订单状态");
  29. orderDao.update(order.getUserId(), 0);
  30. log.info("-------> 修改订单状态完成");
  31. log.info("-------> 新建订单完成");
  32. }
  33. }
  • controller层的编写

    /**

    • @author cVzhanshi
    • @create 2021-07-12 22:56
      */
      @RestController
      public class OrderController {

      @Autowired
      private OrderService orderService;

      @GetMapping(“/order/create”)
      public CommonResult create(Order order){

      1. orderService.create(order);
      2. return new CommonResult(200, "订单创建成功!");

      }
      }

  • config配置类的编写

MybatisConfig

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 22:58
  4. */
  5. @MapperScan("cn.cvzhanshi.springcloud.alibaba.dao")
  6. @Configuration
  7. public class MybatisConfig {
  8. }

DataSourceProxyConfig

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-12 22:58
  4. */
  5. //使用Seata对数据源进行代理
  6. @Configuration
  7. public class DataSourceProxyConfig {
  8. @Value("${mybatis.mapperLocations}")
  9. private String mapperLocations;
  10. @Bean
  11. @ConfigurationProperties(prefix = "spring.datasource")
  12. public DataSource druidDataSource() {
  13. return new DruidDataSource();
  14. }
  15. @Bean
  16. public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
  17. return new DataSourceProxy(druidDataSource);
  18. }
  19. @Bean
  20. public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
  21. SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  22. bean.setDataSource(dataSourceProxy);
  23. ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
  24. bean.setMapperLocations(resolver.getResources(mapperLocations));
  25. return bean.getObject();
  26. }
  27. }
  • 主启动类

    /**

    • @author cVzhanshi
    • @create 2021-07-12 23:02
      */
      @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) //取消数据源的自动创建
      @EnableDiscoveryClient
      @EnableFeignClients
      public class SeataOrderMain2001 {

      public static void main(String[] args) {

      1. SpringApplication.run(SeataOrderMain2001.class,args);

      }
      }

启动测试

  • 出现错误

在这里插入图片描述

  • 出错原因

seata 版本问题, spring-cloud-alibaba 依赖下的seata-spring-boot-starter 版本是1.1.0,而我用的seata是 1.2.0 版本 。
1.2.0 版本SeataAutoConfiguration.class 没有seataDataSourceBeanPostProcessor bean ,而是用SeataAutoDataSourceProxyCreator代替

  • 解决方法

去掉spring-cloud-starter-alibaba-seata 下seata-spring-boot-starter 依赖

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
  4. <exclusions>
  5. <exclusion>
  6. <groupId>io.seata</groupId>
  7. <artifactId>seata-all</artifactId>
  8. </exclusion>
  9. <exclusion>
  10. <groupId>io.seata</groupId>
  11. <artifactId>seata-spring-boot-starter</artifactId>
  12. </exclusion>
  13. <exclusion>
  14. <artifactId>spring-boot-starter</artifactId>
  15. <groupId>org.springframework.boot</groupId>
  16. </exclusion>
  17. </exclusions>
  18. </dependency>
  19. <dependency>
  20. <groupId>io.seata</groupId>
  21. <artifactId>seata-all</artifactId>
  22. <version>1.2.0</version>
  23. </dependency>
  • 问题解决,继续测试

在这里插入图片描述

5.5.2 库存模块

  • 新建模块seata-storage-service2002
  • 导入依赖和2001一样直接拷贝
  • 编写配置文件

    server:
    port: 2002

    spring:
    application:

    1. name: seata-storage-service

    cloud:

    1. alibaba:
    2. seata:
    3. # 自定义事务组名称需要与seata-server中的对应
    4. tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group
    5. #service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级
    6. service:
    7. vgroupMapping:
    8. #要和tx-service-group的值一致
    9. my_test_tx_group: default
    10. grouplist:
    11. # seata seaver的 地址配置,此处可以集群配置是个数组
    12. default: 192.168.1.104:8091
    13. nacos:
    14. discovery:
    15. server-addr: localhost:8848 #nacos

    datasource:

    1. # 当前数据源操作类型
    2. type: com.alibaba.druid.pool.DruidDataSource
    3. # mysql驱动类
    4. driver-class-name: com.mysql.jdbc.Driver
    5. url: jdbc:mysql://localhost:3306/seata_storage
    6. username: root
    7. password: lian0911

    feign:
    hystrix:

    1. enabled: false

    logging:
    level:

    1. io:
    2. seata: info

    mybatis:
    mapperLocations: classpath:mapper/*.xml

  • file.conf 和registry.conf 与2001一样,直接拷贝

  • domain层实体类

CommonResult

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class CommonResult<T> {
  5. private Integer code;
  6. private String message;
  7. private T data;
  8. public CommonResult(Integer code, String message) {
  9. this(code, message, null);
  10. }
  11. }

Storage

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class Storage {
  5. private Long id;
  6. //产品id
  7. private Long productId;
  8. //总库存
  9. private Integer total;
  10. //已用库存
  11. private Integer used;
  12. //剩余库存
  13. private Integer residue;
  14. }
  • dao层的编写

StorageDao

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-13 0:17
  4. */
  5. @Mapper
  6. @Repository
  7. public interface StorageDao {
  8. void decrease(@Param("productId") Long productId, @Param("count") Integer count);
  9. }

对应的映射文件classpath:mapper/StorageMapper.xml

  1. <mapper namespace="com.angenin.springcloud.dao.StorageDao">
  2. <resultMap id="BaseResultMap" type="com.angenin.springcloud.domain.Storage">
  3. <id column="id" property="id" jdbcType="BIGINT"/>
  4. <result column="product_id" property="productId" jdbcType="BIGINT"/>
  5. <result column="total" property="total" jdbcType="INTEGER"/>
  6. <result column="used" property="used" jdbcType="INTEGER"/>
  7. <result column="residue" property="residue" jdbcType="INTEGER"/>
  8. </resultMap>
  9. <update id="decrease">
  10. update t_storage
  11. set used = used + #{count}, residue = residue - #{count}
  12. where product_id= #{productId};
  13. </update>
  14. </mapper>
  • service层的编写

StorageService

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-13 0:22
  4. */
  5. public interface StorageService {
  6. /**
  7. * 扣减库存
  8. * @param productId
  9. * @param count
  10. */
  11. void decrease(Long productId, Integer count);
  12. }

StorageServiceImpl

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-13 0:23
  4. */
  5. @Service
  6. public class StorageServiceImpl implements StorageService {
  7. private static final Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
  8. @Resource
  9. private StorageDao storageDao;
  10. @Override
  11. public void decrease(Long productId, Integer count) {
  12. LOGGER.info("----> StorageService中扣减库存");
  13. storageDao.decrease(productId, count);
  14. LOGGER.info("----> StorageService中扣减库存完成");
  15. }
  16. }
  • controller层的编写

StorageController

  1. @RestController
  2. public class StorageController {
  3. @Resource
  4. private StorageService storageService;
  5. @RequestMapping("/storage/decrease")
  6. public CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count){
  7. storageService.decrease(productId, count);
  8. return new CommonResult(200, "扣减库存成功!");
  9. }
  10. }
  • 配置类 MyBatisConfig 和 DataSourceProxyConfig 与2001一样,直接copy
  • 主启动类

    /**

    • @author cVzhanshi
    • @create 2021-07-13 0:29
      */
      @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
      @EnableFeignClients
      @EnableDiscoveryClient
      public class SeataStorageMain2002 {
  1. public static void main(String[] args) {
  2. SpringApplication.run(SeataStorageMain2002.class,args);
  3. }
  4. }
  • 启动测试

在这里插入图片描述

5.5.3 账户模块

  • 新建模块seata-account-service2003
  • 导入依赖和2001一样直接拷贝
  • 编写配置文件

    server:
    port: 2003

    spring:
    application:

    1. name: seata-account-service

    cloud:

    1. alibaba:
    2. seata:
    3. # 自定义事务组名称需要与seata-server中的对应
    4. tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group
    5. #service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级
    6. service:
    7. vgroupMapping:
    8. #要和tx-service-group的值一致
    9. my_test_tx_group: default
    10. grouplist:
    11. # seata seaver的 地址配置,此处可以集群配置是个数组
    12. default: 192.168.1.104:8091
    13. nacos:
    14. discovery:
    15. server-addr: localhost:8848 #nacos

    datasource:

    1. # 当前数据源操作类型
    2. type: com.alibaba.druid.pool.DruidDataSource
    3. # mysql驱动类
    4. driver-class-name: com.mysql.jdbc.Driver
    5. url: jdbc:mysql://localhost:3306/seata_account
    6. username: root
    7. password: lian0911

    feign:
    hystrix:

    1. enabled: false

    logging:
    level:

    1. io:
    2. seata: info

    mybatis:
    mapperLocations: classpath:mapper/*.xml

  • file.conf 和registry.conf 与2001一样,直接拷贝

  • domain层实体类

CommonResult

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class CommonResult<T> {
  5. private Integer code;
  6. private String message;
  7. private T data;
  8. public CommonResult(Integer code, String message) {
  9. this(code, message, null);
  10. }
  11. }

Account

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-13 0:37
  4. */
  5. @Data
  6. @AllArgsConstructor
  7. @NoArgsConstructor
  8. public class Account {
  9. private Long id;
  10. /**
  11. * 用户id
  12. */
  13. private Long userId;
  14. /**
  15. * 总额度
  16. */
  17. private Integer total;
  18. /**
  19. * 已用额度
  20. */
  21. private Integer used;
  22. /**
  23. * 剩余额度
  24. */
  25. private Integer residue;
  26. }
  • dao 层的编写

AccountDao

  1. @Mapper
  2. @Repository
  3. public interface AccountDao {
  4. void decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
  5. }

对应的映射文件classpath:mapper/ AccountMapper.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  3. "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  4. <mapper namespace="cn.cvzhanshi.springcloud.alibaba.dao.AccountDao">
  5. <resultMap id="BaseResultMap" type="cn.cvzhanshi.springcloud.alibaba.domain.Account">
  6. <id column="id" property="id" jdbcType="BIGINT"/>
  7. <result column="user_id" property="userId" jdbcType="BIGINT"/>
  8. <result column="total" property="total" jdbcType="DECIMAL"/>
  9. <result column="used" property="used" jdbcType="DECIMAL"/>
  10. <result column="residue" property="residue" jdbcType="DECIMAL"/>
  11. </resultMap>
  12. <update id="decrease">
  13. update t_account
  14. set used = used + #{money}, residue = residue - #{money}
  15. where user_id = #{userId};
  16. </update>
  17. </mapper>
  • service层的编写

AccountService

  1. public interface AccountService {
  2. void decrease(Long userId, BigDecimal money);
  3. }

AccountServiceImpl

  1. /**
  2. * @author cVzhanshi
  3. * @create 2021-07-13 0:43
  4. */
  5. @Service
  6. public class AccountServiceImpl implements AccountService {
  7. private static final Logger LOGGER = LoggerFactory.getLogger(AccountServiceImpl.class);
  8. @Resource
  9. private AccountDao accountDao;
  10. @Override
  11. public void decrease(Long userId, BigDecimal money) {
  12. LOGGER.info("---> AccountService中扣减账户余额");
  13. accountDao.decrease(userId, money);
  14. LOGGER.info("---> AccountService中扣减账户余额完成");
  15. }
  16. }
  • controller层的编写

    /**

    • @author cVzhanshi
    • @create 2021-07-13 0:44
      */
      @RestController
      public class AccountController {
  1. @Autowired
  2. private AccountService accountService;
  3. @RequestMapping("/account/decrease")
  4. public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money){
  5. accountService.decrease(userId, money);
  6. return new CommonResult(200, "扣减库存成功!");
  7. }
  8. }
  • 主启动类

    /**

    • @author cVzhanshi
    • @create 2021-07-13 0:46
      */
      @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
      @EnableFeignClients
      @EnableDiscoveryClient
      public class SeataAccountMain2003 {
  1. public static void main(String[] args) {
  2. SpringApplication.run(SeataAccountMain2003.class,args);
  3. }
  4. }
  • 启动测试

在这里插入图片描述

环境准备完毕


5.5.4 Test

  • 正常下单
  • 启动nacos、seata、2001、2002、2003
  • 访问 http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10

在这里插入图片描述

  • 查看数据库中的数据变化

order表

在这里插入图片描述

account表

在这里插入图片描述

storage表

在这里插入图片描述

输入日志

在这里插入图片描述

  • 超时异常
  • 停止2003
  • 在2003的AccountServiceImpl里的decrease中添加异常代码

    @Override
    public void decrease(Long userId, BigDecimal money) {

    LOGGER.info(“—-> AccountService中扣减账户余额”);
    //模拟超时异常,暂停20秒
    try {

    1. TimeUnit.SECONDS.sleep(20);

    } catch (InterruptedException e) {

    1. e.printStackTrace();

    }
    accountDao.decrease(userId, money);
    LOGGER.info(“—-> AccountService中扣减账户余额完成”);
    }

  • 重新启动2003

  • 继续访问http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10

在这里插入图片描述

  • 查看数据库

storage表
在这里插入图片描述

account表

在这里插入图片描述

order表

在这里插入图片描述

故障原因:当库存和账户金额扣减后,订单状态并没有设置为已经完成,没有从零改为1
而且由于feign的重试机制,账户余额还有可能被多次扣减

  • 超市异常,添加 @GlobalTransactional 注解
  • 停止2001
  • 在2001的OderServiceImpl里的create方法上加上 @GlobalTransactional

    @Override
    //name随便命名,只要不重复即可
    //rollbackFor = Exception.class表示出现所有异常都回滚
    //rollbackFor表示哪些需要回滚
    //noRollbackFor表示哪些不需要回滚
    @GlobalTransactional(name = “test-create-order”, rollbackFor = Exception.class)
    public void create(Order order) {

    1. }
  • 重启2001。

  • 刷新页面http://localhost:2001/order/create?userId=1&productId=1&count=10&money=10

在这里插入图片描述

  • 查看数据库

order表

在这里插入图片描述

account表

在这里插入图片描述

storage表

在这里插入图片描述

order表没有插入,相应的账户表和库存表都没有改变,回滚成功

5.6 Seata的再理解

5.6.1 图解TC/TM/RM三组件

  • 官方图解

在这里插入图片描述

  • 自身理解

在这里插入图片描述

5.6.2 分布式事务的执行流程

  • TM开启分布式事务(TM 向TC注册全局事务记录) ;
  • 按业务场景,编排数据库、服务等事务内资源(RM向TC汇报资源准备状态) ;
  • TM结束分布式事务,事务一阶段结束(TM通知TC提交/回滚分布式事务) ;
  • TC汇总事务信息,决定分布式事务是提交还是回滚;
  • TC通知所有RM提交/回滚资源,事务二阶段结束。

模式分类

  • AT 模式

    提供无侵入自动补偿的事务模式,目前已支持 MySQL、 Oracle 、PostgreSQL和 TiDB的AT模式,H2 开发中

  • TCC 模式

    支持 TCC 模式并可与 AT 混用,灵活度更高

  • SAGA 模式

    为长事务提供有效的解决方案


AT模式详解

整体机制 两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

在一阶段, Seata 会拦截“业务SQL”,
1、解析SQL语义,找到“业务SQL”要更新的业务数据,在业务数据被更新前,将其保存成”before image”,
2、执行“业务SQL”更新业务数据,在业务数据更新之后,
3、其保存成”after image” ,最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

在这里插入图片描述

  • 二阶段:

    • 提交异步化,非常快速地完成。

    二阶段如是顺利提交的话,
    因为“业务SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行锁删掉, 完成数据清理即可。

    在这里插入图片描述

    • 回滚通过一阶段的回滚日志进行反向补偿

    二阶段回滚: .
    二阶段如果是回滚的话, Seata就需要回滚一阶段已经执行的“业务SQL” ,还原业务数据。
    回滚方式便是用”before image”还原业务数据;但在还原前要首先要校验脏写,对比”数据库当前业务数据”和”after image”,如果两份数据完全一致就说明没有脏写, 可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

    在这里插入图片描述


简略流程图
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,28人围观)

还没有评论,来说两句吧...

相关阅读