2023年7月18日
多线程事务处理-手动回滚
@Service
@Slf4j
public class TestTxService {
@Autowired
private PlatformTransactionManager txManager;
@Autowired
private OtherService otherService; // 其他服务
public void testTx() {
// 定义总的线程数
int totalThreadCount = 3;
// 定义事务的基本属性
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(totalThreadCount);
// 创建一个CompletionService来管理异步执行的结果
CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
// 提交任务到线程池
for (int i = 0; i < totalThreadCount; i++) {
completionService.submit(() -> {
// 开始一个新的事务
TransactionStatus status = txManager.getTransaction(def);
try {
// 调用外部服务处理
otherService.doSomething();
// 随机抛出异常来模拟运行时错误
if (ThreadLocalRandom.current().nextBoolean()) {
throw new RuntimeException("Manual exception");
}
// 提交事务
txManager.commit(status);
} catch (Exception e) {
// 如果出现异常,记录异常信息并回滚事务
log.error("An error occurred: {}", e.getMessage());
txManager.rollback(status);
throw e;
}
// 没有返回结果
return null;
});
}
// 关闭线程池,不再接受新的任务
executorService.shutdown();
// 等待所有任务完成
for (int i = 0; i < totalThreadCount; i++) {
try {
// 如果任务执行抛出了异常,这里会得知
completionService.take().get();
} catch (InterruptedException e) {
// 如果等待被中断,恢复中断状态并抛出异常
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
// 如果任务执行抛出了异常,包装并抛出一个新的异常
throw new RuntimeException("An exception was thrown in a thread", e.getCause());
}
}
}
}