事务工具类
TransactionManager.java
public class TransactionManager {
private final static Logger logger = LoggerFactory.getLogger(TransactionManager.class);
/** 是否开启了事务 **/
private static final ThreadLocal<Boolean> open = new ThreadLocal();
/** 使用ThreadLocal来存储每个线程的嵌套层次 **/
private static ThreadLocal<Integer> transactionLevelHolder = ThreadLocal.withInitial(() -> 0);
/**
* 数据库连接ThreadLocal
* 当某个线程第一次调用 getConnection 方法时,如果该线程还没有为 connectionHolder 设置值,
* 那么 withInitial 提供的初始化方法会被调用,创建一个新的数据库连接并返回。
* 这样,每个线程都有自己独立的数据库连接,确保线程安全和事务管理的独立性
*/
private static final ThreadLocal<Connection> connectionHolder = ThreadLocal.withInitial(() -> {
Connection connection;
try {
if (FGBiz.dspCenter.dbConnectionSet == null) {
throw new RuntimeException("获取数据库连接失败");
}
long begin = System.currentTimeMillis();
connection = ConnSet.getDataSource().getConnection();
logger.info("获取数据库连接耗时:{} ms",System.currentTimeMillis()-begin);
} catch (Exception e) {
throw new RuntimeException("获取数据库连接失败", e);
}
try {
connection.setAutoCommit(false); // 关闭自动提交,开启事务管理
return createProxyConnection(connection);
} catch (SQLException e) {
throw new RuntimeException("关闭自动提交,开启事务管理失败", e);
}
});
// 开始事务
public static void beginTransaction() {
int transactionLevel = transactionLevelHolder.get();
if (transactionLevel == 0) {
getConnection(); // 确保连接已创建并设置为非自动提交
}
transactionLevelHolder.set(transactionLevel + 1);
}
public static Connection getConnection() {
open .set(true);
return connectionHolder.get();
}
// 提交事务
public static void commit() {
int transactionLevel = transactionLevelHolder.get();
if (transactionLevel == 1) {
Connection connection = connectionHolder.get();
try {
if (connection != null && !connection.isClosed()) {
connection.commit();
}
} catch (SQLException e) {
logger.error("commit失败,失败原因:",e);
} finally {
transactionLevelHolder.set(0);
}
} else if (transactionLevel > 1) {
transactionLevelHolder.set(transactionLevel - 1);
}
}
// 回滚事务
public static void rollback() {
int transactionLevel = transactionLevelHolder.get();
if (transactionLevel > 0) {
Connection connection = connectionHolder.get();
try {
if (connection != null && !connection.isClosed()) {
connection.rollback();
}
} catch (SQLException e) {
logger.error("rollback失败,失败原因:",e);
} finally {
transactionLevelHolder.set(0);
}
}
}
/**
* 注意:务必在final里关闭连接
* 关闭连接,并清理ThreadLocal
*/
public static void close() {
Connection connection = connectionHolder.get();
try {
if (connection != null && !connection.isClosed()) {
if(connection instanceof Proxy){
((ConnectionProxy) Proxy.getInvocationHandler(connection)).reallyClose();
}else {
connection.close();
}
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
connectionHolder.remove();
open.remove();
transactionLevelHolder.remove();
}
}
//是否开启了事务
public static boolean isOpen() {
return open.get() != null && open.get();
}
// 创建代理连接的方法
public static Connection createProxyConnection(Connection originalConnection) {
return (Connection) Proxy.newProxyInstance(
Thread.currentThread().getContextClassLoader(),
new Class<?>[]{Connection.class},
new ConnectionProxy(originalConnection)
);
}
// 代理类
public static class ConnectionProxy implements InvocationHandler {
private final Connection originalConnection;
private boolean isClosed = false;
public ConnectionProxy(Connection originalConnection) {
this.originalConnection = originalConnection;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("close".equals(method.getName())) {
// 拦截 close 方法,不执行关闭操作
isClosed = true;
logger.info("事务未结束,暂不关闭连接");
return null;
}
// 其他方法调用原始连接的方法
return method.invoke(originalConnection, args);
}
// 真正关闭连接的方法
public void reallyClose() throws SQLException {
logger.info("事务结束,关闭连接");
originalConnection.close();
}
}
}
TransactionUtil.java
public class TransactionUtil {
private final static Logger logger = LoggerFactory.getLogger(TransactionUtil.class);
/**
* 在事务中处理业务,抛出异常将会回滚,支持事务嵌套
* @param supplier 处理业务
* @return supplier返回
* @param <T> 响应类型
*/
public static <T> T doInTransaction(Supplier<T> supplier) {
try{
logger.info("开始事务处理...");
TransactionManager.beginTransaction();
try {
T apply = supplier.get();
TransactionManager.commit();
logger.info("事务提交");
return apply;
} catch (Exception e) {
TransactionManager.rollback();
logger.info("事务回滚");
throw new BusinessException(e.getMessage(),e);
}
}catch (BusinessException e) {
throw e;
} catch (Exception e) {
throw new BusinessException("事务提交或回滚异常",e);
}finally {
TransactionManager.close(); //关闭连接 并清理ThreadLocal
logger.info("事务处理结束");
}
}
private TransactionUtil() {
}
}
TransactionManager
- 这个类负责管理数据库连接和事务。
- 使用ThreadLocal来存储每个线程的嵌套层次(transactionLevelHolder)。
- 使用ThreadLocal来存储每个线程的数据库连接(connectionHolder)。
- 提供了beginTransaction、commit、rollback和close方法来管理事务。
- 提供了isOpen方法来检查是否已经开启了事务。
- createProxyConnection方法创建一个代理连接用于拦截close方法。
TransactionUtil
- 这个类提供了一个简便的API来进行事务处理。
- doInTransaction方法使用supplier接口来执行业务逻辑,支持事务嵌套。
- 当异常发生时会自动回滚事务,并抛出BusinessException。
- 最后关闭数据库连接并清理ThreadLocal。