# 基于 RabbitMQ 的最终一致性分布式事务

# 1. 整体思路

注册新用户后,可以慢慢等待促销中心为新用户发电子优惠券,并非强制要求同时性。

  1. 可靠生产 : 保证消息一定要发送到 RabitMQ 服务。
  2. 可靠消费 : 保证消息取出来一定正确消费掉。

最终使多方数据达到一致。

# 2. 简单方案

rabbitmq-transaction-02

# 生产方的『可靠性』

这里的『可靠性』指的是一旦 A 服务(事务的发起方)本地操作执行成功后,要务必确保消息一定要发送至 RabbitMQ 。

如果发送失败,那么:

  1. 撤销 A 服务的本地操作;

  2. 如果 A 服务的本地操作是无法撤销的,那么消息需要重发;如果重复仍然失败,那么则需要人工干预。

为了确保消息发送方的可靠性需要执行的以下操作:

# 新增本地消息表

在 A 服务的本地数据库中添加一张通用的消息表,形如:

drop table if exists `message`;
create table `message`
(
    `id`              BIGINT AUTO_INCREMENT,
    `exchange`        VARCHAR(128)  NOT NULL,
    `routing_key`     VARCHAR(128)  NOT NULL,
    `message_content` VARCHAR(4096) NOT NULL,
    `status`          VARCHAR(128)  NOT NULL,
    `retry_count`     INT           NOT NULL DEFAULT 3,
    `version`         BIGINT        NOT NULL default 0,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB;

其中 retry_count 是配合定时任务实现消息重发;version 是用来实现乐观锁的。极简情况下,这两个字段可以没有,那么整个功能就更简单一些。

在 A 服务做完本身的业务操作后,要向消息表中添加一条记录,表示有一条待发送消息。注意,A 服务的本身的操作和向消息表中添加记录这两个操作要在同一个事务中。

@Transactional
public void register(String username, String password) {
    userRepository.insert(username, password);
    String messageContent = String.format("\"userId\" : %s", username);
    messageRepository.insert("user.register", "user.register-user-success", messageContent);
    messageService.send("user.register", "user.register-user-success", messageContent, String.valueOf(messageId));
}

# 确认消息发送成功或失败

生产者为了明确知道发送到了 RabbitMQ 或发送失败,因此需要开启『发送者确认』功能。

  • 配置文件

    # 确认消息已发送到交换机(Exchange)
    spring.rabbitmq.publisher-confirm-type=CORRELATED
    
  • 设置回调方法

    @Configuration
    public class RabbitMQConfig {
    
      ...
    
      @Bean
      public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
          RabbitTemplate rabbitTemplate = new RabbitTemplate();
          rabbitTemplate.setConnectionFactory(connectionFactory);
    
          // 设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
          rabbitTemplate.setMandatory(true);
    
          // 关键就是以下两句
          rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
              // 这里的 correlationData 来源于 convertAndSend 方法。
              log.debug("消息唯一标识 : {}", correlationData);
              if (ack) {
                  log.debug("消息已发送至 RabbitMQ(的Exchange),修改 id 为 {} 的状态。", correlationData.getId());
                  messageRepository.changeStatus2Success(Long.parseLong(correlationData.getId()));
              } else {
                  log.debug("消息未能发送到 Exchange。失败原因 {}", cause);
                  // ...
              }
          });
    
          return rabbitTemplate;
      }
    }
    

消费者发送消息至 RabbitMQ 成功,那么执行数据库操作,将 message 表中的该消息的状态改为成功(或等价的其它)状态。

如果发送消息失败,那么有两种后续处理方式:

  1. 和成功的请款一样,直接修改数据库中的消息的状态,改为失败。(当然这种只发送一次的一锤子买卖并不人性)。

  2. 结合表中的 retry_count 字段和 Spring Task 功能,实现一个定时任务。不停地从消息表中取出待发送消息,直至消息发送成功,或重试次数耗完。

@Slf4j
@Service
public class ReliableMessageService {

    @Autowired private RabbitTemplate rabbitTemplate;
    @Autowired private MessageRepository messageRepository;

    // 发送消息 
    public void send(String routingKey, String messageContent, String messageId) {
        rabbitTemplate.convertAndSend(routingKey, messageContent, new CorrelationData(messageId));
    }

    // 发送消息 
    public void send(String exchange, String routingKey, String messageContent, String messageId) {
        rabbitTemplate.convertAndSend(exchange, routingKey, messageContent, new CorrelationData(messageId));
    }


    // 定时任务:每隔 3 分钟从数据库中读取未发送的消息发送到 RabbitMQ
    @Scheduled(fixedDelay = 3 * 1000)
    private void autoSend() {
        List<Message> messageList = messageRepository.selectNeededToBeSentMessage();
        log.debug("执行定时任务。查询出有 {} 条待发送消息。", messageList.size());
        for (Message message : messageList) {
            log.debug("待发送消息 : id-{}", message.getId());
            // if (messageRepository.touchMessage(message.getId(), message.getVersion()) > 0) {
                rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), message.getMessageContent(), new CorrelationData(message.getId().toString()));
                messageRepository.reduceRetryCount(message.getId());    // 重试
            // }
        }
    }
}

如果考虑到 A 服务可能会部署多个,那么这里可以再加上乐观锁以保证避免消息发送多次。

# RabbitMQ 删除过期消息

由于存在消费者因故未能消费消息的可能,这会导致这些消息堆积在 RabbitMQ 中。因此需要在创建队列,或发送消息时指定过期时间,以便于让 RabbitMQ 将这些在规定时间内未能消费的消息移除。

@Bean
public Queue queue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 10 * 60 * 1000); // 10 分钟的过期时间
    return new Queue("user.register", true, false, false, arguments);
}

# 消费方的『可靠性』

B 服务在收到消息后,去执行本地操作可能失败。此时,由于 B 服务确实是已经收到了该消息,(默认情况下)该消息已经被 RabbitMQ 移除了,无法重发。

# 消费者开启手动 ACK 模式

开启手动 ACK 模式的目的是先去执行 B 服务的本地操作,在操作执行成功后再『回复』RabbitMQ 已收到消息。这种情况下,如果 B 服务本地操作失败,那么就没有去『确认』收到该消息,RabbitMQ 自然就会重发该消息。

  • 配置文件

    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    
  • 业务代码

    @Transactional
    @RabbitHandler
    public void process(String message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
    
        log.info("收到用户已注册的消息。去操作数据据,未其派发优惠券。 {}", message);
        log.info("操作数据库/派发优惠券成功。 向 RabbitMQ 回复 ACK。");
    
        Thread.sleep(3 * 1000);
    
        channel.basicAck(tag, false);
    }
    

# 3. 改进方案:可靠消息服务

可靠消息服务,就是将发送消息(和消息表)的功能由一个专门的微服务来处理,这个微服务是真正意义上的消息的生产者,它向 RabbitMQ 投递消息。

rabbitmq-transaction-03

其它模块并不直接和 RabbitMQ 产生联系,他们是逻辑上的消息的生产者。