java源码 - Spring5.x(6)之 事务 「爱情、让人受尽委屈。」 2022-11-27 12:22 113阅读 0赞 > [关于NamespaceHandler初始化与调用时机的文章][NamespaceHandler] > > -------------------- > > [浅谈Spring的BeanDefinitionParser的触发流程][Spring_BeanDefinitionParser] ### 文章目录 ### * 1. 事务自定义标签 * 2. 注册 InfrastructureAdvisorAutoProxyCreator * 3. 获取对应的Advice * 4. 获取事务标签 * 5. 事务增强 * * 1)创建事务 createTransactionIfNecessary * 2)处理已经存在的事务 handleExistingTransaction * 3)准备事务信息 prepareTransactionInfo * 4) 回滚处理 completeTransactionAfterThrowing * 5)事务提交 commitTransactionAfterReturning # 1. 事务自定义标签 # 配置文件版: <tx:annotation-driven /> 注解版: `@EnableTransactionManagement` org.springframework.transaction.config.TxNamespaceHandler @Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); } 通过代码知道,Spring会使用AnnotationDrivenBeanDefinitionParser进行annotation-driven自定义标签的解析。 而我们知道XXXParser一般都是以parse方法为入口方法。 @Override @Nullable public BeanDefinition parse(Element element, ParserContext parserContext) { registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) { registerJtaTransactionAspect(element, parserContext); } } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null; } 在解析中存在对于 mode 属性的判断,根据代码,如我们需要使 AspectJ 的方式进行事务切入( Spring 中的事务是以 AOP 基础的 ),那么可 以使用这样的配置 <tx : annotation-driven transaction-manager ="transactionManager“ mode=” aspectj " //> # 2. 注册 InfrastructureAdvisorAutoProxyCreator # ![在这里插入图片描述][20200816104204454.png_pic_center] 进入到org.springframework.transaction.config.AnnotationDrivenBeanDefinitionParser.AopAutoProxyConfigurer\#configureAutoProxyCreator: /** *内部类,当实际处于代理模式时,引入AOP框架依赖。 */ public static void configureAutoProxyCreator(Element element, ParserContext parserContext) { AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element); String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME; if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) { Object eleSource = parserContext.extractSource(element); //创建TransactionAttributeSource定义。 RootBeanDefinition sourceDef = new RootBeanDefinition( "org.springframework.transaction.annotation.AnnotationTransactionAttributeSource"); sourceDef.setSource(eleSource); sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef); //创建TransactionInterceptor定义。 RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class); interceptorDef.setSource(eleSource); interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registerTransactionManager(element, interceptorDef); interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName)); String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef); //创建TransactionAttributeSourceAdvisor定义。 RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class); advisorDef.setSource(eleSource); advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName)); //将interceptorName的bean注入到advisorDef的adviceBeanName中 advisorDef.getPropertyValues().add("adviceBeanName", interceptorName); //如果配置了order属性,则加入bean中 if (element.hasAttribute("order")) { advisorDef.getPropertyValues().add("order", element.getAttribute("order")); } parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef); CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource); compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName)); compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName)); compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName)); parserContext.registerComponent(compositeDef); } } } > 以上方法主要注册了三个Bean,AnnotationTransactionAttributeSource,BeanFactoryTransactionAttributeSourceAdvisor,TransactionInterceptor。 > 这里的用途先放着。 先来看registerAutoProxyCreatorIfNecessary方法: public static void registerAutoProxyCreatorIfNecessary( ParserContext parserContext, Element sourceElement) { BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary( parserContext.getRegistry(), parserContext.extractSource(sourceElement)); useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement); registerComponentIfNecessary(beanDefinition, parserContext); } @Nullable public static BeanDefinition registerAutoProxyCreatorIfNecessary( BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source); } 这里是注册了一个InfrastructureAdvisorAutoProxyCreator的Bean。 那么我们来看看其类图: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center] 从类图我们知道,InfrastructureAdvisorAutoProxyCreator间接的继承了BeanPostProcessor,那么进而推出,每当实例化一个Bean 的时候,Spring都会调用这个类的postProcessAfterInitialization方法: @Override public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) { if (bean != null) { Object cacheKey = getCacheKey(bean.getClass(), beanName); if (this.earlyProxyReferences.remove(cacheKey) != bean) { //是否是由于避免循环依赖而创建的bean代理 return wrapIfNecessary(bean, beanName, cacheKey); } } return bean; } //又看见了类似Aop的处理方法了 /** *如有必要,包装给定的bean,即如果它有资格被代理。 * @param bean the raw bean instance * @param beanName the name of the bean * @param cacheKey the cache key for metadata access * @return a proxy wrapping the bean, or the raw bean instance as-is */ protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) { return bean; } if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) { return bean; } if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) { this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean; } //如果我们有advice,创建代理。 Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null); if (specificInterceptors != DO_NOT_PROXY) { this.advisedBeans.put(cacheKey, Boolean.TRUE); Object proxy = createProxy( bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean)); this.proxyTypes.put(cacheKey, proxy.getClass()); return proxy; } this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean; } # 3. 获取对应的Advice # org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator\#getAdvicesAndAdvisorsForBean org.springframework.aop.framework.autoproxy.AbstractAdvisorAutoProxyCreator\#getAdvicesAndAdvisorsForBean @Override @Nullable protected Object[] getAdvicesAndAdvisorsForBean( Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) { List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName); if (advisors.isEmpty()) { return DO_NOT_PROXY; } return advisors.toArray(); } protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) { List<Advisor> candidateAdvisors = findCandidateAdvisors(); List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName); extendAdvisors(eligibleAdvisors); if (!eligibleAdvisors.isEmpty()) { eligibleAdvisors = sortAdvisors(eligibleAdvisors); } return eligibleAdvisors; } 以上方法是否很熟悉?这里基本又回到了AOP的逻辑了。 第一,findCandidateAdvisors()找到所有的Advisors,而Spring获取Advisors的逻辑是: ![在这里插入图片描述][20200816215656828.png_pic_center] 从容器中获取所有的Advisor类型的bean定义。再通过下面的方法获取到bean: ![在这里插入图片描述][20200816215930898.png_pic_center] 第二,findAdvisorsThatCanApply,我们从注释可以知道这个方法的参数Class<?> beanClass代表的是当前这个beanClass是否需要增强。 org.springframework.aop.support.AopUtils\#findAdvisorsThatCanApply就是判断传入的bean从advisor中查找是否存在可以加器clazz的advisor。其判断逻辑如下: public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) { if (candidateAdvisors.isEmpty()) { return candidateAdvisors; } List<Advisor> eligibleAdvisors = new ArrayList<>(); for (Advisor candidate : candidateAdvisors) { if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) { eligibleAdvisors.add(candidate); } } boolean hasIntroductions = !eligibleAdvisors.isEmpty(); for (Advisor candidate : candidateAdvisors) { if (candidate instanceof IntroductionAdvisor) { // already processed continue; } if (canApply(candidate, clazz, hasIntroductions)) { eligibleAdvisors.add(candidate); } } return eligibleAdvisors; } 其次,我们这次是TX事务,那么我们就可以看看之前的那三个类了。 ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 1] ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 2] public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) { if (advisor instanceof IntroductionAdvisor) { return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass); } else if (advisor instanceof PointcutAdvisor) { PointcutAdvisor pca = (PointcutAdvisor) advisor; return canApply(pca.getPointcut(), targetClass, hasIntroductions); } else { // It doesn't have a pointcut so we assume it applies. return true; } } 那么再进行当前Bean是否需要增强的时候,我们知道BeanFactoryTransactionAttributeSourceAdvisor肯定在所有的Advisor内,在canApply方法中,会调用每一个的Advisor的getPointcut来判断是否匹配,而这个pointcut正是我们增强方法常写的切面,切点的那个表达式内的东西。 ![在这里插入图片描述][20200816221305358.png_pic_center] ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 3] 那么接下来我们就知道该看什么了。pc此时为TransactionAttributeSourcePointcut: public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) { Assert.notNull(pc, "Pointcut must not be null"); if (!pc.getClassFilter().matches(targetClass)) { return false; } MethodMatcher methodMatcher = pc.getMethodMatcher(); if (methodMatcher == MethodMatcher.TRUE) { //如果匹配任何方法,就不需要迭代方法… return true; } IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null; if (methodMatcher instanceof IntroductionAwareMethodMatcher) { introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher; } Set<Class<?>> classes = new LinkedHashSet<>(); if (!Proxy.isProxyClass(targetClass)) { classes.add(ClassUtils.getUserClass(targetClass)); } classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass)); for (Class<?> clazz : classes) { Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz); for (Method method : methods) { if (introductionAwareMethodMatcher != null ? introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) : methodMatcher.matches(method, targetClass)) { return true; } } } return false; } 这个方法大致做了这些思路,首先获取对应类的所有接口连同类本身进行遍历,匹配成功就说明当前advisor适配于这个bean。 如果再深究就得看方法是如何匹配上的。 ![在这里插入图片描述][20200816221707580.png_pic_center] ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 4] 通过类图我们可以看出TransactionAttributeSourcePointcut实现了MethodMatcher方法,那么这里的 methodMatcher.matches会调用org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut\#matches @Override public boolean matches(Method method, Class<?> targetClass) { if (TransactionalProxy.class.isAssignableFrom(targetClass) || PlatformTransactionManager.class.isAssignableFrom(targetClass) || PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) { return false; } TransactionAttributeSource tas = getTransactionAttributeSource(); return (tas == null || tas.getTransactionAttribute(method, targetClass) != null); } 接着这个方法调用TransactionAttributeSource 的getTransactionAttribute。 而这里的`AnnotationTransactionAttributeSource`正是上面方法注册的; ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 5] 那么我们就得来看`AnnotationTransactionAttributeSource`的`getTransactionAttribute`,其实是调用的其父类的: org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource\#getTransactionAttribute /** *确定此方法调用的事务属性。 * <p>如果没有找到方法属性,则默认为类的事务属性。 * @param 当前调用的方法(never {@code null}) * @param 这个调用的目标类(可能是{@code null}) * @return 此方法的TransactionAttribute,如果该方法,则为{@code null} *不是事务性的 */ @Override @Nullable public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { if (method.getDeclaringClass() == Object.class) { return null; } //首先,看看我们是否有一个缓存值。 Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); if (cached != null) { //值将是规范值,指示不存在事务属性, //或实际事务属性。 if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return cached; } } else { //我们需要解决这个问题。 TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); //将其放入缓存中。 if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } if (logger.isTraceEnabled()) { logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr); } this.attributeCache.put(cacheKey, txAttr); } return txAttr; } } 这个方法只做了,缓存的处理逻辑,而将真正做事的逻辑放到了computeTransactionAttribute方法中。 # 4. 获取事务标签 # @Nullable protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { //按要求不允许非公开方法。 if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } //方法可能在接口上,但我们需要来自目标类的属性。 //如果目标类为null,该方法将不变。 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // First try是目标类中的方法。 TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } //第二个try是目标类上的事务属性。 txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } if (specificMethod != method) { //退一步就是看一看原来的方法。 txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback是原始方法的类。 txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null; } 从这个方法我们就可以总结出事务的获取属性: 如果方法上有事务属性则看方法上的,否则就看类上的。 继续深入看这个方法内是如何获取到事务属性的,由findTransactionAttribute辗转到了: org.springframework.transaction.annotation.AnnotationTransactionAttributeSource\#determineTransactionAttribute @Nullable protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) { for (TransactionAnnotationParser annotationParser : this.annotationParsers) { TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element); if (attr != null) { return attr; } } return null; } 这里的TransactionAnnotationParser 其实使用的是SpringTransactionAnnotationParser: org.springframework.transaction.annotation.SpringTransactionAnnotationParser\#parseTransactionAnnotation(java.lang.reflect.AnnotatedElement) @Override @Nullable public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) { AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes( element, Transactional.class, false, false); if (attributes != null) { return parseTransactionAnnotation(attributes); } else { return null; } } protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue()); rbta.setReadOnly(attributes.getBoolean("readOnly")); rbta.setQualifier(attributes.getString("value")); List<RollbackRuleAttribute> rollbackRules = new ArrayList<>(); for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("rollbackForClassName")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("noRollbackForClassName")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } rbta.setRollbackRules(rollbackRules); return rbta; } 至此,我们已经看见了propagation等标签属性。 总结一下就是,Spring容器getBean时会调用InfrastructureAdvisorAutoProxyCreator的BeanPostProcessor接口的方法,去查到到当前类是否需要增强,以及对应的增强类。完成该类的初始化。 但是我们目前漏了一个,Transactionlnterceptor。 # 5. 事务增强 # 通过前面的学习,以及AOP的知识,我们知道,从Spring容器中获取到的Bean在具体调用方法时,其调用的是代理类的invoke方法。 那么我们可以关注上面的第三个类了:Transactionlnterceptor。 回顾一下: org.springframework.transaction.config.AnnotationDrivenBeanDefinitionParser.AopAutoProxyConfigurer\#configureAutoProxyCreator: ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 6] 1. 先是创建了AnnotationTransactionAttributeSource的BeanDefinition,并生成BeanName; 2. 创建TransactionInterceptor的BeanDefinition,并生成BeanName。 3. 将这个两个类注入到BeanFactoryTransactionAttributeSourceAdvisor中。 那么当事务代理类被调用时,BeanFactoryTransactionAttributeSourceAdvisor注册的advise为TransactionInterceptor,那么会调用TransactionInterceptor的invoke方法。 如下: @Override @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { //算出目标类:可能是{@code null}。 // TransactionAttributeSource应该被传递给目标类 //以及方法,方法可能来自接口。 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); } /** *一般委托基于周围建议的子类,委托给几个其他模板 *这个类的方法。能够处理{@link CallbackPreferringPlatformTransactionManager} *以及常规的{@link PlatformTransactionManager}实现。 * @param 被调用的方法 * @param 我们正在其上调用方法的目标类 * @param 调用要用于继续目标调用的回调 * @return 方法的返回值(如果有的话) * @throws 从目标调用传播的Throwable */ @Nullable protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { //如果事务属性为null,则该方法是非事务性的。 TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //使用getTransaction和提交/回滚调用进行标准的事务界定。 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal; try { //这是一个around通知:调用链中的下一个拦截器。 //这通常会导致调用目标对象。 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { //目标调用异常 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); //它是一个CallbackPreferringPlatformTransactionManager:传入一个TransactionCallback。 try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { //一个RuntimeException:将导致回滚。 if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { //一个正常的返回值:会导致提交。 throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); //检查结果状态:它可能指示一个可抛出的重新抛出。 if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } } 功能如下: 1. 获取事务属性。 2. 加载配置中的TransactionManager。 3. 不同事务采用不同的逻辑。 4. 在目标方法执行前获取事务与事务信息。 5. 执行目标方法。 6. 异常处理。 7. 提交事务前清除事务信息。 8. 提交事务。 ## 1)创建事务 createTransactionIfNecessary ## /** *根据给定的TransactionAttribute创建一个必要的事务。 * 允许调用者执行定制的TransactionAttribute查找 * TransactionAttributeSource。 * @param txAttr TransactionAttribute(可以是{@code null}) * @param joinpointIdentification 完全限定的方法名称(用于监视和日志记录目的) * @return TransactionInfo对象,无论是否创建了事务。 * The {@code hasTransaction()} method on TransactionInfo can be used to * tell if there was a transaction created. * @see #getTransactionAttributeSource() */ protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { //如果没有指定名称,将方法标识应用为事务名称。 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //获取事务状态 status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } 其他的没什么好说的,看注释就知道什么意思,主要来看getTransaction方法: org.springframework.transaction.support.AbstractPlatformTransactionManager\#getTransaction /** *此实现处理传播行为。代表 * {@code doGetTransaction}, {@code isExistingTransaction} *和{@code doBegin}。 */ @Override public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); //缓存调试标记,以避免重复检查。 boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { //如果没有给出事务定义,则使用默认值。 definition = new DefaultTransactionDefinition(); } if (isExistingTransaction(transaction)) { //检查传播行为以了解如何行为。 return handleExistingTransaction(definition, transaction, debugEnabled); } //检查新事务的定义设置。 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } //没有发现现有事务->检查传播行为以了解如何继续。 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { //创建“空”事务:没有实际事务,但可能存在同步。 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } } 步骤解析: 1. doGetTransaction获取事务。 protected Object doGetTransaction() { DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject(); //如果当前线程已经记录数据库连接则使用原有连接。 txObject.setSavepointAllowed(this.isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource()); //false 表示非新创建连接 txObject.setConnectionHolder(conHolder, false); return txObject; } 1. isExistingTransaction,如果当先线程存在事务,则转向嵌套事务的处理。 handleExistingTransaction()。 2. definition.getTimeout() < TransactionDefinition.TIMEOUT\_DEFAULT,超时异常。 3. newTransactionStatus 4. doBegin,完善Transaction。 org.springframework.jdbc.datasource.DataSourceTransactionManager\#doBegin /** 构造transaction */ protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction; Connection con = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this.obtainDataSource().getConnection(); if (this.logger.isDebugEnabled()) { this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); //如果需要,切换到手动提交。在某些JDBC驱动程序中,这是非常昂贵的, //因此我们不想不必要地这样做(例如,如果我们明确地 //已配置连接池设置它)。 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (this.logger.isDebugEnabled()) { this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } this.prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true); int timeout = this.determineTimeout(definition); if (timeout != -1) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable var7) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.obtainDataSource()); txObject.setConnectionHolder((ConnectionHolder)null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7); } } 事务始于这个doBegin: * 尝试获取连接,有就用,没有就获取新的。 * 设置隔离级别,只读标识。 * 更改默认提交设置。 * 设置标识位,标识当前连接已经被事务激活。 * 设置过期时间。 * 将connectionHolder 绑定到线程。 其实有很多都是交给了数据库的底层连接来处理。 @Nullable public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException { Assert.notNull(con, "No Connection specified"); //设置只读标志。 if (definition != null && definition.isReadOnly()) { try { if (logger.isDebugEnabled()) { logger.debug("Setting JDBC Connection [" + con + "] read-only"); } con.setReadOnly(true); } catch (SQLException | RuntimeException ex) { Throwable exToCheck = ex; while (exToCheck != null) { if (exToCheck.getClass().getSimpleName().contains("Timeout")) { //假设这是一个连接超时,否则就会丢失:例如JDBC 4.0 throw ex; } exToCheck = exToCheck.getCause(); } //“只读不受支持”SQLException ->忽略,这只是一个提示 logger.debug("Could not set JDBC Connection read-only", ex); } } //应用特定隔离级别(如果有的话)。 Integer previousIsolationLevel = null; if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { if (logger.isDebugEnabled()) { logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel()); } int currentIsolation = con.getTransactionIsolation(); if (currentIsolation != definition.getIsolationLevel()) { previousIsolationLevel = currentIsolation; con.setTransactionIsolation(definition.getIsolationLevel()); } } return previousIsolationLevel; } 1. 将事务信息记录到当前线程。prepareSynchronization protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } } ## 2)处理已经存在的事务 handleExistingTransaction ## ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 7] org.springframework.transaction.support.AbstractPlatformTransactionManager\#getTransaction Spring 中支持多种事务传播规则,这写都是当事务存在的时候进行的处理。 /** *为现有事务创建一个TransactionStatus。 */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { //在现有spring管理的事务中创建保存点, //通过由TransactionStatus实现的SavepointManager API。 //通常使用JDBC 3.0保存点。不要激活Spring同步。 DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { //通过嵌套的开始和提交/回滚调用嵌套事务。 //通常只针对JTA: Spring同步可能在这里被激活 //在已有JTA事务的情况下。 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } //假设PROPAGATION_SUPPORTS或PROPAGATION_REQUIRED。 if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); } 1. REQUIRED(默认属性) 如果存在一个事务,则支持当前事务。如果没有事务则开启一个新的事务。 被设置成这个级别时,会为每一个被调用的方法创建一个逻辑事务域。如果前面的方法已经创建了事务,那么后面的方法支持当前的事务,如果当前没有事务会重新建立事务。 2. NESTED 支持当前事务,新增Savepoint点,与当前事务同步提交或回滚。 嵌套事务一个非常重要的概念就是内层事务依赖于外层事务。外层事务失败时,会回滚内层事务所做的动作。而内层事务操作失败并不会引起外层事务的回滚。 3. MANDATORY Support a current transaction, throw an exception if none exists.支持当前事务,如果当前没有事务,就抛出异常。 对挂起操作的主要目的是记录原有事务的状态,以便于后续操作对事物的恢复。 /** *暂停给定的事务。首先挂起事务同步, *然后委托给{@code doSuspend}模板方法。 * @param事务当前事务对象 *(或{@code null}暂停活动同步,如果有的话) * @return an object that holds suspended resources * (or {@code null} if neither transaction nor synchronization active) * @see #doSuspend * @see #resume */ @Nullable protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { suspendedResources = doSuspend(transaction); } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend失败-原始事务仍处于活动状态… doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { //事务活动,但没有同步活动。 Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { //事务和同步都不是活动的。 return null; } } ## 3)准备事务信息 prepareTransactionInfo ## org.springframework.transaction.interceptor.TransactionAspectSupport\#prepareTransactionInfo /** *为给定的属性和状态对象准备一个TransactionInfo。 * @param txAttr TransactionAttribute(可以是{@code null}) * @param joinpointIdentification完全限定方法名 *(用于监视和日志记录目的) * @param 当前事务的事务状态 * @return 准备好的TransactionInfo对象 */ protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { //这种方法需要一个交易… if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } //如果已经存在不兼容的tx,事务管理器将标记一个错误。 txInfo.newTransactionStatus(status); } else { // TransactionInfo.hasTransaction()方法将返回false。我们只创建了它 //保持该类中维护的ThreadLocal堆栈的完整性。 if (logger.isTraceEnabled()) { logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional."); } } //我们总是将TransactionInfo绑定到线程,即使我们没有创建 //这里有一个新事务。这保证了TransactionInfo堆栈 //将被正确地管理,即使这个方面没有创建任何事务。 txInfo.bindToThread(); return txInfo; } Spring在建立事务连接并完成了事务信息的提取后,我们需要讲所有的事务信息统一记录在TransactionInfo的实例中。这个实例包含了方法开始前的所有的状态信息。 这个类的主要作用就是用来回滚。 ## 4) 回滚处理 completeTransactionAfterThrowing ## ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 8] org.springframework.transaction.interceptor.TransactionAspectSupport\#completeTransactionAfterThrowing: /** *处理一个throwable,完成事务。 *我们可以提交或回滚,这取决于配置。 * @param txInfo information about the current transaction * @param ex throwable encountered */ protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } } else { //我们不回滚这个异常。 //如果TransactionStatus.isRollbackOnly()为真, //仍将回滚。 try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } } } } 首先来看回滚条件: ![在这里插入图片描述][20200818104001896.png_pic_center] @Override public boolean rollbackOn(Throwable ex) { return (ex instanceof RuntimeException || ex instanceof Error); } 默认是对RuntimeException 与 Error 进行回滚。 再来看如何回滚: ![在这里插入图片描述][20200818103858847.png_pic_center] @Override public final void rollback(TransactionStatus status) throws TransactionException { //如果事务已经完成,那么回滚会抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false); } /** 处理实际的回滚。 *完成标志已经被选中。 * @param 表示事务的状态对象 * @throws 回滚失败时的TransactionException */ private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } doRollback(status); } else { //参与较大的交易 if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } //只有在我们被要求提前失败时,意外回滚才重要 if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); //如果我们有一个只回滚的全局标记,则会引发unexpected drollbackexception异常 if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { cleanupAfterCompletion(status); } } 具体流程如下: 1. 自定义触发器的调用,包括回滚前,完成回滚后的调用。 protected final void triggerBeforeCompletion(DefaultTransactionStatus status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCompletion synchronization"); } TransactionSynchronizationUtils.triggerBeforeCompletion(); } } 1. 回滚逻辑处理函数 org.springframework.transaction.support.AbstractTransactionStatus\#rollbackToHeldSavepoint 这里需要注意的是: 对于嵌入事务的内部异常不会引起外部事务的回滚。 /** *回滚到为事务持有的保存点 *然后立即释放保存点。 */ public void rollbackToHeldSavepoint() throws TransactionException { Object savepoint = getSavepoint(); if (savepoint == null) { throw new TransactionUsageException( "Cannot roll back to savepoint - no savepoint associated with current transaction"); } getSavepointManager().rollbackToSavepoint(savepoint); getSavepointManager().releaseSavepoint(savepoint); setSavepoint(null); } org.springframework.jdbc.datasource.JdbcTransactionObjectSupport\#releaseSavepoint /** * This implementation releases the given JDBC 3.0 Savepoint. * @see java.sql.Connection#releaseSavepoint */ @Override public void releaseSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); try { conHolder.getConnection().releaseSavepoint((Savepoint) savepoint); } catch (Throwable ex) { logger.debug("Could not explicitly release JDBC savepoint", ex); } } 当之前已经保存的事务信息中的事务为新事务,那么直接回滚,常用于单独事务的处理。 对于没有保存点的回 Spring 同样是使用底层数据库连接提供的 API 来操作的 由于我们使用的是 DataSourceTransactionManager ,那么 doRollback 会使用此类中的实现: org.springframework.jdbc.datasource.DataSourceTransactionManager\#doRollback @Override protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction", ex); } } 当前事务不属于以上两种情况,多数用于JTA,只做回滚标识,等到提交的时候统一不提交。 1. 信息清除cleanupAfterCompletion /** *完成后清理,必要时清除同步, *并在完成后调用docleanup。 * @param status object representing the transaction * @see #doCleanupAfterCompletion */ private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); //结束挂起状态 resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } } 作用: * 避免重复调用 * 绑定到当前线程的事务信息清楚 * 如果是新事务需要做些清除资源的操作 org.springframework.jdbc.datasource.DataSourceTransactionManager\#doCleanupAfterCompletion @Override protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; //如果连接容器暴露,则从线程中移除。 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(obtainDataSource()); } //重置连接。 Connection con = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } DataSourceUtils.resetConnectionAfterTransaction( con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } DataSourceUtils.releaseConnection(con, this.dataSource); } txObject.getConnectionHolder().clear(); } 如果在事务执行前有事务挂起,那么当前事务执行结束后需要将挂起事务恢复。 protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { doResume(transaction, suspendedResources); } List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); doResumeSynchronization(suspendedSynchronizations); } } } ## 5)事务提交 commitTransactionAfterReturning ## /** *在成功完成调用后执行,但不在处理异常后执行。 *如果不创建事务,就什么也不做。 * @param txInfo information about the current transaction */ protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } } **通过之前设置的TransactionInfo,当某个事务是另一个事务的嵌入事务,但是这些事务又不在Spring的管理范围内,或者无法设置保存点,Spring就会通过设置这个回滚标识来禁止提交。** 首先当某个嵌入事务发生回滚的时候会设置回滚标识,而等到外部事务提交时,一旦判断出当前事务流被设置了回滚标识,则由外部事务来统一进行整体事务的回滚。 /** *这个提交的实现处理参与现有的 *事务和程序性回滚请求。 * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. * @see org.springframework.transaction.TransactionStatus#isRollbackOnly() * @see #doCommit * @see #rollback */ @Override public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } processCommit(defStatus); } 真正提交流程: /** *处理一个实际的提交。 *只回滚标志已经被检查和应用。 * @param status object representing the transaction * @throws TransactionException in case of commit failure */ private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } } 符合提交的的事务如下: * 当事务状态中有保存点信息的话便不会去提交事务。 * 当事务非新事务的时候也不会去执行提交事务操作。 [NamespaceHandler]: https://blog.csdn.net/qq924862077/article/details/72887698 [Spring_BeanDefinitionParser]: https://blog.csdn.net/boneix/article/details/70075657 [20200816104204454.png_pic_center]: /images/20221124/1ca534064e96415a87df8ce4b7385816.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center]: /images/20221124/592cc96ac5cf4e70887d5de3a6dce1b9.png [20200816215656828.png_pic_center]: /images/20221124/b192633fcbca4fcba7900f41c8d3a174.png [20200816215930898.png_pic_center]: /images/20221124/6e5db001ade14a8aaf0e83daf0a9d51a.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 1]: /images/20221124/a515fd5157fe48c6a51067fd7531e02f.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 2]: /images/20221124/80ca6616c474429684fd1753053de0f7.png [20200816221305358.png_pic_center]: /images/20221124/bace78448e354f8fbca93e63f3e568f9.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 3]: /images/20221124/a675498e264c496993f738ec69138d50.png [20200816221707580.png_pic_center]: /images/20221124/01daaa6f60e7447194d46327e39342f0.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 4]: /images/20221124/61fc996f4f7a4e9c9bb132935723d994.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 5]: /images/20221124/83182497835e4e47b0a35c2957f36d82.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 6]: /images/20221124/dde82f89baf2434a9b2e82c4d067684e.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 7]: /images/20221124/fb653fabfe574ecea57a2e27d2bc8432.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2xpdHRsZXdoaXRldmc_size_16_color_FFFFFF_t_70_pic_center 8]: /images/20221124/facb81e738e44b4091abdb037b948ad1.png [20200818104001896.png_pic_center]: /images/20221124/52a0f6868805462490b7a567c44f9b75.png [20200818103858847.png_pic_center]: /images/20221124/4fd728460bb14596871641cfd1d58dd5.png
还没有评论,来说两句吧...