1.故事前因

在分析mybatis源码时一直带的疑问,一直以为事务是在SqlSessionTemplate#SqlSessionInterceptor#invoke完成的,直到断点才发现并不简单!

  • 在没有事务注解时连接数据库都是SimpleExecutor#prepareStatementgetConnection连接的,这里何来的事务一说,下一步都是直接查询了
  • 然后在有事务注解的时候,经过断点发现这个连接早就创建好了,那么是何时创建的?事务又是在哪里开启的呢?
    今天来一探究竟!!!

2.首先再回顾一下aop原理

  • 大家都知道首先向容器注册一个事务管理类,同时扫描所有的@Transactional注解并生成代理类。然后再执行加了事务的代码是会先走动态代理的代码。

2.1 首先找到bean初始化之后的回调AbstractAutowireCapableBeanFactory

@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
		throws BeansException {

	Object result = existingBean;
	for (BeanPostProcessor processor : getBeanPostProcessors()) {
	    //将原始类进行代理包装
		Object current = processor.postProcessAfterInitialization(result, beanName);
		if (current == null) {
			return result;
		}
		result = current;
	}
	return result;
}

2.2 来到AOP处理类AbstractAutoProxyCreator#wrapIfNecessary

  • wrapIfNecessary中找到这段代码
// Create proxy if we have advice. 如果有advice就创建代理
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
	this.advisedBeans.put(cacheKey, Boolean.TRUE);
	//满足添加就创建代理
	Object proxy = createProxy(
			bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
	this.proxyTypes.put(cacheKey, proxy.getClass());
	return proxy;
}

//getAdvicesAndAdvisorsForBean -> findEligibleAdvisors -> findCandidateAdvisors
//-> BeanFactoryAdvisorRetrievalHelper#findAdvisorBeans
public List<Advisor> findAdvisorBeans() {
	// Determine list of advisor bean names, if not cached already.
	String[] advisorNames = this.cachedAdvisorBeanNames;
	if (advisorNames == null) {
		// Do not initialize FactoryBeans here: We need to leave all regular beans
		// uninitialized to let the auto-proxy creator apply to them!
		//找到Advisor实现类的beanName数组
		advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
				this.beanFactory, Advisor.class, true, false);
		this.cachedAdvisorBeanNames = advisorNames;
	}
	if (advisorNames.length == 0) {
		return new ArrayList<>();
	}

	List<Advisor> advisors = new ArrayList<>();
	for (String name : advisorNames) {
		//。。。省略部分代码
		//通过beanName找到所有的Advisor实现类bean
		advisors.add(this.beanFactory.getBean(name, Advisor.class));
		//。。。省略部分代码		
	return advisors;
}

2.3 AOP的调用阶段

  • 上述内容会包装成一个config最终由JdkDynamicAopProxy或者CglibAopProxy生成动代理,代理invoke类自然就是这两个类,本内容大概分析一下JdkDynamicAopProxy,在该类找到invoke类
//找到这个代码 将advised的实现类根据满足规则 最终组成一条调用链路
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

//封装成MethodInvocation 
MethodInvocation invocation =
						new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
// Proceed to the joinpoint through the interceptor chain.
retVal = invocation.proceed();

//ReflectiveMethodInvocation 递归调用代理类
public Object proceed() throws Throwable {
	// We start with an index of -1 and increment early.
	if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
		//链路调用完成后 最终调用原始类
		return invokeJoinpoint();
	}

	Object interceptorOrInterceptionAdvice =
			this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
	if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
		// Evaluate dynamic method matcher here: static part will already have
		// been evaluated and found to match.
		InterceptorAndDynamicMethodMatcher dm =
				(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
		Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
		if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
			//调用满足条件的切面类
			return dm.interceptor.invoke(this);
		}
		else {
			// Dynamic matching failed.
			// Skip this interceptor and invoke the next in the chain.
			//递归
			return proceed();
		}
	}
	else {
		// It's an interceptor, so we just invoke it: The pointcut will have
		// been evaluated statically before this object was constructed.
		return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
	}
}

3.事务的源码分析

3.1 事务的初始化类ProxyTransactionManagementConfiguration

//加了Configuration 能被spring扫描到
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

	//该类实现了Advisor beanName为org.springframework.transaction.config.internalTransactionAdvisor
	//也就是上面的切面扫描类会扫描到该类
	@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
			TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {

		BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
		//校验规则 都是由下面的bean实现初始化的
		advisor.setTransactionAttributeSource(transactionAttributeSource);
		//链路类
		advisor.setAdvice(transactionInterceptor);
		if (this.enableTx != null) {
			advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
		}
		return advisor;
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionAttributeSource transactionAttributeSource() {
		return new AnnotationTransactionAttributeSource();
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
		//链路执行类为TransactionInterceptor
		TransactionInterceptor interceptor = new TransactionInterceptor();
		interceptor.setTransactionAttributeSource(transactionAttributeSource);
		if (this.txManager != null) {
			interceptor.setTransactionManager(this.txManager);
		}
		return interceptor;
	}
}
  • 简单分析下AnnotationTransactionAttributeSource
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
	this.publicMethodsOnly = publicMethodsOnly;
	if (jta12Present || ejb3Present) {
		this.annotationParsers = new LinkedHashSet<>(4);
		this.annotationParsers.add(new SpringTransactionAnnotationParser());
		if (jta12Present) {
			this.annotationParsers.add(new JtaTransactionAnnotationParser());
		}
		if (ejb3Present) {
			this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
		}
	}
	else {
		this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
	}
}

//都会添加该类SpringTransactionAnnotationParser 只贴这个方法了 根据这个判断是否生成事务的代理类
@Override
public boolean isCandidateClass(Class<?> targetClass) {
	return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
}

至此,整个AOP的流程已经事务类的初始化以及全部分析完了,接下来继续分析链路中的事务类调用

3.2 事务代理类TransactionInterceptor

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
	// Work out the target class: may be {@code null}.
	// The TransactionAttributeSource should be passed the target class
	// as well as the method, which may be from an interface.
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

	// Adapt to TransactionAspectSupport's invokeWithinTransaction...
	//执行
	return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
		@Override
		@Nullable
		public Object proceedWithInvocation() throws Throwable {
			return invocation.proceed();
		}
		@Override
		public Object getTarget() {
			return invocation.getThis();
		}
		@Override
		public Object[] getArguments() {
			return invocation.getArguments();
		}
	});
}

3.3 事务管理类TransactionAspectSupport#invokeWithinTransaction

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
	// Standard transaction demarcation with getTransaction and commit/rollback calls.
	//创建一个事务 主要分析这里 ##
	TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
	
	Object retVal;
	try {
		// This is an around advice: Invoke the next interceptor in the chain.
		// This will normally result in a target object being invoked.
		//业务执行 这里如果无其他切面正常会到SqlSessionTemplate
		retVal = invocation.proceedWithInvocation();
	}
	catch (Throwable ex) {
		// target invocation exception
		//事务异常切面
		completeTransactionAfterThrowing(txInfo, ex);
		throw ex;
	}
	finally {
		//清除事务相关
		cleanupTransactionInfo(txInfo);
	}
	
	if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
		// Set rollback-only in case of Vavr failure matching our rollback rules...
		TransactionStatus status = txInfo.getTransactionStatus();
		if (status != null && txAttr != null) {
			retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
		}
	}
	//提交或者回滚事务
	commitTransactionAfterReturning(txInfo);
	return retVal;
}

3.4 事务入口AbstractPlatformTransactionManager

  • createTransactionIfNecessary -> AbstractPlatformTransactionManager#getTransaction -> startTransaction
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
		boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	DefaultTransactionStatus status = newTransactionStatus(
			definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
	// ###
	doBegin(transaction, definition);
	prepareSynchronization(status, definition);
	return status;
}

3.5 事务开启DataSourceTransactionManager#doBegin

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	Connection con = null;

	try {
		if (!txObject.hasConnectionHolder() ||
				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			//根据数据源插件获取一个连接	
			Connection newCon = obtainDataSource().getConnection();
			if (logger.isDebugEnabled()) {
				logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
			}
			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
		}

		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		txObject.setPreviousIsolationLevel(previousIsolationLevel);
		txObject.setReadOnly(definition.isReadOnly());

		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		//如果是自动提交的
		if (con.getAutoCommit()) {
			//设置必须有序提交
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			//同时关闭自动提交
			con.setAutoCommit(false);
		}
		//开始事务的入口 stmt.executeUpdate("SET TRANSACTION READ ONLY"); ###
		prepareTransactionalConnection(con, definition);
		//设置事务的活跃状态
		txObject.getConnectionHolder().setTransactionActive(true);

		//。。。省略后续代码
}

//prepareTransactionalConnection
protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
		throws SQLException {

	if (isEnforceReadOnly() && definition.isReadOnly()) {
		try (Statement stmt = con.createStatement()) {
			//直接执行sql 事务开始后 取决于数据库隔离级别 默认是行锁
			stmt.executeUpdate("SET TRANSACTION READ ONLY");
		}
	}
}

3.6 事务之rollback

  • TransactionAspectSupport#commitTransactionAfterReturning -> AbstractPlatformTransactionManager#commit -> processRollback - > doRollback -> DataSourceTransactionManager#con.rollback()
  • ConnectionImpl#rollback
private void rollbackNoChecks() throws SQLException {
      synchronized (getConnectionMutex()) {
          if (this.useLocalTransactionState.getValue()) {
              if (!this.session.getServerSession().inTransactionOnServer()) {
                  return; // effectively a no-op
              }
          }
		  //执行sql
          this.session.execSQL(null, "rollback", -1, null, false, this.nullStatementResultSetFactory, null, false);
      }
  }

3.7 事务之commit

  • TransactionAspectSupport#commitTransactionAfterReturning ->AbstractPlatformTransactionManager#commit -> processCommit- > doCommit -> DataSourceTransactionManager#con.commit()
  • ConnectionImpl#commit
//执行sql
this.session.execSQL(null, "commit", -1, null, false, this.nullStatementResultSetFactory, null, false);

4 总结

1.事务是根据Spring的AOP机制去完成的
2.AOP生是根据类生成代理类的,所以事务代码块执行时使用this.addMoney(事务块代码)是无效的,只能重新获取该代理类执行事务块代码才会生效
3.事务是每次处理完都会被回收掉的。

以上就是本章的全部内容了。

上一篇:mybatis第九话 - 手写实现一个简单的mybatis版本
下一篇:mybatis第十一话 - mybaits getConnection连接数据库的源码分析

贵有恒何必三更眠五更起,最无益只怕一日曝十日寒

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐