2022年8月24日 作者 zeroheart

rabbitMq延迟队列

这里只是延迟队列的使用,更详细的使用可以参考这篇博客:springboot集成rabbitmq(实战) – 名山丶深处 – 博客园 (cnblogs.com)

RabbitMQ如何实现延迟队列,其实并不是它提供了延迟队列,而是是用过期时间+死信队列做的

基本上就是三个文件

配置、生产者、消费者

配置:


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * rabbitMq 配置
 */
@Configuration
public class RabbitConfig {
    /**
     * 交换机用于重新分配队列
     *
     * @return
     */
    @Bean
    DirectExchange oneExchange() {
        return new DirectExchange("exchange");
    }

    /**
     * 用于延时消费的队列
     *
     * @return
     */
    @Bean
    public Queue repeatTradeQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // return new Queue("TestDirectQueue",true,true,false);

        Queue queue = new Queue("repeatTradeQueue", true, false, false);
        return queue;
    }

    /**
     * 绑定交换机并指定routing key
     *
     * @return
     */
    @Bean
    public Binding repeatTradeBinding() {
        return BindingBuilder.bind(repeatTradeQueue()).to(oneExchange()).with("repeatTradeQueue");
    }

    /**
     * 配置死信队列-指定队列过期时间,单位毫秒
     *
     * @return
     */
    @Bean
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 3000);
        args.put("x-dead-letter-exchange", "exchange");
        args.put("x-dead-letter-routing-key", "repeatTradeQueue");
        return new Queue("deadLetterQueue", true, false, false, args);
    }


    /**
     * 配置死信队列-不指定队列过期时间,就可以实现自己定义过期时间,单位毫秒
     *
     * @return
     */
    @Bean
    public Queue deadLetterMsgQueue() {
        Map<String, Object> args = new HashMap<>();
//        args.put("x-message-ttl", 3000);
        args.put("x-dead-letter-exchange", "exchange");
        args.put("x-dead-letter-routing-key", "repeatTradeQueue");
        return new Queue("deadLetterMsgQueue", true, false, false, args);
    }

}

生产者:

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 *  生产者
 */
@Component
public class RabbitProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 死信队列 消息发送
     * @param msg
     */
    public void deadLetterSend(String msg) {
        System.out.println("DeadLetterSender 发送时间:" + LocalDateTime.now().toString() + " msg内容:" + msg);
        rabbitTemplate.convertAndSend("deadLetterQueue", msg);
    }

    /**
     * 死信队列 消息发送
     * @param msg
     */
    public void deadLetterTimeSend(String msg, long times) {
        System.out.println("DeadLetterSender 发送时间:" + LocalDateTime.now().toString() + " msg内容:" + msg);
        MessagePostProcessor processor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(times + "");
                return message;
            }
        };
        rabbitTemplate.convertAndSend("deadLetterMsgQueue", (Object) msg, processor);
    }
}

消费者:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * 消费者
 */
@Component
@RabbitListener(queues = "repeatTradeQueue")
public class RabbitConsumer {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("repeatTradeQueue 接收时间:" + LocalDateTime.now().toString() + " 接收内容:" + msg);
    }
}