dynamic-datasource icon indicating copy to clipboard operation
dynamic-datasource copied to clipboard

新增多数据源事务传播机制

Open zhaohaoh opened this issue 4 years ago • 9 comments

1.主要提取DynamicLocalTransactionInterceptor中的方法到TransactionalTemplate TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo(); if (!StringUtils.isEmpty(TransactionContext.getXID())) { return transactionalExecutor.execute(); } boolean state = true; Object o; String xid = LocalTxUtil.startTransaction(); try { o = transactionalExecutor.execute(); } catch (Exception e) { state = !isRollback(e, transactionInfo); throw e; } finally { if (state) { LocalTxUtil.commit(xid); } else { LocalTxUtil.rollback(xid); } } return o; 原代理方法改为获取注解属性对象的封装 Method method = methodInvocation.getMethod(); final DSTransactional dsTransactional = method.getAnnotation(DSTransactional.class);

    TransactionalExecutor transactionalExecutor = new TransactionalExecutor() {
        @Override
        public Object execute() throws Throwable {
            return methodInvocation.proceed();
        }

        @Override
        public TransactionalInfo getTransactionInfo() {
            TransactionalInfo transactionInfo = new TransactionalInfo();
            transactionInfo.setPropagation(dsTransactional.propagation());
            transactionInfo.setNoRollbackFor(dsTransactional.noRollbackFor());
            transactionInfo.setRollbackFor(dsTransactional.rollbackFor());
            return transactionInfo;
        }
    }

2.AbstractRoutingDataSource 中获取事务数据源连接的方法加入 xid 参数。修改前:ConnectionProxy connection = ConnectionFactory.getConnection(ds); return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection; 修改后:ConnectionProxy connection = ConnectionFactory.getConnection(xid, ds); return connection == null ? getConnectionProxy(xid,ds, determineDataSource().getConnection()) : connection; 3.增加事务异常类 TransactionException

public class TransactionException extends RuntimeException { public TransactionException(String message) { super(message); }

/**
 * Creates a transaction exception with a detail message and the underlying cause.
 *
 * @param message description of the failure
 * @param cause   the throwable that triggered this exception
 */
public TransactionException(String message, Throwable cause) {
    super(message, cause);
}

} 4.ConnectionFactory 对于事务数据源的处理通过xid保存 通过xid获取指定事务 public class ConnectionFactory {

private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =
        new ThreadLocal<Map<String, ConnectionProxy>>() {
private static final ThreadLocal<Map<String, Map<String, ConnectionProxy>>> CONNECTION_HOLDER =
        new ThreadLocal<Map<String, Map<String, ConnectionProxy>>>() {
            @Override
            protected Map<String, ConnectionProxy> initialValue() {
                return new ConcurrentHashMap<>(8);
            protected Map<String, Map<String, ConnectionProxy>> initialValue() {
                return new ConcurrentHashMap<>();
            }
        };

public static void putConnection(String ds, ConnectionProxy connection) {
    Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
    if (!concurrentHashMap.containsKey(ds)) {
public static void putConnection(String xid, String ds, ConnectionProxy connection) {
    Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
    Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
    if (connectionProxyMap == null) {
        connectionProxyMap = new ConcurrentHashMap<>();
        concurrentHashMap.put(xid, connectionProxyMap);
    }
    if (!connectionProxyMap.containsKey(ds)) {
        try {
            connection.setAutoCommit(false);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        concurrentHashMap.put(ds, connection);
        connectionProxyMap.put(ds, connection);
    }
}

public static ConnectionProxy getConnection(String ds) {
    return CONNECTION_HOLDER.get().get(ds);
public static ConnectionProxy getConnection(String xid, String ds) {
    Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
    Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
    if (CollectionUtils.isEmpty(connectionProxyMap)) {
        return null;
    }
    return connectionProxyMap.get(ds);
}

public static void notify(Boolean state) {
public static void notify(String xid, Boolean state) {
    Map<String, Map<String, ConnectionProxy>> concurrentHashMap = CONNECTION_HOLDER.get();
    try {
        Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
        for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
            connectionProxy.notify(state);
        if (CollectionUtils.isEmpty(concurrentHashMap)) {
            return;
        }
        Map<String, ConnectionProxy> connectionProxyMap = concurrentHashMap.get(xid);
        for (ConnectionProxy connectionProxy : connectionProxyMap.values()) {
            if (connectionProxy != null) {
                connectionProxy.notify(state);
            }
        }
    } finally {
        CONNECTION_HOLDER.remove();
        concurrentHashMap.remove(xid);
    }
}

5.DsPropagation 事务传播机制枚举 6.src/main/java/com/baomidou/dynamic/datasource/tx/LocalTxUtil.java 事务工具提交关闭事务加入xid进行获取 public static void startTransaction() { if (!StringUtils.isEmpty(TransactionContext.getXID())) { log.debug("dynamic-datasource exist local tx [{}]", TransactionContext.getXID()); public static String startTransaction() { String xid = TransactionContext.getXID(); if (!StringUtils.isEmpty(xid)) { log.debug("dynamic-datasource exist local tx [{}]", xid); } else { String xid = UUID.randomUUID().toString(); xid = UUID.randomUUID().toString(); TransactionContext.bind(xid); log.debug("dynamic-datasource start local tx [{}]", xid); } return xid; }

/**
 * 手动提交事务
 */
public static void commit() {
    ConnectionFactory.notify(true);
public static void commit(String xid) {
    ConnectionFactory.notify(xid, true);
    log.debug("dynamic-datasource commit local tx [{}]", TransactionContext.getXID());
    TransactionContext.remove();
}

/**
 * 手动回滚事务
 */
public static void rollback() {
    ConnectionFactory.notify(false);
public static void rollback(String xid) {
    ConnectionFactory.notify(xid, false);
    log.debug("dynamic-datasource rollback local tx [{}]", TransactionContext.getXID());
    TransactionContext.remove();
}

7.事务传播机制使用的事务暂停xid对象 用于事务暂停和恢复xid package com.baomidou.dynamic.datasource.tx;

import javax.annotation.Nonnull;

public class SuspendedResourcesHolder { /** * The xid */ private String xid;

/**
 * Creates a holder remembering the id of a suspended transaction.
 *
 * @param xid id of the suspended transaction; must not be {@code null}
 * @throws IllegalArgumentException if {@code xid} is {@code null}
 */
public SuspendedResourcesHolder(String xid) {
    if (xid != null) {
        this.xid = xid;
    } else {
        throw new IllegalArgumentException("xid must be not null");
    }
}

/**
 * Returns the id of the suspended transaction.
 *
 * @return the xid given at construction; the constructor rejects {@code null}
 */
@Nonnull
public String getXid() {
    return xid;
}

} 8.TransactionalExecutor 事务执行器接口 9.TransactionalInfo 对事务注解中属性的封装 10.TransactionalTemplate 事务模板方法:将事务操作抽取到模板方法中实现,负责处理异常和事务传播机制,后续可自定义扩展模板方法。 package com.baomidou.dynamic.datasource.tx;

import com.baomidou.dynamic.datasource.exception.TransactionException; import com.baomidou.mybatisplus.core.toolkit.ArrayUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils;

import java.util.Objects;

@Slf4j public class TransactionalTemplate {

/**
 * Runs the executor according to the propagation declared in its transaction info.
 * <p>Any transaction suspended on the way in is re-bound in the {@code finally} block, whether
 * the call returns or throws.
 *
 * @param transactionalExecutor supplies the transaction attributes and the work to run
 * @return whatever the executor returns
 * @throws TransactionException for NEVER with an existing transaction, MANDATORY without one,
 *                              or an unrecognised propagation value
 * @throws Throwable            anything thrown by the executor itself
 */
public Object execute(TransactionalExecutor transactionalExecutor) throws Throwable {
    final DsPropagation mode = transactionalExecutor.getTransactionInfo().propagation;
    SuspendedResourcesHolder suspended = null;
    try {
        switch (mode) {
            case REQUIRED:
                // Default: join the current transaction or start one.
                break;
            case REQUIRES_NEW:
                // Park any current transaction so that a new one is started below.
                if (existingTransaction()) {
                    suspended = suspend();
                }
                break;
            case SUPPORTS:
                // Without a current transaction, run non-transactionally;
                // otherwise fall out of the switch and join the current one.
                if (!existingTransaction()) {
                    return transactionalExecutor.execute();
                }
                break;
            case MANDATORY:
                // A current transaction is required; join it below.
                if (!existingTransaction()) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                break;
            case NOT_SUPPORTED:
                // Park any current transaction and run non-transactionally.
                if (existingTransaction()) {
                    suspended = suspend();
                }
                return transactionalExecutor.execute();
            case NEVER:
                // A current transaction is forbidden; otherwise run non-transactionally.
                if (existingTransaction()) {
                    throw new TransactionException("Existing transaction found for transaction marked with propagation never");
                }
                return transactionalExecutor.execute();
            default:
                throw new TransactionException("Not Supported Propagation:" + mode);
        }
        return doExecute(transactionalExecutor);
    } finally {
        resume(suspended);
    }
}

/**
 * Runs the executor inside a local transaction.
 * <p>When a transaction is already bound, the work simply joins it and this method neither
 * commits nor rolls back. Otherwise a new transaction is started, and committed or rolled back
 * once the work finishes.
 *
 * @param transactionalExecutor supplies the transaction attributes and the work to run
 * @return whatever the executor returns
 * @throws Throwable anything thrown by the executor; it is rethrown unchanged after the
 *                   commit/rollback decision has been made
 */
private Object doExecute(TransactionalExecutor transactionalExecutor) throws Throwable {
    TransactionalInfo transactionInfo = transactionalExecutor.getTransactionInfo();
    if (existingTransaction()) {
        return transactionalExecutor.execute();
    }
    boolean commit = true;
    String xid = LocalTxUtil.startTransaction();
    try {
        return transactionalExecutor.execute();
    } catch (Throwable e) {
        // Catch Throwable, not just Exception: execute() may throw an Error, and with a narrower
        // catch the flag would stay true and the finally block would commit a failed invocation.
        commit = !isRollback(e, transactionInfo);
        throw e;
    } finally {
        if (commit) {
            LocalTxUtil.commit(xid);
        } else {
            LocalTxUtil.rollback(xid);
        }
    }
}

/**
 * Decides whether the thrown exception should roll the transaction back.
 * <p>{@code noRollbackFor} is consulted first and wins; then {@code rollbackFor}; when neither
 * list matches, the answer is {@code false}.
 *
 * @param e               the throwable raised by the transactional work
 * @param transactionInfo carries the {@code rollbackFor} / {@code noRollbackFor} lists
 * @return {@code true} only when a {@code rollbackFor} entry matches and no
 *         {@code noRollbackFor} entry does
 */
private boolean isRollback(Throwable e, TransactionalInfo transactionInfo) {
    Class<? extends Throwable>[] exemptTypes = transactionInfo.noRollbackFor;
    Class<? extends Throwable>[] rollbackTypes = transactionInfo.rollbackFor;

    if (ArrayUtils.isNotEmpty(exemptTypes)) {
        for (Class<? extends Throwable> exemptType : exemptTypes) {
            if (getDepth(e.getClass(), exemptType) < 0) {
                continue;
            }
            return false;
        }
    }

    if (ArrayUtils.isNotEmpty(rollbackTypes)) {
        for (Class<? extends Throwable> rollbackType : rollbackTypes) {
            if (getDepth(e.getClass(), rollbackType) < 0) {
                continue;
            }
            return true;
        }
    }

    return false;
}

/**
 * Tests whether {@code exceptionClass} matches the configured type {@code rollback}.
 * <p>{@code Throwable.class} and {@code Exception.class} match unconditionally. Any other type
 * matches when it equals {@code exceptionClass} or one of its superclasses below
 * {@code Throwable}.
 *
 * @param exceptionClass class of the thrown exception
 * @param rollback       configured exception type to compare against
 * @return {@code 0} on a match, {@code -1} otherwise
 */
private int getDepth(Class<?> exceptionClass, Class<? extends Throwable> rollback) {
    if (rollback == Throwable.class || rollback == Exception.class) {
        return 0;
    }
    // Walk up the hierarchy until the root Throwable is reached.
    Class<?> current = exceptionClass;
    while (current != Throwable.class) {
        if (Objects.equals(current, rollback)) {
            return 0;
        }
        current = current.getSuperclass();
    }
    return -1;
}

/**
 * Re-binds the xid of a previously suspended transaction; does nothing when the holder is
 * {@code null}.
 *
 * @param suspendedResourcesHolder holder returned by {@link #suspend()}, or {@code null}
 */
private void resume(SuspendedResourcesHolder suspendedResourcesHolder) {
    if (suspendedResourcesHolder == null) {
        return;
    }
    TransactionContext.bind(suspendedResourcesHolder.getXid());
}

/**
 * Unbinds the xid currently held by {@code TransactionContext} and returns it wrapped in a
 * holder so it can be re-bound later.
 *
 * @return a holder for the unbound xid, or {@code null} when no xid is bound
 */
public SuspendedResourcesHolder suspend() {
    final String currentXid = TransactionContext.getXID();
    if (currentXid == null) {
        return null;
    }
    if (log.isInfoEnabled()) {
        log.info("Suspending current transaction, xid = {}", currentXid);
    }
    TransactionContext.unbind(currentXid);
    return new SuspendedResourcesHolder(currentXid);
}

/**
 * Reports whether a transaction id is currently bound.
 *
 * @return {@code true} when {@code TransactionContext.getXID()} is neither {@code null} nor empty
 */
public boolean existingTransaction() {
    final String boundXid = TransactionContext.getXID();
    return StringUtils.hasLength(boundXid);
}

}

zhaohaoh avatar Nov 24 '21 07:11 zhaohaoh

多数据源事务传播机制

zhaohaoh avatar Nov 24 '21 07:11 zhaohaoh

在玩呢?

huayanYu avatar Nov 24 '21 07:11 huayanYu

你好,有看么

zhaohaoh avatar Nov 26 '21 01:11 zhaohaoh

恩,改动比较大,暂时不会合并。空了会详细测试。先放这里

huayanYu avatar Nov 26 '21 02:11 huayanYu

好的,需要改动或兼容可以@

zhaohaoh avatar Nov 26 '21 02:11 zhaohaoh

有冲突

huayanYu avatar Nov 29 '21 02:11 huayanYu

之前是基于3.4改的,我看下3.5

zhaohaoh avatar Nov 29 '21 03:11 zhaohaoh

更新了下comment ,冲突主要是方法参数加入了xid,可以替换。还有个冲突是原代理类中的事务的方法抽取到了事务模板方法类中

zhaohaoh avatar Nov 29 '21 04:11 zhaohaoh

有考虑么,冲突可以先用我的版本试试

zhaohaoh avatar Mar 18 '22 01:03 zhaohaoh

merge

zhaohaoh avatar Dec 25 '22 13:12 zhaohaoh