博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Trasnaction管理(1)- 线程间事务隔离
阅读量:6146 次
发布时间:2019-06-21

本文共 10479 字,大约阅读时间需要 34 分钟。

问题导读

  • Spring中事务是如何实现的
  • Spring中各个线程间是如何进行连接、事务隔离的

Spring事务配置

Spring的事务管理应该是日常开发中总会碰到的,但是Spring具体是怎么实现线程间的事务隔离的,下面我们就最基本的DataSourceTransactionMnager来看下。

一般使用的是以下的方式配置transaction(当然还有其他aop等方式)

将datasource注入transactionManager之后注册tx:annotation-driven内就可以在代码中使用注解Transactional进行定义事务了

这里是Spring在启动时将transactionManager的代码织入业务代码来实现事务管理(后续会研究如何织入的)。

getTransaction

当调用到相关业务代码前首先调用AbstractPlatformTransactionManager 的getTransaction这个方法会控制事务的传播级别(require,requirenew,support。。。)

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {        Object transaction = doGetTransaction(); //获得现有transaction        // Cache debug flag to avoid repeated checks.        boolean debugEnabled = logger.isDebugEnabled();        if (definition == null) {            // Use defaults if no transaction definition given.            definition = new DefaultTransactionDefinition();        }        if (isExistingTransaction(transaction)) {            // 如果当前transaction存在就使用handleExistingTransaction,这里是主要控制事务传播机制的地方            return handleExistingTransaction(definition, transaction, debugEnabled);        }        // Check definition settings for new transaction.        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());        }        // 当前无transaction存在,根据事务传播级别进行控制        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {            throw new IllegalTransactionStateException(                    "No existing transaction found for transaction marked with propagation 'mandatory'");        }        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {            SuspendedResourcesHolder suspendedResources = suspend(null);            if (debugEnabled) {                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);            }            try {                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);                DefaultTransactionStatus status = newTransactionStatus(                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);                //真正开始事务                doBegin(transaction, definition);                //将事务状态存入TransactionSynchronizationManager                prepareSynchronization(status, definition);                return status;            }            catch (RuntimeException ex) {                resume(null, suspendedResources);                throw ex;            }            catch (Error err) {                resume(null, suspendedResources);                throw err;            }        }        else {            // Create "empty" transaction: no actual transaction, but potentially synchronization.            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {                logger.warn("Custom isolation level specified but no actual transaction initiated; " +                        "isolation level will effectively be ignored: " + definition);            }            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);        }    }
//获得当前transaction(如果有)    protected Object doGetTransaction() {        DataSourceTransactionObject txObject = new DataSourceTransactionObject();        txObject.setSavepointAllowed(isNestedTransactionAllowed());        //从TransactionSynchronizationManager中获得当前线程中的Connection        ConnectionHolder conHolder =                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);        txObject.setConnectionHolder(conHolder, false);        return txObject;    }        ...    //开始事务    protected void doBegin(Object transaction, TransactionDefinition definition) {        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;        Connection con = null;        try {            if (txObject.getConnectionHolder() == null ||                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {                //从配置中注入的datasource中获得connection                Connection newCon = this.dataSource.getConnection();                if (logger.isDebugEnabled()) {                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");                }                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);            }            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);            con = txObject.getConnectionHolder().getConnection();            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);            txObject.setPreviousIsolationLevel(previousIsolationLevel);            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,            // so we don't want to do it unnecessarily (for example if we've explicitly            // configured the connection pool to set it already).            if (con.getAutoCommit()) {                txObject.setMustRestoreAutoCommit(true);                if (logger.isDebugEnabled()) {                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");                }                //设置非自动提交                con.setAutoCommit(false);            }            txObject.getConnectionHolder().setTransactionActive(true);            int timeout = determineTimeout(definition);            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);            }            // Bind the session holder to the thread.            if (txObject.isNewConnectionHolder()) {                //将新建的Connection放入TransactionSynchronizationManager                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());            }        }        catch (Throwable ex) {            if (txObject.isNewConnectionHolder()) {                DataSourceUtils.releaseConnection(con, this.dataSource);                txObject.setConnectionHolder(null, false);            }            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);        }    }            protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {        if (status.isNewSynchronization()) {            //将transaction的状态保存到TransactionSynchronizationManager            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?                            definition.getIsolationLevel() : null);            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());            TransactionSynchronizationManager.initSynchronization();        }    }
public abstract class TransactionSynchronizationManager {    //ThreadLocal的resources用来保存TransactionStatus Connection等    private static final ThreadLocal
> resources = new NamedThreadLocal<>("Transactional resources"); ... public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; }

可以看到其实Spring只是简单的将获得的连接和事务信息存放到TransactionSynchronizationManager中的ThreadLoacl变量中进行保存,这样就实现了数据库连接及事务的线程安全。

Commit/Rollback

在Commit/rollback阶段是使用从一开始getTransaction方法返回的TransactionStatus(其中存放了connection和transaction的信息)作为参数传入commit/rollback进行commit/rollback并在finally中清理TransactionSynchronizationManager

//commit部分代码    private void processCommit(DefaultTransactionStatus status) throws TransactionException {        try {            boolean beforeCompletionInvoked = false;            try {                prepareForCommit(status);                triggerBeforeCommit(status);                triggerBeforeCompletion(status);                beforeCompletionInvoked = true;                boolean globalRollbackOnly = false;                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {                    globalRollbackOnly = status.isGlobalRollbackOnly();                }                if (status.hasSavepoint()) {                    if (status.isDebug()) {                        logger.debug("Releasing transaction savepoint");                    }                    status.releaseHeldSavepoint();                }                //如果是自己创建的事物就进行提交,如果不是(比如是嵌套的)就由上层提交                else if (status.isNewTransaction()) {                    if (status.isDebug()) {                        logger.debug("Initiating transaction commit");                    }                    doCommit(status);                }                // Throw UnexpectedRollbackException if we have a global rollback-only                // marker but still didn't get a corresponding exception from commit.                if (globalRollbackOnly) {                    throw new UnexpectedRollbackException(                            "Transaction silently rolled back because it has been marked as rollback-only");                }        ...        }        finally {            //清除当前TransactionSynchronizationManager中transaction状态和释放申请的连接            cleanupAfterCompletion(status);        }    }

小结

Spring内部维护了TransactionSynchronizationManager一个单例,并使用ThreadLocal变量记录所有连接事务的信息,这样就防止了线程之间事务、连接的共享,从而实现事务的隔离

转载于:https://www.cnblogs.com/resentment/p/5744708.html

你可能感兴趣的文章
ajax查询数据库时数据无法更新的问题
查看>>
Kickstart 无人职守安装,终于搞定了。
查看>>
linux开源万岁
查看>>
linux/CentOS6忘记root密码解决办法
查看>>
25个常用的Linux iptables规则
查看>>
集中管理系统--puppet
查看>>
分布式事务最终一致性常用方案
查看>>
Exchange 2013 PowerShell配置文件
查看>>
JavaAPI详解系列(1):String类(1)
查看>>
HTML条件注释判断IE<!--[if IE]><!--[if lt IE 9]>
查看>>
发布和逸出-构造过程中使this引用逸出
查看>>
Oracle执行计划发生过变化的SQL语句脚本
查看>>
使用SanLock建立简单的HA服务
查看>>
发现一个叫阿尔法城的小站(以后此贴为我记录日常常用网址的帖子了)
查看>>
Subversion使用Redmine帐户验证简单应用、高级应用以及优化
查看>>
Javascript Ajax 异步请求
查看>>
DBCP连接池
查看>>
cannot run programing "db2"
查看>>
mysql做主从relay-log问题
查看>>
Docker镜像与容器命令
查看>>