2023年7月18日 作者 zeroheart

多线程事务处理-手动回滚

@Service
@Slf4j
public class TestTxService {

@Autowired
private PlatformTransactionManager txManager;

@Autowired
private OtherService otherService; // 其他服务

public void testTx() {
    // 定义总的线程数
    int totalThreadCount = 3;

    // 定义事务的基本属性
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

    // 创建一个固定大小的线程池
    ExecutorService executorService = Executors.newFixedThreadPool(totalThreadCount);

    // 创建一个CompletionService来管理异步执行的结果
    CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);

    // 提交任务到线程池
    for (int i = 0; i < totalThreadCount; i++) {
        completionService.submit(() -> {
            // 开始一个新的事务
            TransactionStatus status = txManager.getTransaction(def);
            try {
                // 调用外部服务处理
                otherService.doSomething();

                // 随机抛出异常来模拟运行时错误
                if (ThreadLocalRandom.current().nextBoolean()) {
                    throw new RuntimeException("Manual exception");
                }

                // 提交事务
                txManager.commit(status);
            } catch (Exception e) {
                // 如果出现异常,记录异常信息并回滚事务
                log.error("An error occurred: {}", e.getMessage());
                txManager.rollback(status);
                throw e;
            }

            // 没有返回结果
            return null;
        });
    }

    // 关闭线程池,不再接受新的任务
    executorService.shutdown();

    // 等待所有任务完成
    for (int i = 0; i < totalThreadCount; i++) {
        try {
            // 如果任务执行抛出了异常,这里会得知
            completionService.take().get();
        } catch (InterruptedException e) {
            // 如果等待被中断,恢复中断状态并抛出异常
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // 如果任务执行抛出了异常,包装并抛出一个新的异常
            throw new RuntimeException("An exception was thrown in a thread", e.getCause());
        }
    }
}

}