TX-LCN分布式事务之LCN模式

谁践踏了优雅 2023-10-02 12:53 59阅读 0赞

什么是LCN模式

LCN模式是TX-LCN分布式事务模式的一种,L-lock-锁定事务单元、C-confirm-确认事务模块状态、 notify-通知事务单元

原理

LCN模式是通过Spring AOP的方式代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。 当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。

模式特点

  • 该模式对代码的嵌入性为低。
  • 该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。
  • 该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。
  • 该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。

源码解读

首先我们来看几个关键的类DataSourceAspect-数据源切面类、TransactionAspect事务切面类、LcnConnectionProxylcn 连接代理类、DTXLogicWeaver分布式事务调度器、DTXServiceExecutor分布式事务执行器

DataSourceAspect的作用

  • 源码

    @Aspect
    @Component
    public class DataSourceAspect implements Ordered {

    1. private static final Logger log = LoggerFactory.getLogger(DataSourceAspect.class);
    2. private final TxClientConfig txClientConfig;
    3. private final DTXResourceWeaver dtxResourceWeaver;
    4. public DataSourceAspect(TxClientConfig txClientConfig, DTXResourceWeaver dtxResourceWeaver) {
    5. this.txClientConfig = txClientConfig;
    6. this.dtxResourceWeaver = dtxResourceWeaver;
    7. }
    8. @Around("execution(* javax.sql.DataSource.getConnection(..))")
    9. public Object around(ProceedingJoinPoint point) throws Throwable {
    10. return this.dtxResourceWeaver.getConnection(() -> {
    11. return (Connection)point.proceed();
    12. });
    13. }
    14. public int getOrder() {
    15. return this.txClientConfig.getResourceOrder();
    16. }

    }

由该类的源码,我们能够知道,lcn模式主要对数据库的连接进行了拦截代理。获取到数据库的连接交由lcn 来进行代理。

TransactionAspect 作用

  • 源码

    @Aspect
    @Component
    public class TransactionAspect implements Ordered {

    1. private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class);
    2. private final TxClientConfig txClientConfig;
    3. private final DTXLogicWeaver dtxLogicWeaver;
    4. public TransactionAspect(TxClientConfig txClientConfig, DTXLogicWeaver dtxLogicWeaver) {
    5. this.txClientConfig = txClientConfig;
    6. this.dtxLogicWeaver = dtxLogicWeaver;
    7. }
    8. @Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")
    9. public void lcnTransactionPointcut() {
    10. }
    11. @Around("lcnTransactionPointcut() && !txcTransactionPointcut()&& !tccTransactionPointcut() && !txTransactionPointcut()")
    12. public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
    13. DTXInfo dtxInfo = DTXInfo.getFromCache(point);
    14. LcnTransaction lcnTransaction = (LcnTransaction)dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
    15. dtxInfo.setTransactionType("lcn");
    16. dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
    17. DTXLogicWeaver var10000 = this.dtxLogicWeaver;
    18. point.getClass();
    19. return var10000.runTransaction(dtxInfo, point::proceed);
    20. }
    21. public int getOrder() {
    22. return this.txClientConfig.getDtxAspectOrder();
    23. }

    }

由该类的源码,我们能够明白,通过解析@LcnTransaction注解进行相应的操作。代码会调用到DTXLogicWeaver

DTXLogicWeaver 作用

  1. public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {
  2. if (Objects.isNull(DTXLocalContext.cur())) {
  3. DTXLocalContext.getOrNew();
  4. log.debug("<---- TxLcn start ---->");
  5. DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
  6. TxContext txContext;
  7. if (this.globalContext.hasTxContext()) {
  8. txContext = this.globalContext.txContext();
  9. dtxLocalContext.setInGroup(true);
  10. log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
  11. } else {
  12. txContext = this.globalContext.startTx();
  13. }
  14. if (Objects.nonNull(dtxLocalContext.getGroupId())) {
  15. dtxLocalContext.setDestroy(false);
  16. }
  17. dtxLocalContext.setUnitId(dtxInfo.getUnitId());
  18. dtxLocalContext.setGroupId(txContext.getGroupId());
  19. dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
  20. TxTransactionInfo info = new TxTransactionInfo();
  21. info.setBusinessCallback(business);
  22. info.setGroupId(txContext.getGroupId());
  23. info.setUnitId(dtxInfo.getUnitId());
  24. info.setPointMethod(dtxInfo.getBusinessMethod());
  25. info.setPropagation(dtxInfo.getTransactionPropagation());
  26. info.setTransactionInfo(dtxInfo.getTransactionInfo());
  27. info.setTransactionType(dtxInfo.getTransactionType());
  28. info.setTransactionStart(txContext.isDtxStart());
  29. boolean var15 = false;
  30. Object var6;
  31. try {
  32. var15 = true;
  33. var6 = this.transactionServiceExecutor.transactionRunning(info);
  34. var15 = false;
  35. } finally {
  36. if (var15) {
  37. if (dtxLocalContext.isDestroy()) {
  38. synchronized(txContext.getLock()) {
  39. txContext.getLock().notifyAll();
  40. }
  41. if (!dtxLocalContext.isInGroup()) {
  42. this.globalContext.destroyTx();
  43. }
  44. DTXLocalContext.makeNeverAppeared();
  45. TracingContext.tracing().destroy();
  46. }
  47. log.debug("<---- TxLcn end ---->");
  48. }
  49. }
  50. if (dtxLocalContext.isDestroy()) {
  51. synchronized(txContext.getLock()) {
  52. txContext.getLock().notifyAll();
  53. }
  54. if (!dtxLocalContext.isInGroup()) {
  55. this.globalContext.destroyTx();
  56. }
  57. DTXLocalContext.makeNeverAppeared();
  58. TracingContext.tracing().destroy();
  59. }
  60. log.debug("<---- TxLcn end ---->");
  61. return var6;
  62. } else {
  63. return business.call();
  64. }
  65. }

以上代码是该类的核心逻辑,可以看出来TX-LCN事务的处理全部都是走的这个类的该方法,最终会调用到DTXServiceExecutor分布式事务执行器

DTXServiceExecutor 作用

  1. /**
  2. * 事务业务执行
  3. *
  4. * @param info info
  5. * @return Object
  6. * @throws Throwable Throwable
  7. */
  8. public Object transactionRunning(TxTransactionInfo info) throws Throwable {
  9. // 1. 获取事务类型
  10. String transactionType = info.getTransactionType();
  11. // 2. 获取事务传播状态
  12. DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);
  13. // 2.1 如果不参与分布式事务立即终止
  14. if (propagationState.isIgnored()) {
  15. return info.getBusinessCallback().call();
  16. }
  17. // 3. 获取本地分布式事务控制器
  18. DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);
  19. // 4. 织入事务操作
  20. try {
  21. // 4.1 记录事务类型到事务上下文
  22. Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
  23. transactionTypeSet.add(transactionType);
  24. dtxLocalControl.preBusinessCode(info);
  25. // 4.2 业务执行前
  26. txLogger.txTrace(
  27. info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);
  28. // 4.3 执行业务
  29. Object result = dtxLocalControl.doBusinessCode(info);
  30. // 4.4 业务执行成功
  31. txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
  32. dtxLocalControl.onBusinessCodeSuccess(info, result);
  33. return result;
  34. } catch (TransactionException e) {
  35. txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
  36. throw e;
  37. } catch (Throwable e) {
  38. // 4.5 业务执行失败
  39. txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
  40. "business code error");
  41. dtxLocalControl.onBusinessCodeError(info, e);
  42. throw e;
  43. } finally {
  44. // 4.6 业务执行完毕
  45. dtxLocalControl.postBusinessCode(info);
  46. }
  47. }

通过以上代码可以看出,该类是整个事务执行关键类。

以上就是LCN模式比较核心的代码,其他的分支代码就不一一赘述了

实战

由上一篇分布式事务之TX-LCN 我们规划了俩个TC分别是lcn-order 服务和lcn-pay服务,我们的思路是订单服务调用支付服务,分别在订单服务表t_order和支付服务表t_pay中插入插入数据。

订单服务核心代码和数据表脚本

  • 代码

    /**

    • @author:triumphxx
    • @Date:2021/10/24
    • @Time:2:13 下午
    • @微信公众号:北漂码农有话说
    • @网站:http://blog.triumphxx.com.cn
    • @GitHub https://github.com/triumphxx
    • @Desc:
      **/
      @RestController
      public class LcnOrderController {
  1. @Autowired
  2. TOrderDao tOrderDao;
  3. @Autowired
  4. private RestTemplate restTemplate;
  5. @PostMapping("/add-order")
  6. @Transactional(rollbackFor = Exception.class)
  7. @LcnTransaction
  8. public String add(){
  9. TOrder bean = new TOrder();
  10. bean.setTId(1);
  11. bean.setTName("order");
  12. restTemplate.postForEntity("http://lcn-pay/add-pay","",String.class);
  13. // int i = 1/0;
  14. tOrderDao.insert(bean);
  15. return "新增订单成功";
  16. }
  17. }
  • 脚本

    CREATE TABLE t_order (
    t_id int(11) NOT NULL,
    t_name varchar(45) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=latin1

支付服务核心代码和数据表脚本

  • 代码

    /**

    • @author:triumphxx
    • @Date:2021/10/24
    • @Time:2:26 下午
    • @微信公众号:北漂码农有话说
    • @网站:http://blog.triumphxx.com.cn
    • @GitHub https://github.com/triumphxx
    • @Desc:
      **/
      @RestController
      public class LcnPayController {

      @Autowired
      TPayDao tPayDao;

      @PostMapping(“/add-pay”)
      @Transactional(rollbackFor = Exception.class)
      @LcnTransaction
      public String addPay(){

      1. TPay tPay = new TPay();
      2. tPay.setTId(1);
      3. tPay.setTName("t_pay");
      4. int i = tPayDao.insertSelective(tPay);
      5. return "新增支付成功";

      }
      }

  • 脚本

    CREATE TABLE t_pay (

    1. `t_id` int(11) NOT NULL,
    2. `t_name` varchar(45) DEFAULT NULL

    ) ENGINE=InnoDB DEFAULT CHARSET=latin1

测试流程

  • 启动 Redis
  • 启动 TM
  • 启动注册中心 eureka-server
  • 启动服务 lcn-order
  • 启动服务 lcn-pay
  • 请求接口 http://localhost:8001/add-order
  • 代码创造异常看数据是否进行回滚

小结

本篇我们分析了TX-LCN分布式事务的lcn模式的原理及相关源码,以及搭建服务的进行测试。希望能对大家有所帮助。 源码地址源码传送门

E6_89_AB_E7_A0_81_E6_90_9C_E7_B4_A2_E8_81_94_E5_90_88_E4_BC_A0_E6_92_AD_E6_A0_B7_E5_BC_8F-_E6_A0_87_E5_87_86_E8_89_B2_E7_89_88.png

发表评论

表情:
评论列表 (有 0 条评论,59人围观)

还没有评论,来说两句吧...

相关阅读

    相关 LCN分布式事务

    背景 项目采用Spring Cloud (Spring Boot 2.0.1)开发,Spring Cloud是一个微服务架构实施的综合性解决框架。 1.知识点概述

    相关 LCN分布式事务

    背景 项目采用Spring Cloud (Spring Boot 2.0.1)开发,Spring Cloud是一个微服务架构实施的综合性解决框架。 1.知识点概述 1