2023年7月18日
多线程事务处理-手动回滚2
1、提取子任务执行的代码
@Service
@Slf4j
public class TransactionService {
@Autowired
private PlatformTransactionManager txManager;
public <T> void processInTransaction(Callable<T> callable) {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
TransactionStatus status = txManager.getTransaction(def);
try {
callable.call();
txManager.commit(status);
} catch (Exception e) {
log.error("An error occurred: {}", e.getMessage());
txManager.rollback(status);
throw new RuntimeException(e);
}
}
}
2、主方法中使用,并增加每一个子任务的异常处理
@Service
public class TestTxService {
@Autowired
private TransactionService transactionService;
// List of services
private List<Callable<Void>> services;
public TestTxService(@Autowired Service1 service1,
@Autowired Service2 service2,
@Autowired Service3 service3) {
this.services = Arrays.asList(
() -> {
try {
service1.doSomething();
return null;
} catch (SpecificException1 e) {
// handle exception specific to service 1
log.error("Error in Service1: {}", e.getMessage());
// Additional handling code...
throw e;
}
},
() -> {
try {
service2.doSomething();
return null;
} catch (SpecificException2 e) {
// handle exception specific to service 2
log.error("Error in Service2: {}", e.getMessage());
// Additional handling code...
throw e;
}
},
() -> {
try {
service3.doSomething();
return null;
} catch (SpecificException3 e) {
// handle exception specific to service 3
log.error("Error in Service3: {}", e.getMessage());
// Additional handling code...
throw e;
}
}
);
}
public void testTx() {
int totalThreadCount = services.size(); // Use size of services list
ExecutorService executorService = Executors.newFixedThreadPool(totalThreadCount);
CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
for (Callable<Void> service : services) {
completionService.submit(() -> {
transactionService.processInTransaction(service);
return null;
});
}
executorService.shutdown();
for (int i = 0; i < totalThreadCount; i++) {
try {
completionService.take().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException("An exception was thrown in a thread", e.getCause());
}
}
}
}