public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements BeanFactoryAware { private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();//这个拦截器通过AOP发挥作用,通过这个拦截器的实现,Spring封装了事务处理实现 private Pointcut pointcut;
public TransactionProxyFactoryBean() { }
public void setTransactionManager(PlatformTransactionManager transactionManager) { this.transactionInterceptor.setTransactionManager(transactionManager); } //通过依赖注入的事务属性以Properties的形式出现,把BeanDefinition中读到的事务管理的属性信息注入到TransactionInterceptor中 public void setTransactionAttributes(Properties transactionAttributes) { this.transactionInterceptor.setTransactionAttributes(transactionAttributes); }
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) { this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource); }
public void setPointcut(Pointcut pointcut) { this.pointcut = pointcut; }
public void afterPropertiesSet() { if(this.getTransactionManager() == null && this.beanFactory == null) { throw new IllegalStateException("Set the \'transactionManager\' property or make sure to run within a BeanFactory containing a PlatformTransactionManager bean!"); } else if(this.getTransactionAttributeSource() == null) { throw new IllegalStateException("Either \'transactionAttributeSource\' or \'transactionAttributes\' is required: If there are no transactional methods, then don\'t use a transaction aspect."); } }
public void setTransactionAttributes(Properties transactionAttributes) { NameMatchTransactionAttributeSource tas = new NameMatchTransactionAttributeSource(); tas.setProperties(transactionAttributes); this.transactionAttributeSource = tas; }
@Override public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); }
if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); }
// Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } //没有事务存在,需要根据事务传播属性设置来创建事务,这里会看到事务传播属性的设置:mandatory、required required_new nested等 // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
/** * Create a TransactionStatus for an existing transaction. */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); }
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) {//在Spring管理的事务中,创建事务保存点 // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } }
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {//返回的SuspendedResourcesHolder会作为参数传给TransactionStatus if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null;//把挂起事务的处理交给具体事务处理器去完成,如果具体的事务处理器不支持事务挂起,则默认抛出TransactionSuspensionNotSupportedException if (transaction != null) { suspendedResources = doSuspend(transaction); }//这里在线程中保存与事务处理有关的信息,并重置线程中相关的ThreadLocal变量 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException ex) { // doSuspend failed - original transaction is still active… 如果处理失败,则恢复原始的事务 doResumeSynchronization(suspendedSynchronizations); throw ex; } catch (Error err) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw err; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
/** * Reactivate transaction synchronization for the current thread * and resume all given synchronizations. * @param suspendedSynchronizations List of TransactionSynchronization objects */doSuspend 失败则恢复事务 private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) { TransactionSynchronizationManager.initSynchronization();//维护着ThreadLocal变量 for (TransactionSynchronization synchronization : suspendedSynchronizations) { synchronization.resume(); TransactionSynchronizationManager.registerSynchronization(synchronization); } }
/** * This implementation of commit handles participating in existing * transactions and programmatic rollback requests. * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. * @see org.springframework.transaction.TransactionStatus#isRollbackOnly() * @see #doCommit * @see #rollback */ @Override public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); }
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) {//如果事务处理过程中发生了异常,调用回滚。 if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); }//处理回滚 processRollback(defStatus); // Throw UnexpectedRollbackException only at outermost transaction boundary // or if explicitly asked to. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } //处理提交入口 processCommit(defStatus); }
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try {//事务的提交准备工作由具体的事务处理器来完成 prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); }//嵌套事务的处理过程。 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) {//根据当前线程中保存的事务状态进行处理,如果当前的事务是一个新的事务,调用具体事务处理器的完成提交,如果当前所持有的事务不是一个新事务,则不提交,由已经存在的事务来完成提交 if (status.isDebug()) { logger.debug("Initiating transaction commit"); } doCommit(status); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (globalRollbackOnly) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } catch (Error err) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, err); throw err; }
// Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); }