# spring事务的认识

  • 事务由数据库连接实现,即connection实例,脱离数据库连接无法谈事务
  • 事务传播,本质上是处理connection实例、线程、事务三者的关系
  • 在spring源码层面,提供了四种类型的事务控制实现,分别是jdbc,jms,jta,jca
  • Spring事务默认会对RuntimeException及其子类异常进行回滚, 而不会回滚受检异常,因为受检异常通常是预期行为,可以将抛出的异常转为运行期异常,或者更改默认的策略,即rollBackFor为Exception

# 事务执行的流程

/*xxx: 事务切面的起点*/
/*xxx: 事务拦截器 */
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
 @Override
	@Nullable
	/*xxx: 事务拦截器 执行的方法 */
	public Object invoke(MethodInvocation invocation) throws Throwable {
        //xxx: 省略其它抽象
        /*xxx: 调用父类的 方法*/
		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
}

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
	@Nullable
	/*xxx: 事务内执行*/
	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {
     /*xxx: 获取事务属性类*/
		TransactionAttributeSource tas = getTransactionAttributeSource();
		/*xxx: 获取方法上面有 @Transactional注解的属性*/
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		//xxx: 通过该属性,获取到事务管理器
		final TransactionManager tm = determineTransactionManager(txAttr);
        
        /*xxx: 新建事务信息, 会将其保存至线程上下文中 ,通过bindToThread()方法*/
			/*xxx: 新建事务信息,会进行 事务状态的传递 */
			/*xxx: 相当于 开启事务*/
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
        /*xxx: 返回值*/
		Object retVal;
        try {
        //xxx: 切面向下执行
		/*xxx: 如果没有事务切面了,就执行业务方法*/
				retVal = invocation.proceedWithInvocation();
        }catch (Throwable ex) {
        	/*xxx: 业务方法执行报错,回滚*/
		completeTransactionAfterThrowing(txInfo, ex);
        }finally {
				/*xxx: 清除当前事务*/
				/*xxx: 类似于 当前事务出栈*/
				cleanupTransactionInfo(txInfo);
		}
        
        /*xxx: 提交事务*/
commitTransactionAfterReturning(txInfo);
			/*xxx: 如果方法正常执行,则必提交事务成功*/
			return retVal;
    }
    
}

# 事务控制模板

package org.springframework.transaction;

public interface PlatformTransactionManager extends TransactionManager {
    
}
package org.springframework.transaction.support;

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
	@Override
	/*xxx: 事务操作 */
	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException;
    
    /*xxx: 创建新事务*/
	private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources);
    
    //xxx: 处理已经存在的事务
	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException;
    
    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException;
    
    @Override
	//xxx: 提交事务
	public final void commit(TransactionStatus status) throws TransactionException;
    
    //xxx: 处理回滚操作
	public final void rollback(TransactionStatus status) throws TransactionException;
}

# 获取事务

  • 之前不存在事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
 	@Override
	/*xxx: 事务操作 */
	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {
     	//xxx: 获取事务对象(本质上,是将当前线程的 数据库连接对象,与 事务相关联)
		Object transaction = doGetTransaction();
        
        //xxx: 首次进入,connectionHolder为空,不存在事务
		//xxx: 事务传播属性为 强制时,抛错
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
        
        //xxx: 如果事务隔离级别为 requered,requires_new,nested, 则加入事务
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
         	/*xxx: 创建新事务*/
				return startTransaction(def, transaction, debugEnabled, suspendedResources);   
        }
        
    }
    
}
  • 之前已经存在事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
 	@Override
	/*xxx: 事务操作 */
	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {
     	//xxx: 获取事务对象(本质上,是将当前线程的 数据库连接对象,与 事务相关联)
		Object transaction = doGetTransaction();
        
        //xxx: 首次进入,connectionHolder为空,不存在事务
		/*xxx: 如果事务已经存在*/
		if (isExistingTransaction(transaction)) {
			/*xxx: 处理已经存在的事务 */
			return handleExistingTransaction(def, transaction, debugEnabled);
		}
        
    }
    
    //xxx: 处理已经存在的事务
	private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition, Object transaction, boolean debugEnabled)
			throws TransactionException {
        //xxx: 不允许有事务,则直接抛错
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}
        
        //xxx: 不支持当前事物,则将事务挂起
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
         //xxx: 挂起当前事务
			Object suspendedResources = suspend(transaction);
            
            //xxx: 把事务的一些信息,存在 ThreadLocal中
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
            
        }
        
        //xxx: 事务隔离级别为 新建事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
         	//xxx: 挂起当前事务
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
				//xxx: 新建事务
				return startTransaction(definition, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error beginEx) {
				//xxx: 复原事务
resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
            
            
        }
        
        //xxx: 内嵌事务
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            //xxx: 如果配置了不允许事务,则抛错
            //xxx: 分为两种方式处理: 保存savePoint,或者新建事务
        }
        
    }
    
}

# 开启事务

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    /*xxx: 创建新事务*/
	private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
			boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
	//xxx: 创建事务状态信息,封装一些事务对象的信息,记录事务状态
		//xxx: 该事务标志为 新事务
		DefaultTransactionStatus status = newTransactionStatus(
				definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
		//xxx: 开启事务,
		//xxx: jdbc: transaction -> DataSourceTransactionObject jta: JtaTransactionObject
		//xxx: jms: transaction-> JmsTransactionObject  jca: CciLocalTransactionObject
		doBegin(transaction, definition);
		//xxx: 开启事务后,改变事务状态
		prepareSynchronization(status, definition);
		return status;        
}
    
}

# 挂起事务

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
     	//xxx: 事务不为空,则先挂起当前事务
				if (transaction != null) {
					suspendedResources = doSuspend(transaction);
				}   
    }
    
    //xxx: 挂起事务,由子类实现
	protected Object doSuspend(Object transaction) throws TransactionException {
		throw new TransactionSuspensionNotSupportedException(
				"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
	}
    
}

# 唤醒事务

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
	protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
			throws TransactionException {
        Object suspendedResources = resourcesHolder.suspendedResources;
			if (suspendedResources != null) {
				doResume(transaction, suspendedResources);
			}
    }
    
    //xxx: 复原事务,由子类实现
	protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
		throw new TransactionSuspensionNotSupportedException(
				"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
	}
    
}

# 提交事务

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

	//xxx: 提交事务
	public final void commit(TransactionStatus status) throws TransactionException {
        //xxx: 处理事务提交操作
		processCommit(defStatus);
        
    }
    
    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
         if (status.hasSavepoint()) {
             //xxx: 如果是嵌套事务,没有提交,只是将 savepoint 清除
					unexpectedRollback = status.isGlobalRollbackOnly();
					status.releaseHeldSavepoint();
         }
        //xxx: 如果是 required ,最外层的提交后,才会统一提交
				//xxx: 如果是 requires_new ,每一个事务都会进来提交
				else if (status.isNewTransaction()) 		{
                  //xxx: 提交事务
					doCommit(status);  
          }
        //xxx: 省略其它抽象
    }
    
}

# 回滚事务

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    
@Override
	//xxx: 处理回滚操作
	public final void rollback(TransactionStatus status) throws TransactionException {
        processRollback(defStatus, false);
    }
    
    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        //xxx: 内嵌事务,则去除回滚点
				if (status.hasSavepoint()) {
                   status.rollbackToHeldSavepoint();
                }
        
        		//xxx: 当前事务为新事务
				else if (status.isNewTransaction()) {
                    //xxx: 则执行回滚
					doRollback(status);
                }
    }
    
}

# 事务控制实现(JDBC为例,常见有jms,jta,jca)

# 模板接口

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    //xxx: 开启事务,由子类实现: spring源码自带实现事务的方式有: jdbc,jta,jca,jms
	protected abstract void doBegin(Object transaction, TransactionDefinition definition)
			throws TransactionException;
    
    //xxx: 获取事务对象,由子类实现,spring默认实现有: jdbc,jms,jta,jca
	protected abstract Object doGetTransaction() throws TransactionException;
    
    //xxx: 挂起事务,由子类实现
	protected Object doSuspend(Object transaction) throws TransactionException {
		throw new TransactionSuspensionNotSupportedException(
				"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
	}
    
    //xxx: 复原事务,由子类实现
	protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
		throw new TransactionSuspensionNotSupportedException(
				"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
	}
    
    //xxx: 事务回滚,由子类实现
    protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
    
    //xxx: 事务提交,由子类实现
	protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;

}

# JDBC实现

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
    
    @Override
	//xxx: 获取事务
	protected Object doGetTransaction() {
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		/*xxx:当前数据源,在当前线程 的 连接对象 */
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
		//xxx: 将该连接对象,设置进事务中
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}
    
    @Override
	//xxx: 开启事务
	protected void doBegin(Object transaction, TransactionDefinition definition) {
     	//xxx: 如果之前没有连接,则新建连接
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
             	//xxx: 此处是 从连接池获取连接,算是应用层底层与上层的交界处
				Connection newCon = obtainDataSource().getConnection();
                
                //xxx: 将首次创建的连接,保存至 connectionHolder,connectionHolder 由 ThreadLocal实现
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
        
        	//xxx: 从当前事务,获取连接(只要是一个事务,则用的同一个连接)
			con = txObject.getConnectionHolder().getConnection();
        
        	//xxx: 设置事务的可读状态
			txObject.setReadOnly(definition.isReadOnly());
        
        	//xxx: 有事务的环境,如果设置了自动提交,则会自动切换到 手动提交
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				//xxx: 关闭连接得自动提交,这一步实际上就开启了事务
				con.setAutoCommit(false);
			}
        
        	//xxx: 设置事务只读状态
			prepareTransactionalConnection(con, definition);
        
        	//xxx: 如果是新创建的事务
			if (txObject.isNewConnectionHolder()) {
				//xxx: 就 建立 当前线程,和 数据库连接的绑定关系
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
    }
    
    @Override
	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		//xxx: 挂起事务,本质上,就是将当前事务的 数据库连接置空
		txObject.setConnectionHolder(null);
		//xxx: 并且,移除当前线程的 数据库连接对象
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}
    
    @Override
	protected void doResume(@Nullable Object transaction, Object suspendedResources) {
		//xxx: 将之前的数据库连接对象,进行复原
		TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
	}
    
    @Override
	protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
        //xxx: 通过数据库实现回滚
        con.rollback();
    }
    
    @Override
	protected void doCommit(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
        //xxx: 通过数据库连接,实现事务提交
        con.commit();
    }
}