几个mq的对比
1、什么是MQ?
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
类似于数据库一样需要独立部署在服务器上的一种应用,提供接口给其他系统调用。
主要用于各个系统之间通信的解耦。
举例:
比如登陆系统,在登陆之后需要调用短信系统给用户发短信说已经登陆,同时还需要调用日志系统记录登陆日志,需要调用积分系统对登陆签到的积分进行增加 等等。
这种情况下,登陆系统和日志系统,短信系统,积分系统等等 强耦合,其中存在可能调用失败,信息丢失等风险,同时会提高系统复杂度。
比如说登陆之后调用日志系统失败,那么该次登陆的日志信息就会丢失,无法再找回。
而且顺序执行,会导致登陆系统运行效率低。
那么如果使用消息中间件,登陆之后只需要将任务推入到消息队列中,就不用去管了。其他系统则从队列中去获取任务。
实现解耦和异步调用 (异步是相对于同步而言,同步是就等待,当系统执行某个任务的时候,一定要等到该任务结束,系统才会继续往下执行,异步则不等待。)
同时还有可以实现横向拓展 安全可靠优点
JMS(Java Message Service),即Java消息服务,是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。由Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
2、为什么要使用MQ(MQ的作用)
在实际工作中什么样的业务场景,这个业务场景有个什么技术挑战,如果不用MQ可能会比较麻烦,包括现在用了MQ以后有哪些好处等等。
MQ的核心作用:解耦、异步、削锋。
2.1 系统解耦

场景一,最初A系统通过接口调用发送数据给BCD三个系统,如果D系统突然说:现在不需要数据了,你不用给我传数据了,这时候A系统只能修改代码,将调用D系统的代码删除;E系统又说需要这个数据,那么A系统负责人无奈,只能再去改代码… 再来点更加崩溃的事儿,A系统要时时刻刻考虑BCDE四个系统如果挂了咋办,要不要重发,要不要把消息存起来……?
上述场景中,BCDE都需要用到A系统提供的数据,A系统跟其他四个系统严重耦合,需要时时刻刻考虑其他四个系统要是挂了怎么办,需不需要重新发送数据给他们,这个时候的A系统内心是崩溃的。
但是如果使用了MQ之后 ,A系统的数据只需要放到MQ里面,其他的系统想请求获取数据只需要去MQ里面消费即可,如果突然不想请求了,就取消对MQ的消费就行了,A系统根本不需要考虑给谁去响应这个数据,也不需要去维护代码,也不用考虑其他系统是否调用成功,失败超时等情况。

总结:通过MQ发布订阅消息的模型,A系统就成功的跟其他系统解耦了。
面试技巧:你需要思考一下,在你自己的系统里面有没有类似的情况,一个系统或者模块,调用了多个系统或者模块,它们互相之间的调用非常复杂,并且维护起来很麻烦,但其实这个调用是不需要直接同步调用接口的,如果用MQ给它异步化解耦也是可以的,你就需要思考在你的项目里,是不是可以用MQ给它进行系统的解耦,可以自己组织一下语言回答。
2.2 异步调用

(详情看文章中–异步发送)
场景二,还是ABCD四个系统,A系统收到一个请求,需要在自己本地写库,还需要往BCD三个系统写库,A系统自己写本地库需要3ms,往其他系统写库相对较慢,B系统200ms ,C系统350ms,D系统400ms,这样算起来,整个功能从请求到响应的时间为3ms+200ms+350ms+400ms=953ms,接近一秒,对于用户来说,点个按钮要等这么长时间,基本是无法接受的,侧面也反映出这家研发人员技术不咋地。
一般的互联网企业,对于用户请求响应的时间要求在100ms-200ms之间,这样,用户的眼睛存在视觉暂停现象,用户响应时间在此范围内就可以了,所以上面的现象是不可取的。
如果用了MQ,用户发送请求到A系统耗时3ms,A系统发送三条消息到MQ,假如耗时5ms,用户从发送请求到相应3ms+5ms=8ms,仅用了8ms,用户的体验非常好。

2.3 流量削峰
(减少高峰时期对服务器压力)

场景三,2020年爆发的这场新冠病毒,导致各大线上商城APP里面的口罩被抢购一空,在这种情况下,JD商城开启了一场每晚八点的抢购3Q口罩的活动,每天下午三点进行预约,晚上八点抢购,从JD商城刚上线这个活动,小明连续抢了近一个周,也算是见证了一个百万并发量系统从出现问题到完善的一个过程,最初第一天,抢购的时候,一百多万预约,到八点抢购估计也能有百万的并发量,可是第一天,到八点抢的时候,由于并发量太高,直接把JD服务器弄崩了,直接报了异常,可能JD在上线这个活动的时候也没能够想到会有那么高的并发,打了一个猝不及防,但是这只是在前一两天出现报异常的情况,后面却没有再出现异常信息,到后来再抢购只是响应的时间变得很慢,但是JD系统并没有崩溃,这种情况下一般就是用了MQ(或者之前用了MQ,这次换了个吞吐量级别更高的MQ),也正是利用了MQ的三大好处之一——削峰。
JD系统每天0—19点,系统风平浪静,结果一到八点抢购的时候,每秒并发达到百万,
假设JD数据库没秒能处理1.5w条并发请求(并非实际数据,主要为了举例),到八点抢购的时候,每秒并发百万,这直接导致系统异常,但是八点一过,可能也就几万用户在线操作,每秒的请求可能也就几百条,对整个系统毫无压力。
如果使用了MQ,每秒百万个请求写入MQ,因为JD系统每秒能处理1W+的请求,JD系统处理完然后再去MQ里面再拉取1W+的请求处理,每次不要超过自己能处理的最大请求量就ok,这样下来,等到八点高峰期的时候,系统也不会挂掉,但是近一个小时内,系统处理请求的速度是肯定赶不上用户的并发请求的,所以都会积压在MQ中,甚至可能积压千万条,但是高峰期过后,每秒只会有一千多的并发请求进入MQ,但是JD系统还是会以每秒1W+的速度处理请求,所以高峰期一过,JD系统会很快消化掉积压在MQ的请求,在用户那边可能也就是等的时间长一点,但是绝对不会让系统挂掉。

解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
3、消息队列的优缺点
优点
解耦、异步、削峰( 系统解耦,异步调用,流量削峰)
综上所述,各种对比之后,个人意见:
一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
后来大家开始用RabbitMQ,但是确实erlang语言阻止了大量的java工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态,但是确实人是开源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司,会去用RocketMQ,确实很不错,RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的企业使用
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择,其实RocketMQ也可以(只是有些人担心这个技术万一被抛弃,社区黄掉的风险,虽然目前 RocketMQ 已捐给 Apache,但 主要考虑的是GitHub 上的活跃度其实不算高,但个人觉得黄掉的可能性比较小);大型公司,基础架构研发实力较强,用RocketMQ是很好的选择
如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,几乎是该领域的事实性规范
另外角度比较:

下图列出了全球范围内这些MQ在2018.12~2019.12一年时间内,在Google Trends的搜索频率,某种程度可以反映出这些中间件的火爆程度。

从这张图上,我们可以看出来,Kafka是一枝独秀,RabbmitMQ紧接其后,ActiveMQ和Apache Pulsar也有一定的占比。而RocketMQ的搜索量可以说是微乎其微。其实除了RocketMQ的其他几个MQ产品,可以根据这张图初步对比下流行程度。但是对于RocketMQ必须排除在外,因为一些原因,很多国内的用户无法通过Google进行搜索,因此关于RocketMQ的统计实际上是不准确的。
对比RocketMQ与其他MQ有哪些功能特性上的差异。功能特性,主要取决于产品定位,如Kafka定位于高吞吐的流失日志和实时计算场景;ActiveMQ、RabbitMQ等则定位于企业级消息中间件,因此提供了很多企业开发时非常有用的功能,如延迟消息、事务消息、消息重试、消息过滤等,而这些特性Kafka都不具备,但是这类产品的吞吐量要明显的低于Kafka。
RocketMQ则是结合了Kafka和ActiveMQ、RabbitMQ的特性。在性能上,可以与Kafka抗衡;而在企业级MQ的特性上,则具备了很多ActiveMQ、RabbitMQ提供的特性。因此,企业在选择消息中间件选型时,RocketMQ是非常值得考虑的一款产品。
Redis
是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
RabbitMQ
是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。
————————————————
版权声明:本文为CSDN博主「爱码士88」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/JemeryShen/article/details/126742783
Spring Boot整合RabbitMQ实践教程
(48条消息) Spring Boot整合RabbitMQ实践教程_C3Stones的博客-CSDN博客
RabbitMQ是使用Erlang语言来编写的,并且基于AMQP协议。Erlang语言在数据交互方面性能较优秀,具有和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。
RabbitMQ特点:
a. 开源、性能交友,消息持久化不丢失,可靠性得到保障
b. 提供可靠性的消息投递模式、返回模式
c. 与spring-boot-starter-amqp完美整合,API功能丰富且资料较多
d. 支持集群,保障高可用
- AMQP协议 AMQP(Advanced Message Queuing Protocol),是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件不同产品、不同开发语言等条件的限制。
AMQP基本概念:
Server:服务,接收客户端请求。
Connection:连接,应用于Server之间的TCP连接。
Channel:信道,消息的读写在信道中进行。每个客户端可以建立多个通道,每个通道即一个会话任务。
Message:消息,应用于Server之间传递的数据实体。每个消息由Properties和Body组成,Properties存储消息的优先级、延迟等配置属性,Body及消息内容。
Virtual Host:虚拟主机,用于Server内部之间逻辑隔离。每个Server之间可以有多个虚拟主机,每个虚拟主机内有多个交换机和队列,每个虚拟主机内的交换机和队列名称不能相同。
Exchange:交换机,用于接收消息,并按照一定的路由规则将消息绑定到对应的队列。常见的交换机类型有:Direct、Topic、Fanout、Header等。
Queue:队列,用于保存消息并交给消费者消费消息。
Binding:绑定,交换机和队列之间的虚拟连接,每个绑定连接中包含对一个或多个路由键。
RoutingKey:路由键,生产者发生消息时会指定对应的路由键规则,交换机根据规则将消息绑定到具体的队列上,消息者通过队列消息消息。
概念中提到两个属性:Connection和Channel。既然存在Connection,又为什么需要Channel。原因是:在一个业务场景中必然存在多个线程环境操作RabbitMQ Server进行生产和消费消息,因此将会建立多个Connection,即多个TCP连接。对于操作系统而言,TCP连接的创建和销毁是十分浪费资源的,在高并发使用场景中,势必达到性能瓶颈。所以RabitMQ采用TCP连接复用方式,使用Channel建立应用与Server连接,提高效率,也便于管理。


- 消息确认 消费者在消费消息往往会因为网络或其他原因导致异常,因此需要和队列建立确认机制才能表明该条消息已经成功消费。因此在AMQP协议中给出两种建议:
(1)自动确认模式:即消息发送给消费者后立即从队列中删除;
(2)手动确认模式:即消费者者将消息者处理后给队列发送确认回执(ACK机制,Acknowledgement),再根据情况删除消息。 - 安装RabbitMQ Docker部署RabbitMQ
Docker部署RabbitMQ – C3Stones – 博客园 (cnblogs.com)
- Direct Exchange示例
创建生产者
修改pom.xml
4.0.0
com.c3stones
direct-exchange-provider-demo
0.0.1-SNAPSHOT
direct-exchange-provider-demo
Direct Exchange Provider Demo
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加配置文件application.yml
server:
port: 8980
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- RabbitMQ配置类
- @author CL
*
*/
@Configuration
public class RabbitMqConfig {
- 交换机名称
*/
public static final String EXCHANGE_NAME = “c3stones.direct”;
- 路由键
*/
public static final String ROUNTING_KEY = “test”;
- 队列名称
*/
public static final String QUEUE_NAME = “test.queue”;
- 配置Direct交换机
- @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
- @return
- 配置队列
- @return
*/
@Bean
public Queue testQueue() {
return new Queue(QUEUE_NAME);
}
- @return
- 将队列与交换机通过路由键绑定
- @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(testQueue()).to(directExchange()).with(ROUNTING_KEY);
}
- @return
- @author CL
}
创建发送消息Controller
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.c3stones.config.RabbitMqConfig;
/**
- 发送消息Controller
- @author CL
*
*/
@RestController
public class SendMsgController {
private RabbitTemplate rabbitTemplate; /**- 发送消息
- @param msg 消息内容
- @return
*/
@RequestMapping(value = “/send”, method = RequestMethod.GET)
public boolean send(String msg) {
try {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNTING_KEY, msg);
} catch (AmqpException e) {
log.error(“发送消息异常:{}”, e);
return false;
}
return true;
}
}
创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
- @author CL
/**
- 启动类
- @author CL
*
*/
@SpringBootApplication
public class DirectProviderApplication {
SpringApplication.run(DirectProviderApplication.class, args);
}- @author CL
}
启动项目,并测试发送消息
查看RabbitMQ管理界面
可以看出RabbitMQ Server已经接收到消息,并经过交换机成功转发到队列中,此时消息为等待消费状态。
创建消费者
修改pom.xml
4.0.0
com.c3stones
direct-exchange-consumer-demo
0.0.1-SNAPSHOT
direct-exchange-consumer-demo
Direct Exchange Consumer Demo
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加配置文件application.yml
server:
port: 8981
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
创建处理消息Service
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
- 处理消息Service
- @author CL
*
*/
@Component
public class HandleMsgService {
- 方法1-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test.queue”)
public void handle1(String msg) {
log.info(“方法1已接收到消息:{}”, msg);
}
- @param msg 消息内容
- @author CL
}
创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
- 启动类
- @author CL
*
*/
@SpringBootApplication
public class DirectConsumerApplication {
SpringApplication.run(DirectConsumerApplication.class, args);
}- @author CL
}
启动项目,并观察控制台打印日志
2020-07-24 16:45:30.284 INFO 5400 — [ntContainer#0-1] com.c3stones.service.HandleMsgService : 方法1已接收到消息:测试
可以看到,在项目启动后消费者Service已经成功监听到消息,并消费。
修改处理消息Service,配置多个监听方法监听同一队列
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
- 处理消息Service
- @author CL
*
*/
@Component
public class HandleMsgService {
- 方法1-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test.queue”)
public void handle1(String msg) {
log.info(“方法1已接收到消息:{}”, msg);
}
- @param msg 消息内容
- 方法2-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test.queue”)
public void handle2(String msg) {
log.info(“方法2已接收到消息:{}”, msg);
}
- @param msg 消息内容
- 方法3-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test.queue”)
public void handle3(String msg) {
log.info(“方法3已接收到消息:{}”, msg);
}
- @param msg 消息内容
- @author CL
}
重启消费者,并启动生产者
通过Postman发送三条消息,并观察消费者控制台
2020-07-24 16:47:49.471 INFO 8532 — [ntContainer#0-1] com.c3stones.service.HandleMsgService : 方法2已接收到消息:测试1
2020-07-24 16:47:51.567 INFO 8532 — [ntContainer#1-1] com.c3stones.service.HandleMsgService : 方法1已接收到消息:测试2
2020-07-24 16:47:54.070 INFO 8532 — [ntContainer#2-1] com.c3stones.service.HandleMsgService : 方法3已接收到消息:测试3
可以看到多个方法监听队列,并不会重复消费消息,而是轮询消费。
- Fanout Exchange示例
创建生产者
修改pom.xml
4.0.0
com.c3stones
fanout-exchange-provider-demo
0.0.1-SNAPSHOT
fanout-exchange-privoder-demo
Fanout Exchange Provider Demo
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加配置文件application.yml
server:
port: 8982
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
创建RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- RabbitMQ配置类
- @author CL
*
*/
@Configuration
public class RabbitMqConfig {
- 交换机名称
*/
public static final String EXCHANGE_NAME = “c3stones.fanout”;
- 队列1名称
*/
public static final String QUEUE_NAME_1 = “test1.fanout.queue”;
- 队列2名称
*/
public static final String QUEUE_NAME_2 = “test2.fanout.queue”;
- 队列3名称
*/
public static final String QUEUE_NAME_3 = “test3.fanout.queue”;
- 配置Direct交换机
- @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
- @return
- 配置队列1
- @return
*/
@Bean
public Queue test1Queue() {
return new Queue(QUEUE_NAME_1);
}
- @return
- 配置队列2
- @return
*/
@Bean
public Queue test2Queue() {
return new Queue(QUEUE_NAME_2);
}
- @return
- 配置队列3
- @return
*/
@Bean
public Queue test3Queue() {
return new Queue(QUEUE_NAME_3);
}
- @return
- 将队列1与交换机绑定
- @return
*/
@Bean
public Binding bindingQueue1() {
return BindingBuilder.bind(test1Queue()).to(fanoutExchange());
}
- @return
- 将队列2与交换机绑定
- @return
*/
@Bean
public Binding bindingQueue2() {
return BindingBuilder.bind(test2Queue()).to(fanoutExchange());
}
- @return
- 将队列3与交换机绑定
- @return
*/
@Bean
public Binding bindingQueue3() {
return BindingBuilder.bind(test3Queue()).to(fanoutExchange());
}
- @return
- @author CL
}
创建发送消息Controller
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.c3stones.config.RabbitMqConfig;
/**
- 发送消息Controller
- @author CL
*
*/
@RestController
public class SendMsgController {
private RabbitTemplate rabbitTemplate; /**- 发送消息
- @param msg 消息内容
- @return
*/
@RequestMapping(value = “/send”, method = RequestMethod.GET)
public boolean send1(String msg) {
try {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, null, msg);
} catch (AmqpException e) {
log.error(“发送消息异常:{}”, e);
return false;
}
return true;
}
- @author CL
}
创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
- 启动类
- @author CL
*
*/
@SpringBootApplication
public class FanoutProviderApplication {
SpringApplication.run(FanoutProviderApplication.class, args);
}- @author CL
}
启动项目,并测试发送消息
创建消费者
修改pom.xml
4.0.0
com.c3stones
fanout-exchange-consumer-demo
0.0.1-SNAPSHOT
fanout-exchange-consumer-demo
Fanout Exchange Consumer Demo
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加配置文件application.yml
server:
port: 8983
spring:
rabbitmq:
host: 1277.0.0.1
port: 5672
username: guest
password: guest
创建处理消息Service
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
- 处理消息Service
- @author CL
*
*/
@Component
public class HandleMsgService {
- 方法1-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test1.fanout.queue”)
public void handle1(String msg) {
log.info(“方法1已接收到消息:{}”, msg);
}
- @param msg 消息内容
- 方法2-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test2.fanout.queue”)
public void handle2(String msg) {
log.info(“方法2已接收到消息:{}”, msg);
}
- @param msg 消息内容
- 方法3-处理消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test3.fanout.queue”)
public void handle3(String msg) {
log.info(“方法3已接收到消息:{}”, msg);
}
- @param msg 消息内容
- @author CL
}
创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
- 启动类
- @author CL
*
*/
@SpringBootApplication
public class FanoutConsumerApplication {
SpringApplication.run(FanoutConsumerApplication.class, args);
}- @author CL
}
启动项目,并观察控制台打印日志
2020-07-24 17:11:51.177 INFO 12620 — [ntContainer#0-1] com.c3stones.service.HandleMsgService : 方法3已接收到消息:测试
2020-07-24 17:11:51.180 INFO 12620 — [ntContainer#1-1] com.c3stones.service.HandleMsgService : 方法2已接收到消息:测试
2020-07-24 17:11:51.189 INFO 12620 — [ntContainer#2-1] com.c3stones.service.HandleMsgService : 方法1已接收到消息:测试
可以看到,与该类型交换机绑定的队列,均可监听到消息。
- Topic Exchange示例
创建生产者
修改pom.xml
4.0.0
com.c3stones
topic-exchange-provider-demo
0.0.1-SNAPSHOT
topic-exchange-provider-demo
Topic Exchange Provider Demo
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加配置文件application.yml
server:
port: 8984
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
创建RabbitMQ配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- RabbitMQ配置类
- @author CL
*
*/
@Configuration
public class RabbitMqConfig {
- 交换机名称
*/
public static final String EXCHANGE_NAME = “c3stones.topic”;
- 绑定键1
*/
public static final String BINDING_KEY_1 = “topic.key1”;
- 绑定键2
*/
public static final String BINDING_KEY_2 = “topic.key2”;
- 绑定键前缀,即以topic.开头的键值都会被监听
*/
public static final String BINDING_KEY_PREFIX = “topic.#”;
- 队列1名称
*/
public static final String QUEUE_NAME_1 = “test1.queue”;
- 队列2名称
*/
public static final String QUEUE_NAME_2 = “test2.queue”;
- 配置Direct交换机
- @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
- @return
- 配置队列1
- @return
*/
@Bean
public Queue test1Queue() {
return new Queue(QUEUE_NAME_1);
}
- @return
- 配置队列2
- @return
*/
@Bean
public Queue test2Queue() {
return new Queue(QUEUE_NAME_2);
}
- @return
- 将队列1与交换机通过绑定键1绑定
- @return
*/
@Bean
public Binding bindingQueue1() {
return BindingBuilder.bind(test1Queue()).to(topicExchange()).with(BINDING_KEY_1);
}
- @return
- 将队列2与交换机通过绑定键前缀绑定
- @return
*/
@Bean
public Binding bindingQueue2() {
return BindingBuilder.bind(test2Queue()).to(topicExchange()).with(BINDING_KEY_PREFIX);
}
- @return
- @author CL
}
创建发送消息Controller
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.c3stones.config.RabbitMqConfig;
/**
- 发送消息Controller
- @author CL
*
*/
@RestController
public class SendMsgController {
private RabbitTemplate rabbitTemplate; /**- 发送消息1
- @param msg 消息内容
- @return
*/
@RequestMapping(value = “/send1”, method = RequestMethod.GET)
public boolean send1(String msg) {
try {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.BINDING_KEY_1, msg);
} catch (AmqpException e) {
log.error(“发送消息1异常:{}”, e);
return false;
}
return true;
}
- 发送消息2
- @param msg 消息内容
- @return
*/
@RequestMapping(value = “/send2”, method = RequestMethod.GET)
public boolean send2(String msg) {
try {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.BINDING_KEY_2, msg);
} catch (AmqpException e) {
log.error(“发送消息2异常:{}”, e);
return false;
}
return true;
}
}
- @author CL
创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
- 启动类
- @author CL
*
*/
@SpringBootApplication
public class TopicProviderApplication {
SpringApplication.run(TopicProviderApplication.class, args);
}- @author CL
}
启动项目,并测试发送两条消息
创建消费者
修改pom.xml
4.0.0
com.c3stones
topic-exchange-consumer-demo
0.0.1-SNAPSHOT
topic-exchange-consumer-demo
Topic Exchange Consumer Demo
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
添加配置文件application.yml
server:
port: 8985
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
创建处理消息Service
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
- 处理消息Service
- @author CL
*
*/
@Component
public class HandleMsgService {
- 方法1-处理队列1消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test1.queue”)
public void handle1(String msg) {
log.info(“方法1已接收到消息:{}”, msg);
}
- @param msg 消息内容
- 方法2-处理队列2消息
- @param msg 消息内容
*/
@RabbitListener(queues = “test2.queue”)
public void handle2(String msg) {
log.info(“方法2已接收到消息:{}”, msg);
}
- @param msg 消息内容
- @author CL
}
创建启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
- 启动类
- @author CL
*
*/
@SpringBootApplication
public class TopicConsumerApplication {
SpringApplication.run(TopicConsumerApplication.class, args);
}- @author CL
}
启动项目,并观察控制台打印日志
2020-07-24 18:32:29.916 INFO 3776 — [ntContainer#1-1] com.c3stones.service.HandleMsgService : 方法1已接收到消息:测试1
2020-07-24 18:32:29.916 INFO 3776 — [ntContainer#0-1] com.c3stones.service.HandleMsgService : 方法2已接收到消息:测试1
2020-07-24 18:32:29.919 INFO 3776 — [ntContainer#0-1] com.c3stones.service.HandleMsgService : 方法2已接收到消息:测试2
可以看到,方法2监听绑定键以“topic.”开头的消息,即两条消息都会监听到,方法1只监听绑定键为“topic.key1”的消息,即只能监听到第一条消息。
- 消息确认示例 以Direct Exchange为例。
修改生产者配置文件application.yml
server:
port: 8980
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm: true # 确认消息已发送到交换机
publisher-returns: true # 确认消息已发送到队列
修改RabbitMQ配置类,添加发送确认回调
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
- RabbitMQ配置类
- @author CL
*
*/
@Configuration
public class RabbitMqConfig {
private ConnectionFactory connectionFactory; /**- 交换机名称
*/
public static final String EXCHANGE_NAME = “c3stones.confirm.direct”;
- 路由键
*/
public static final String ROUNTING_KEY = “test.confirm.key”;
- 队列名称
*/
public static final String QUEUE_NAME = “test.confirm.queue”;
- 配置Direct交换机
- @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
- @return
- 配置队列
- @return
*/
@Bean
public Queue testQueue() {
return new Queue(QUEUE_NAME);
}
- @return
- 将队列与交换机通过路由键绑定
- @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(testQueue()).to(directExchange()).with(ROUNTING_KEY);
}
- @return
- 配置消息发送模板
- @return
*/
@Bean
public RabbitTemplate createRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("发送到交换机失败!原因:{}", cause); } }
}); // 强制调用回调方法
rabbitTemplate.setMandatory(true); // 确认消息已发送到队列
rabbitTemplate.setReturnCallback(new ReturnCallback() {@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("绑定到队列异常,消息:{},回应码:{},回应文本:{},交换机:{},路由键:{}", message, replyCode, replyText, exchange, routingKey); }
}); return rabbitTemplate;
}- @return
- @author CL
}
修改发送消息Controller,添加发送消息ID
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.c3stones.config.RabbitMqConfig;
/**
- 发送消息Controller
- @author CL
*
*/
@RestController
public class SendMsgController {
private RabbitTemplate rabbitTemplate; /**- 发送消息
- @param msg 消息内容
- @return
*/
@RequestMapping(value = “/send”, method = RequestMethod.GET)
public boolean send(String msg) {
try {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUNTING_KEY, msg,
new CorrelationData(UUID.randomUUID().toString()));
} catch (AmqpException e) {
log.error(“发送消息异常:{}”, e);
return false;
}
return true;
}
}
- @author CL
启动消息,并测试发送消息
请在发送消息Controller中分别指定正确的交换机名称正确队列、错误交换机正确队列、错误交换机错误队列、正确交换机错误队列四种情况,并仔细观察控制台。
修改消费者配置文件
server:
port: 8981
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
direct:
acknowledge-mode: manual # 手动确认
simple:
acknowledge-mode: manual # 手动确认
修改处理消息Service
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
/**
- 处理消息Service
- @author CL
*
*/
@Component
public class HandleMsgService {
- 方法1-处理消息
- @param msg 消息内容
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = “test.confirm.queue”, durable = “true”), key = “test.confirm.key”, exchange = @Exchange(“c3stones.confirm.direct”)))
public void handle1(Message message, Channel channel) {
try {
log.info(“方法1已接收到消息:{}”, message.getBody()); // 模拟处理异常
// int a = 1 / 0; // 正常消费,手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.info(“方法1处理消息异常:{}”, e); // 正常消费,将消息重新放入队列里
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException e1) {
log.info(“将消息重新放入队列里异常:{}”, e1);
}
}
}
- @param msg 消息内容
- @author CL
}
启动项目,并测试
测试正常处理消息,和处理消息时发送异常两种情况,并观察RabbitMQ Server中队列详情。