2023年7月18日 作者 zeroheart

多线程事务处理-手动回滚2

1、提取子任务执行的代码

@Service
@Slf4j
public class TransactionService {

@Autowired
private PlatformTransactionManager txManager;

public <T> void processInTransaction(Callable<T> callable) {
    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    TransactionStatus status = txManager.getTransaction(def);
    try {
        callable.call();
        txManager.commit(status);
    } catch (Exception e) {
        log.error("An error occurred: {}", e.getMessage());
        txManager.rollback(status);
        throw new RuntimeException(e);
    }
}

}

2、在主方法中使用，并为每一个子任务增加异常处理

/**
 * Runs a fixed set of service calls in parallel, each one inside its own transaction
 * started through {@link TransactionService#processInTransaction}.
 *
 * <p>Because every sub-task gets a separate transaction, a failure in one sub-task
 * rolls back only that sub-task; the others are committed or rolled back on their own.
 */
@Service
@Slf4j
public class TestTxService {

    @Autowired
    private TransactionService transactionService;

    // Sub-tasks to execute; each wraps one service call plus its specific error handling.
    private final List<Callable<Void>> services;

    /**
     * Builds the list of sub-tasks. Each task logs its service-specific exception and
     * rethrows it so the surrounding transaction is rolled back.
     */
    public TestTxService(@Autowired Service1 service1,
                         @Autowired Service2 service2,
                         @Autowired Service3 service3) {
        this.services = Arrays.asList(
                () -> {
                    try {
                        service1.doSomething();
                        return null;
                    } catch (SpecificException1 e) {
                        // handle exception specific to service 1
                        log.error("Error in Service1: {}", e.getMessage());
                        // Additional handling code...
                        throw e;
                    }
                },
                () -> {
                    try {
                        service2.doSomething();
                        return null;
                    } catch (SpecificException2 e) {
                        // handle exception specific to service 2
                        log.error("Error in Service2: {}", e.getMessage());
                        // Additional handling code...
                        throw e;
                    }
                },
                () -> {
                    try {
                        service3.doSomething();
                        return null;
                    } catch (SpecificException3 e) {
                        // handle exception specific to service 3
                        log.error("Error in Service3: {}", e.getMessage());
                        // Additional handling code...
                        throw e;
                    }
                }
        );
    }

    /**
     * Submits every sub-task to a dedicated thread pool (one thread per task) and waits
     * for all of them to finish.
     *
     * @throws RuntimeException if any sub-task failed; its cause is the first failure
     *         observed and every further failure is attached as a suppressed exception.
     *         Also thrown (wrapping {@link InterruptedException}) if the calling thread
     *         is interrupted while waiting.
     */
    public void testTx() {
        int totalThreadCount = services.size(); // Use size of services list
        if (totalThreadCount == 0) {
            // Executors.newFixedThreadPool(0) would throw IllegalArgumentException.
            return;
        }

        ExecutorService executorService = Executors.newFixedThreadPool(totalThreadCount);
        RuntimeException failure = null;
        try {
            CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);

            for (Callable<Void> service : services) {
                completionService.submit(() -> {
                    transactionService.processInTransaction(service);
                    return null;
                });
            }

            // Drain every result instead of stopping at the first failure, so that no
            // sub-task is still running and no failure goes unreported when we return.
            for (int i = 0; i < totalThreadCount; i++) {
                try {
                    completionService.take().get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    RuntimeException interrupted = new RuntimeException(e);
                    if (failure != null) {
                        interrupted.addSuppressed(failure);
                    }
                    throw interrupted;
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause() != null ? e.getCause() : e;
                    if (failure == null) {
                        failure = new RuntimeException("An exception was thrown in a thread", cause);
                    } else {
                        failure.addSuppressed(cause);
                    }
                }
            }
        } finally {
            // Always release the pool, even if submission or waiting failed.
            executorService.shutdown();
        }

        if (failure != null) {
            throw failure;
        }
    }

}