# Spring Boot 整合 RabbitMQ

Spring Boot 提供了 spring-boot-starter-amqp 组件对实现了 AMQP 协议的消息队列(RabbitMQ)的快速整合。

# 1. hello world

提示

我们分发送和接收 2 部分来学习 Spring Boot 和 RabbitMQ 的整合。

  1. 在 pom.xml 中引入 spring-boot-starter-amqp

    rabbitmq-01

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    注意

    虽然你在界面上选择的是 RabbitMQ ,但是本质上引入的是 AMQP ,因为 RabbitMQ 是 AMQP 的一种实现,也是默认实现。

  2. 启用自动配置

    老规矩,使用 @EnableRabbit 注解标注于配置类上,以表示使用 RabbitMQ 的注解功能。

  3. 配置文件

    配置 RabbitMQ 的连接地址、端口以及账户信息:

    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=root
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=hemiao
    ## -----------------------------------------------
    logging.level.root=INFO
    logging.level.xxx.yyy.zzz=DEBUG
    logging.pattern.console=${CONSOLE_LOG_PATTERN:\
    %clr(${LOG_LEVEL_PATTERN:%5p}) \
    %clr([%15.15t]){faint} \
    %clr(%-40.40logger{39}){cyan} \
    %clr(:){faint} %m%n\
    ${LOG_EXCEPTION_CONVERSION_WORD:%wEx}}
    
  4. 编写消息接收者/消费者的代码:HelloReceiver.java

    @Slf4j
    @Component
    public class HelloReceiver {
    
        @RabbitListener(queues = "Q1")
        public void process(String hello) {
            log.info("Receiver : {}", hello);
        }
    
    }
    
  5. 验证

    在 RabbitMQ 的管理台页面上,直接向 Exchange 发送消息,确保 Exchange 会把消息转到 Q1 队列,随后,你会发现你写的代码自动触发执行了。

# 2. 创建 Exchange、Queue 和 Binding

提示

类似于 Hibernate/JPA 和 spring-data-elasticsearch 的自动建表建库功能,spring-boot-starter-amqp 可以帮我们去创建 Exchange、Queue 以及它俩之间的 Binding 关系。但是,这个功能有利有弊,有人喜欢,有人不喜欢。

# 创建 Exchange

@Bean
public Exchange exchange() {
//  return new TopicExchange("test-exchange-1"); 
    return new TopicExchange("test-exchange-1", true, false);
}

参数说明:

参数 说明
name 字符串值,exchange 的名称。
durable 布尔值,表示该 exchage 是否持久化。
它决定了当 RabbitMQ 重启后,你是否还能 “看到” 重启前创建的 exchange 。
autoDelete 布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。
即,实现逻辑上的临时交换机。项目启动时连接到 RabbitMQ ,创建交换机;项目停止时断开连接,RabbitMQ 自动删除交换机。

不指定 durable 和 autoDelete 时,默认分别是 truefalse 。表示持久化、不用自动删除。

补充,这背后调用的是原生 API 中的 Channel.exchangeDeclare() 方法。

# 创建 Queue

@Bean
public Queue queue() {
//  return new Queue("test-queue-1"); 
    return new Queue("test-queue-1", true, false, false);
}

参数说明:

参数 说明
name 字符串值,queue 的名称。
durable 布尔值,表示该 queue 是否持久化。
它决定了当 RabbitMQ 重启后,你是否还能 “看到” 重启前创建的 queue 。
另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。
exclusive 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。
autoDelete 布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。
即,实现逻辑上的临时队列。项目启动时连接到 RabbitMQ ,创建队列;项目停止时断开连接,RabbitMQ 自动删除队列。

不指定 durable、exclusive 和 autoDelete 时,默认为 truefalsefalse 。表示持久化、非排它、不用自动删除。

补充,这背后调用的是原生 API 中的 Channel.queueDeclare() 方法。

# 创建 Binding

@Bean
public Binding binding(Exchange exchange, Queue queue) {
    return BindingBuilder
        .bind(queue).to(exchange).with("*.orange.*")
        .noargs();
}

# 3. 发送消息

spring-rabbit 提供了 RabbitTemplate 来简化原生 API 的消息发送方法。

(最简单的情况下),你可以直接要求 Spring 给你注入一个 RabbitTemplate,通过它来发送消息:

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void demo() {
    rabbitTemplate.convertAndSend("queue-demo-1", "hello world");
}

.convertAndSend 方法的第一个参数是 routing-key,第二个参数是你所要发送的消息。

在没有明确指定 Exchange 的情况下,该消息发送给了 RabbitMQ 的 default-exchange。而 default-exchage 是将 routing-key 视为 queue-name 。

也就是说,上述代码中的 routing-keyqueue-demo-1,那么该消息最终是发送给 queue-demo-1 队列。

提示

.convertAndSend 方法是 .send 方法的包装/简化。.send 方法的调用相对比较繁琐。

# 4. 接收/消费消息(PUSH 型)

接收/消费消息的方式有两种:Push 型和 Pull 型。

Push 型表示由 RabbitMQ Broker 负责将消息推送给消费者。消费者在一开始指定/配置监听哪个队列的消息后,就无需考虑其它。当该队列收到消息后,消费者的指定方法就会被触发执行。

PUSH 消费的配置非常简单,对你的消费者类的 “被触发方法” 标注 @RabbitListener 注解。当然,前提是消费者类要托管给 Spring:

@Component
public class Consumer1 {

    private static final Logger log = LoggerFactory.getLogger(Consumer1.class);

    @RabbitListener(queues = "queue-demo-1")
    public void process(String message) {
        log.info("Consumer 1: {}", message);
    }

}

# 5. 对象的支持

Spring Boot 已经完美支持对象的发送和接收,不需要额外的配置。

警告

所传递的对象必须要实现 Serializable 接口。

!声明队列

@Bean
public Queue departmentQueue() {
    return new Queue("hello");
}
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void demo() {
    rabbitTemplate.convertAndSend("hello", LocalDate.now());
}


@Slf4j
@Component
public class MessageReceiver {

    @RabbitListener(queues = "hello")
    public void process(LocalDate date) {
        log.info("Receiver : {}", date);
    }

}

# 6. Topic Exchange

Topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由地绑定不同的队列。

考虑到环境中残留的之前的相关信息对测试的影响,如果发现测试代码的执行结果『莫名其妙』,记得在 RabbitMQ 的 web 管理系统中将相关内容清除干净,构造一个纯净的测试环境测试。

rabbitmq

首先对 Topic 规则配置:

/* 两个 Queue */
@Bean("Q1")
public Queue queue1() { return new Queue("Q1"); }

@Bean("Q2")
public Queue queue2() { return new Queue("Q2"); }

/* 一个 Exchange */
@Bean
public TopicExchange topicExchange() { return new TopicExchange("topic-exchange"); }

/* 三个 Binding:关联 Exchange 和 Queue */
@Bean
public Binding binding1(@Qualifier("Q1") Queue queue, TopicExchange topicExchange) {
    return BindingBuilder
        .bind(queue).to(topicExchange).with("*.orange.*")
        .noargs();
}

@Bean
public Binding binding21(@Qualifier("Q2") Queue queue, TopicExchange topicExchange) {
    return BindingBuilder
        .bind(queue).to(topicExchange).with("*.*.rabbit")
        .noargs();
}

@Bean
public Binding binding22(@Qualifier("Q2") Queue queue, TopicExchange topicExchange) {
    return BindingBuilder
        .bind(queue).to(topicExchange).with("lazy.#")
        .noargs();
}

即便不编写消费者,你也可以在 15672 管理台页面上,直接看到各个 Queue 中有多少条消息。

创建两个消费者:

@Slf4j
@Component
public class C1 {

    @RabbitListener(queues = "Q1")
    public void process(String message) {
        log.info("C1: {}", message);
    }

}

@Slf4j
@Component
public class C2 {

    @RabbitListener(queues = "Q2")
    public void process(String message) {
        log.info("C2: {}", message);
    }

}

测试:(这里偷了个懒,没有去创建发送者类,直接在 Junit 中使用了 AmqpTemplate 发送消息)

@Autowired
private AmqpTemplate rabbitTemplate;


@Test
public void demo1() throws InterruptedException {
    rabbitTemplate.convertAndSend("testTopic", "hello.orange", "hello orange");
    rabbitTemplate.convertAndSend("testTopic", "hello.orange.world", "hello orange world");
    rabbitTemplate.convertAndSend("testTopic", "hello.world.rabbit", "hello world rabbit");
    rabbitTemplate.convertAndSend("testTopic", "lazy", "lazy");
    rabbitTemplate.convertAndSend("testTopic", "lazy.good", "good");
    rabbitTemplate.convertAndSend("testTopic", "lazy.good.bye", "goodbye");
    Thread.sleep(1000L);
}

# 7. Fanout Exchange

@Bean("green") 
public Queue greenQueue() { return new Queue("green"); }

@Bean("red")
public Queue redQueue() { return new Queue("red"); }

@Bean("orange")
public Queue orangeQueue() { return new Queue("orange"); }

@Bean
public FanoutExchange exchange() { return new FanoutExchange("testFanout"); }

@Bean
public Binding binging1(FanoutExchange exchange, @Qualifier("green") Queue queue) { 
    return BindingBuilder
        .bind(queue).to(exchange).with("")
        .noargs();
}

@Bean
public Binding binging2(FanoutExchange exchange, @Qualifier("red") Queue queue) { 
    return BindingBuilder
        .bind(red).to(exchange).with("")
        .noargs();
}

@Bean
public Binding binging3(FanoutExchange exchange, @Qualifier("orange") Queue queue) { 
    return BindingBuilder
        .bind(orange).to(exchange).with("") 
        .noargs();
}
@Test
public void demo2() throws InterruptedException {
    rabbitTemplate.convertAndSend("blue", "", "green");
    rabbitTemplate.convertAndSend("blue", "", "red");
    rabbitTemplate.convertAndSend("blue", "", "orange");
    Thread.sleep(1000L);
}

Customer-A、Customer-B、Customer-C 都会收到这三条消息,即,控制台会打印出 9 条日志。

# 8. 接收/消费消息(PULL 型)

PULL 型消费意味着需要消费者主动从 RabbitMQ Broker 上『取』消息。

PULL 型消费『不依靠』@RabbitListener 注解。而是需要在代码中手动调用 .receiveAndConvert 方法。

.receiveAndConvert 方法是 .receive 方法的简化版。

@Test
public void demo5() {
    rabbitTemplate.convertAndSend("queue-demo-1", "hello world");
}

@Test
public void demo4() {
    log.info("{}", rabbitTemplate.receiveAndConvert("queue-demo-1"));
}

# 9. 发送者确认

注意

发送者如何知道自己所发送的消费成功抵达了 RabbitMQ Broker 中的 Exchange 中,乃至成功抵达了 RabbitMQ Broker 中的 Queue 中?

生产者确认

# 确认消息已到 Exchange

RabbitMQ 有一个配置属性 spring.rabbitmq.publisher-confirm-type 控制是否开启确认功能。该属性默认值是 NONE ,表示不开启消息确认。

  • publisher-confirm-type = SIMPLE

    当改属性的值为 SIMPLE 时,表示支持以简单(同步阻塞等待)方式获得确认与否的信息。

    这里会调用 Template#waitForConfirms 方法,不过这个方法有个要求,它必须在 Template#invoke 方法中使用。

    String str = rabbitTemplate.invoke((operations) -> {
        // 参数 operations 实际上就是 Template 。
        operations.convertAndSend("red", "hello world");
        log.info("{}", operations.waitForConfirms(1000));   // 阻塞等待 1 秒,以获得确认信息。
        return "over"; // lambda 表达式的值将成为 invoke 方法的返回值。
    });
    log.info("{}", str);
    

    你可以向不存在的 Exchange 发送消息已验证效果。

  • publisher-confirm-type = CORRELATED

    当改属性的值为 CORRELATED 时,表示支持以异步回调方式获得确认与否的信息。

    在之前的代码中,是 spring-rabbit 帮我们创建 ConnectionFactory,再进一步创建 RabbitTemplate,并注入到我们的代码中进而被我们使用。

    现在由于需要对 RabbitTemplate 进行设置,因此,我们需要自己创建并设置 RabbitTemplate。(不过,还是需要 spring-rabbit 帮我们创建 Connection Factory,并注入)

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
    
        // 当 Exchange 收到消息后,这里设置的回调方法会被触发执行
        rabbitTemplate.setConfirmCallback( ... );
    
        return rabbitTemplate;
    }
    

    你可以使用 lamda 表达式来简化下列匿名实现类。

    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * 该方法无论 Exchange 能否收到消息都会执行。
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, java.lang.String cause) {
            if (ack)
                log.info("消息已发送至 Exchange");
            else 
                log.info("消息未能发送到 Exchange。{}", cause);
        }
    });
    
    
    

# 确认消息已到 Message Queue

## 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);

    // 设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);

    ...

    // 当消息『走不到』RabbitMQ 的 Queue 时会被触发执行。
    rabbitTemplate.setReturnCallback( ... );

    return rabbitTemplate;
}

你可以使用 lamda 表达式来简化下列匿名实现类。

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    /**
     * 该方法在 Queue 无法收到消息时被触发执行。Queue 能收到消息,反而不会执行。
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("ReturnCallback 消息:{}", message);
        log.info("ReturnCallback 回应码:{}", replyCode);
        log.info("ReturnCallback 回应信息:{}", replyText);
        log.info("ReturnCallback 交换机:{}", exchange);
        log.info("ReturnCallback 路由键:{}", routingKey);
    }
});

你可以向不存在的 Exchange 和 Queue 发送消息已验证效果。

# 10. 消费端的确认与拒绝

默认情况下,RabbitMQ 启用的是消费端自动(auto)回复。即,当消费端收到消息,就会给 RabbitMQ Broker 作出回复,表示已收到。

只有在消费端回复 RabbitMQ Broker 之后,RabbitMQ Broker 才会将该消息从消息队列中移除。

回复的行为除了有 AUTO 之外,还有 NONE 和 MANUAL

NONE 表示不回复,即,RabbitMQ Broker 永远不可能知道消费者端到底有没有收到消息。RabbitMQ Broker 发出

MANUAL 则意味着需要在消费者端手动发送回复信息。在消费者回复前,该消息在消费端未回复前在 RabbitMQ Brocker 上一直处于 Unacked 状态。如果消费者始终都不回复该消息,那么直到消费者与 RabbitMQ 断开连接之后,这条消息才会重新变为 Ready 状态。

启用消费端的确认功能需要打开配置开关:

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

于此同时,消息消费者的处理方法需要改造成以下形式:

@Component
public class Consumer2 {

    @RabbitListener(queues = "queue-demo-1")
    public void process(String message, 
            Channel channel, 
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) {

        ...

    }
}

# 确认消息

确认消息使用 channel.basicAck 方法:

channel.basicAck(tag, false);

basicAck 方法需要传递两个参数:

  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel(Channel 是比 Connection 更小的单位),RabbitMQ 通过 Channel 向消费者投递消息时,都会为该消息分配一个唯一性标识:delivery tag 。同一个 Channel 中的消息的 delivery tag 都是唯一且单调递增的。

  • multiple:是否批量确认。当参数为 false 时,意味着确认单条消息,RabbitMQ 仅从消息队列中删除该消息;当参数为 true 时,意味着批量确认,RabbitMQ 会从消息队列中删除编号小于等于该消息的所有信息。

# 拒绝消息

拒绝消息使用 channel.basicReject 方法:

channel.basicReject(tag, false);

basicReject 方法也需要传力两个参数:

  • deliveryTag(唯一标识 ID):同上。

  • requeue(重入标识):标识该消息是否需要 RabbitMQ Broker 重新入队。(有可能的话,会被该队列的其它消费者消费)。

另外,拒绝的方法还有 .basicNack,表示批量拒绝。