# spring事务的认识
- 事务由数据库连接实现,即connection实例,脱离数据库连接无法谈事务
- 事务传播,本质上是处理connection实例、线程、事务三者的关系
- 在spring源码层面,提供了四种类型的事务控制实现,分别是jdbc,jms,jta,jca
- Spring事务默认会对RuntimeException及其子类异常进行回滚, 而不会回滚受检异常,因为受检异常通常是预期行为,可以将抛出的异常转为运行期异常,或者更改默认的策略,即rollBackFor为Exception
# 事务执行的流程
/*xxx: 事务切面的起点*/
/*xxx: 事务拦截器 */
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
/*xxx: 事务拦截器 执行的方法 */
public Object invoke(MethodInvocation invocation) throws Throwable {
//xxx: 省略其它抽象
/*xxx: 调用父类的 方法*/
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
/*xxx: 事务内执行*/
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
/*xxx: 获取事务属性类*/
TransactionAttributeSource tas = getTransactionAttributeSource();
/*xxx: 获取方法上面有 @Transactional注解的属性*/
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//xxx: 通过该属性,获取到事务管理器
final TransactionManager tm = determineTransactionManager(txAttr);
/*xxx: 新建事务信息, 会将其保存至线程上下文中 ,通过bindToThread()方法*/
/*xxx: 新建事务信息,会进行 事务状态的传递 */
/*xxx: 相当于 开启事务*/
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
/*xxx: 返回值*/
Object retVal;
try {
//xxx: 切面向下执行
/*xxx: 如果没有事务切面了,就执行业务方法*/
retVal = invocation.proceedWithInvocation();
}catch (Throwable ex) {
/*xxx: 业务方法执行报错,回滚*/
completeTransactionAfterThrowing(txInfo, ex);
}finally {
/*xxx: 清除当前事务*/
/*xxx: 类似于 当前事务出栈*/
cleanupTransactionInfo(txInfo);
}
/*xxx: 提交事务*/
commitTransactionAfterReturning(txInfo);
/*xxx: 如果方法正常执行,则必提交事务成功*/
return retVal;
}
}
# 事务控制模板
package org.springframework.transaction;
public interface PlatformTransactionManager extends TransactionManager {
}
package org.springframework.transaction.support;
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
/*xxx: 事务操作 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
/*xxx: 创建新事务*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources);
//xxx: 处理已经存在的事务
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException;
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException;
@Override
//xxx: 提交事务
public final void commit(TransactionStatus status) throws TransactionException;
//xxx: 处理回滚操作
public final void rollback(TransactionStatus status) throws TransactionException;
}
# 获取事务
- 之前不存在事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
/*xxx: 事务操作 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
//xxx: 获取事务对象(本质上,是将当前线程的 数据库连接对象,与 事务相关联)
Object transaction = doGetTransaction();
//xxx: 首次进入,connectionHolder为空,不存在事务
//xxx: 事务传播属性为 强制时,抛错
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//xxx: 如果事务隔离级别为 requered,requires_new,nested, 则加入事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
/*xxx: 创建新事务*/
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
}
}
- 之前已经存在事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
/*xxx: 事务操作 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
//xxx: 获取事务对象(本质上,是将当前线程的 数据库连接对象,与 事务相关联)
Object transaction = doGetTransaction();
//xxx: 首次进入,connectionHolder为空,不存在事务
/*xxx: 如果事务已经存在*/
if (isExistingTransaction(transaction)) {
/*xxx: 处理已经存在的事务 */
return handleExistingTransaction(def, transaction, debugEnabled);
}
}
//xxx: 处理已经存在的事务
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//xxx: 不允许有事务,则直接抛错
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//xxx: 不支持当前事物,则将事务挂起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
//xxx: 挂起当前事务
Object suspendedResources = suspend(transaction);
//xxx: 把事务的一些信息,存在 ThreadLocal中
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//xxx: 事务隔离级别为 新建事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
//xxx: 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
//xxx: 新建事务
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
//xxx: 复原事务
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
//xxx: 内嵌事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//xxx: 如果配置了不允许事务,则抛错
//xxx: 分为两种方式处理: 保存savePoint,或者新建事务
}
}
}
# 开启事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/*xxx: 创建新事务*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
//xxx: 创建事务状态信息,封装一些事务对象的信息,记录事务状态
//xxx: 该事务标志为 新事务
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//xxx: 开启事务,
//xxx: jdbc: transaction -> DataSourceTransactionObject jta: JtaTransactionObject
//xxx: jms: transaction-> JmsTransactionObject jca: CciLocalTransactionObject
doBegin(transaction, definition);
//xxx: 开启事务后,改变事务状态
prepareSynchronization(status, definition);
return status;
}
}
# 挂起事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
//xxx: 事务不为空,则先挂起当前事务
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
}
//xxx: 挂起事务,由子类实现
protected Object doSuspend(Object transaction) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
}
# 唤醒事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
}
//xxx: 复原事务,由子类实现
protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
}
# 提交事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
//xxx: 提交事务
public final void commit(TransactionStatus status) throws TransactionException {
//xxx: 处理事务提交操作
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
if (status.hasSavepoint()) {
//xxx: 如果是嵌套事务,没有提交,只是将 savepoint 清除
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
//xxx: 如果是 required ,最外层的提交后,才会统一提交
//xxx: 如果是 requires_new ,每一个事务都会进来提交
else if (status.isNewTransaction()) {
//xxx: 提交事务
doCommit(status);
}
//xxx: 省略其它抽象
}
}
# 回滚事务
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
//xxx: 处理回滚操作
public final void rollback(TransactionStatus status) throws TransactionException {
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
//xxx: 内嵌事务,则去除回滚点
if (status.hasSavepoint()) {
status.rollbackToHeldSavepoint();
}
//xxx: 当前事务为新事务
else if (status.isNewTransaction()) {
//xxx: 则执行回滚
doRollback(status);
}
}
}
# 事务控制实现(JDBC为例,常见有jms,jta,jca)
# 模板接口
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
//xxx: 开启事务,由子类实现: spring源码自带实现事务的方式有: jdbc,jta,jca,jms
protected abstract void doBegin(Object transaction, TransactionDefinition definition)
throws TransactionException;
//xxx: 获取事务对象,由子类实现,spring默认实现有: jdbc,jms,jta,jca
protected abstract Object doGetTransaction() throws TransactionException;
//xxx: 挂起事务,由子类实现
protected Object doSuspend(Object transaction) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
//xxx: 复原事务,由子类实现
protected void doResume(@Nullable Object transaction, Object suspendedResources) throws TransactionException {
throw new TransactionSuspensionNotSupportedException(
"Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
//xxx: 事务回滚,由子类实现
protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
//xxx: 事务提交,由子类实现
protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
}
# JDBC实现
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
@Override
//xxx: 获取事务
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
/*xxx:当前数据源,在当前线程 的 连接对象 */
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
//xxx: 将该连接对象,设置进事务中
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
@Override
//xxx: 开启事务
protected void doBegin(Object transaction, TransactionDefinition definition) {
//xxx: 如果之前没有连接,则新建连接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//xxx: 此处是 从连接池获取连接,算是应用层底层与上层的交界处
Connection newCon = obtainDataSource().getConnection();
//xxx: 将首次创建的连接,保存至 connectionHolder,connectionHolder 由 ThreadLocal实现
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
//xxx: 从当前事务,获取连接(只要是一个事务,则用的同一个连接)
con = txObject.getConnectionHolder().getConnection();
//xxx: 设置事务的可读状态
txObject.setReadOnly(definition.isReadOnly());
//xxx: 有事务的环境,如果设置了自动提交,则会自动切换到 手动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
//xxx: 关闭连接得自动提交,这一步实际上就开启了事务
con.setAutoCommit(false);
}
//xxx: 设置事务只读状态
prepareTransactionalConnection(con, definition);
//xxx: 如果是新创建的事务
if (txObject.isNewConnectionHolder()) {
//xxx: 就 建立 当前线程,和 数据库连接的绑定关系
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//xxx: 挂起事务,本质上,就是将当前事务的 数据库连接置空
txObject.setConnectionHolder(null);
//xxx: 并且,移除当前线程的 数据库连接对象
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
//xxx: 将之前的数据库连接对象,进行复原
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
//xxx: 通过数据库实现回滚
con.rollback();
}
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
//xxx: 通过数据库连接,实现事务提交
con.commit();
}
}