2023年7月19日 作者 zeroheart

一个事务中,如何保证mysql和redis的操作一致性

如果mysql写入成功之后,redis宕机了,如何处理?

可以使用分布式事务的tcc玩法:

以下是给三个服务方法添加一些简单业务逻辑的例子。我假设了一些数据和对象,并模拟了一些可能的业务操作。

订单服务:

@Service
public class OrderAction {

    @Autowired
    private OrderMapper orderMapper;

    // Try阶段,创建订单
    public boolean prepareCreateOrder(String txId, String userId, int productId, int amount) {
        // 创建订单的操作,如果成功返回true,否则返回false
        Order order = new Order(txId, userId, productId, amount);
        int result = orderMapper.insert(order);
        return result > 0;
    }

    // Confirm阶段,确认创建订单
    public boolean commitCreateOrder(String txId) {
        // 确认订单的操作,此处可进行日志记录,对于创建订单操作,确认阶段可能无具体操作
        return true;
    }

    // Cancel阶段,取消创建订单
    public boolean cancelCreateOrder(String txId) {
        // 取消订单的操作,即删除对应的订单
        int result = orderMapper.deleteByTxId(txId);
        return result > 0;
    }
}

库存服务:

@Service
public class StorageAction {

    @Autowired
    private StorageMapper storageMapper;

    // Try阶段,减少库存
    public boolean prepareDecreaseStorage(String txId, int productId, int count) {
        // 减少库存的操作
        int result = storageMapper.decreaseStorage(productId, count);
        return result > 0;
    }

    // Confirm阶段,确认减少库存
    public boolean commitDecreaseStorage(String txId) {
        // 确认减少库存的操作,此处可进行日志记录,对于减少库存操作,确认阶段可能无具体操作
        return true;
    }

    // Cancel阶段,取消减少库存
    public boolean cancelDecreaseStorage(String txId, int productId, int count) {
        // 取消减少库存的操作,即增加对应数量的库存
        int result = storageMapper.increaseStorage(productId, count);
        return result > 0;
    }
}

Redis操作服务:

@Service
public class RedisAction {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // Try阶段,写入Redis
    public boolean prepareWriteRedis(String txId, String key, String value) {
        // 写入Redis的操作
        redisTemplate.opsForValue().set(key, value);
        return true;
    }

    // Confirm阶段,确认写入Redis
    public boolean commitWriteRedis(String txId) {
        // 确认写入Redis的操作,此处可进行日志记录,对于Redis写入操作,确认阶段可能无具体操作
        return true;
    }

    // Cancel阶段,取消写入Redis
    public boolean cancelWriteRedis(String txId, String key) {
        // 取消写入Redis的操作,即删除对应的键值对
        redisTemplate.delete(key);
        return true;
    }
}

// Spring Boot中的Service类
@Service
public class BusinessService {

    @Autowired
    private OrderAction orderAction;

    @Autowired
    private StorageAction storageAction;

    @Autowired
    private RedisAction redisAction;

    // 开始全局事务
    @GlobalTransactional
    public void doBusiness() {
        String txId = UUID.randomUUID().toString();
        String userId = "1";
        int productId = 1;
        int count = 1;
        String key = "key";
        String value = "value";

        // Try阶段
        boolean orderResult = orderAction.prepareCreateOrder(txId, userId, productId, count);
        boolean storageResult = storageAction.prepareDecreaseStorage(txId, productId, count);
        boolean redisResult = redisAction.prepareWriteRedis(txId, key, value);

        if (orderResult && storageResult && redisResult) {
            // Confirm阶段
            orderAction.commitCreateOrder(txId);
            storageAction.commitDecreaseStorage(txId);
            redisAction.commitWriteRedis(txId);
        } else {
            // Cancel阶段
            orderAction.cancelCreateOrder(txId);
            storageAction.cancelDecreaseStorage(txId);
            redisAction.cancelWriteRedis(txId);
        }
    }
}

这只是一个基本的示例,实际情况可能会更复杂。在实际的业务场景中,你可能需要处理更多的异常情况,以及可能的性能和并发问题。对于这些问题,Seata和TCC模式提供了一种解决方案,但可能需要进行一些额外的配置和优化。

也可以使用最终一致性的方案:

消息队列最终一致性: 通过消息队列进行异步操作可以确保操作的最终一致性。一般流程为:

  1. 业务系统将业务操作和相应的补偿操作作为一个消息发送到消息队列。
  2. 消息消费者接收到消息并执行业务操作。
  3. 如果业务操作成功,消息被确认并从队列中删除。
  4. 如果业务操作失败,消息重新放回队列中,等待再次被消费。
以下为一个简单的RabbitMQ示例:

// 生产者
public void sendMessage(TransactionMessage message) {
    rabbitTemplate.convertAndSend("exchange", "routingKey", message);
}

// 消费者
@RabbitListener(queues = "queue")
public void handleMessage(TransactionMessage message) {
    try {
        // 写入Redis
        redisTemplate.opsForValue().set("key", message.getData());
    } catch (Exception e) {
        // 如果操作失败,重新发布消息到队列,等待重试
        rabbitTemplate.convertAndSend("exchange", "routingKey", message);
    }
}