事务工具类

TransactionManager.java

public class TransactionManager {
    private final static Logger logger = LoggerFactory.getLogger(TransactionManager.class);
    /** 是否开启了事务 **/
    private static final ThreadLocal<Boolean> open = new ThreadLocal();

    /** 使用ThreadLocal来存储每个线程的嵌套层次 **/
    private static ThreadLocal<Integer> transactionLevelHolder = ThreadLocal.withInitial(() -> 0);

    /**
     * 数据库连接ThreadLocal
     * 当某个线程第一次调用 getConnection 方法时,如果该线程还没有为 connectionHolder 设置值,
     * 那么 withInitial 提供的初始化方法会被调用,创建一个新的数据库连接并返回。
     * 这样,每个线程都有自己独立的数据库连接,确保线程安全和事务管理的独立性
     */
    private static final ThreadLocal<Connection> connectionHolder = ThreadLocal.withInitial(() -> {
        Connection connection;
        try {
            if (FGBiz.dspCenter.dbConnectionSet == null) {
                throw new RuntimeException("获取数据库连接失败");

            }
            long begin = System.currentTimeMillis();
            connection = ConnSet.getDataSource().getConnection();
            logger.info("获取数据库连接耗时:{} ms",System.currentTimeMillis()-begin);

        } catch (Exception e) {
            throw new RuntimeException("获取数据库连接失败", e);
        }
        try {
            connection.setAutoCommit(false); // 关闭自动提交,开启事务管理
            return createProxyConnection(connection);
        } catch (SQLException e) {
            throw new RuntimeException("关闭自动提交,开启事务管理失败", e);
        }
    });

    // 开始事务
    public static void beginTransaction() {
        int transactionLevel = transactionLevelHolder.get();
        if (transactionLevel == 0) {
            getConnection(); // 确保连接已创建并设置为非自动提交
        }
        transactionLevelHolder.set(transactionLevel + 1);
    }
    public static Connection getConnection() {
        open .set(true);
        return connectionHolder.get();
    }

    // 提交事务
    public static void commit() {
        int transactionLevel = transactionLevelHolder.get();
        if (transactionLevel == 1) {
            Connection connection = connectionHolder.get();
            try {
                if (connection != null && !connection.isClosed()) {
                    connection.commit();
                }
            } catch (SQLException e) {
                logger.error("commit失败,失败原因:",e);
            } finally {
                transactionLevelHolder.set(0);
            }
        } else if (transactionLevel > 1) {
            transactionLevelHolder.set(transactionLevel - 1);
        }
    }

    // 回滚事务
    public static void rollback() {
        int transactionLevel = transactionLevelHolder.get();
        if (transactionLevel > 0) {
            Connection connection = connectionHolder.get();
            try {
                if (connection != null && !connection.isClosed()) {
                    connection.rollback();
                }
            } catch (SQLException e) {
                logger.error("rollback失败,失败原因:",e);
            } finally {
                transactionLevelHolder.set(0);
            }
        }
    }

    /**
     * 注意:务必在final里关闭连接
     * 关闭连接,并清理ThreadLocal
     */
    public static void close() {
        Connection connection = connectionHolder.get();
        try {
            if (connection != null && !connection.isClosed()) {
                if(connection instanceof Proxy){
                    ((ConnectionProxy) Proxy.getInvocationHandler(connection)).reallyClose();
                }else {
                    connection.close();
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            connectionHolder.remove();
            open.remove();
            transactionLevelHolder.remove();
        }
    }


    //是否开启了事务
    public static boolean isOpen() {
        return open.get() != null && open.get();
    }
    // 创建代理连接的方法
    public static Connection createProxyConnection(Connection originalConnection) {
        return (Connection) Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{Connection.class},
                new ConnectionProxy(originalConnection)
        );
    }

    // 代理类
    public static class ConnectionProxy implements InvocationHandler {
        private final Connection originalConnection;
        private boolean isClosed = false;

        public ConnectionProxy(Connection originalConnection) {
            this.originalConnection = originalConnection;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("close".equals(method.getName())) {
                // 拦截 close 方法,不执行关闭操作
                isClosed = true;
                logger.info("事务未结束,暂不关闭连接");
                return null;
            }
            // 其他方法调用原始连接的方法
            return method.invoke(originalConnection, args);
        }

        // 真正关闭连接的方法
        public void reallyClose() throws SQLException {
            logger.info("事务结束,关闭连接");
            originalConnection.close();
        }
    }
}

TransactionUtil.java

public class TransactionUtil {
    private final static Logger logger = LoggerFactory.getLogger(TransactionUtil.class);

    /**
     * 在事务中处理业务,抛出异常将会回滚,支持事务嵌套
     * @param supplier 处理业务
     * @return supplier返回
     * @param <T> 响应类型
     */
    public static <T> T doInTransaction(Supplier<T> supplier) {
        try{
            logger.info("开始事务处理...");
            TransactionManager.beginTransaction();
            try {
                T apply = supplier.get();
                TransactionManager.commit();
                logger.info("事务提交");
                return apply;
            } catch (Exception e) {
                TransactionManager.rollback();
                logger.info("事务回滚");
                throw new BusinessException(e.getMessage(),e);
            }
        }catch (BusinessException e) {
            throw e;
        } catch (Exception e) {
            throw new BusinessException("事务提交或回滚异常",e);
        }finally {
            TransactionManager.close(); //关闭连接 并清理ThreadLocal
            logger.info("事务处理结束");
        }
    }

    private TransactionUtil() {
    }
}

TransactionManager

  • 这个类负责管理数据库连接和事务。
  • 使用ThreadLocal来存储每个线程的嵌套层次(transactionLevelHolder)。
  • 使用ThreadLocal来存储每个线程的数据库连接(connectionHolder)。
  • 提供了beginTransaction、commit、rollback和close方法来管理事务。
  • 提供了isOpen方法来检查是否已经开启了事务。
  • createProxyConnection方法创建一个代理连接用于拦截close方法。

TransactionUtil

  • 这个类提供了一个简便的API来进行事务处理。
  • doInTransaction方法使用supplier接口来执行业务逻辑,支持事务嵌套。
  • 当异常发生时会自动回滚事务,并抛出BusinessException。
  • 最后关闭数据库连接并清理ThreadLocal。