大多数应用当中,可通过消息服务中间件来提升系统的异步通信和扩展解耦能力。
简介
消息服务中两个重要的概念
消息代理和目的地:当消息发送者发送消息之后,将由消息代理接管,消息代理保证消息发送到指定的目的地。
消息发送的两种方式:
- 队列(Queue):点对点消息通信(point-to-point)
- 主题(Topic):发布(publish)/订阅(subscribe)式消息通信
开始说了,消息可以提升系统的异步通信和扩展解耦能力。异步通信,我们之前讲异步任务的时候已经说过了。给用户发送邮件就是最好,最直接的例子。
至于,扩展解耦能力,最好最直接的例子就是流量削峰,举个例子:整点秒杀。库存只有100件,用户有10000个人,整点用户讲发送10000个请求,难道每个都请求数据库吗?这个时候,我们就可以做个限制,用户发送的请求先到消息队列,然后,再由消息队列统一管理,哪些请求时可以到数据库的,哪些请求时不可以到数据库的,这样就解决了数据库的抗压能力。
实现
点对点式
- 消息发送者发送消息之后,消息代理将消息放在一个队列当中,消息接收者从队列中获取消息内容,消息读取后移除队列
- 消息只有唯一的发送者和接收者,但并不是说只能有一个接收者
发布订阅式
- 发送者(发布者)发送消息到主题(topic),多个接收者(订阅者)监听(订阅)这个主题,那么,就会在消息到达的同时收到消息
JMS和AMQP
- JMS:Java message service :Java消息服务基于JVM消息代理规范,ActiveMQ,HornetMQ就是JMS的实现
- AMQP:advanced message Queue Protocol:高级消息队列协议,也是消息代理的规范,兼容JMS,RabbitMQ就是AMQP的实现。
对比
类型 | JMS | AMQP |
---|---|---|
定义 | Java api | 网络线级协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
model | 提供两种消息模式:peer-2-peer,pub/sub | 提供五种消息模式:direct exchange,fanout exchange,topic change,headers exchange,system exchange。本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分 |
支持消息类型 | 多种消息类型:TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage,Message (只有消息头和属性) | byte[]类型,当实际应用中有复杂消息时,可以序列化之后再发送 |
综合评价 | JMS定义了java api层面的标准,在Java体系中,多个client均可通过JMS进行交互,不需要修改代码,但是其对跨平台支持较差 | AMQP天然具有跨平台,跨语言特性 |
RabbitMQ
简介
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体
流程
AMQP的消息路由过程跟JMS存在一些差异,增加了Exchange和Binding的角色
JMS流程:
- 生产者(publisher)生成某个消息(Message),发送到某个队列(Queue)上
- 消费者(Consumer)监听这个队列(Queue),消费消息
RabbitMQ流程:
- 生产者(publisher)生成某个消息(Message),把这个消息发送给我们的消息代理服务器上(Broker)
- 服务器收到消息之后,把这个消息给到一个合适的交换器(Exchange),(服务器有非常多的交换器)
- 交换器(Exchange)收到这个消息之后,根据路由键(Binding绑定关系)把这个消息给一个或者多个消息队列(Queue)(服务器有很多个消息队列)
- 消费者(Consumer)连接上队列之后取出消息
重点就是:交换器和队列的绑定
重点就是:交换器和队列的绑定
重点就是:交换器和队列的绑定
我们上面说了Exchange有4种,不同类型转发的消息策略不同,那么,这个策略是什么呢?其中,header和direct交换器完全一致,但是header性能上差很多,基本上不用了
重点start
- direct交换器:当我们发送消息时的路由键和绑定中的key完全一致的时候,交换器就将消息发送到该队列当中。它时完全匹配单播模式
- fanout交换器:当我们消息发送到fanout交换器时,不管交换器与队列绑定的路由键时什么,fanout交换器都会把这个消息发送给每一个队列,跟UDP广播类似,fanout交换器发送消息最快。
- topic交换器:该交换器允许我们对路由键做模糊匹配,有选择性的发送给某一个或者多个队列。两个通配符:井号(#)和星号(*)。其中:井号:匹配0个或者多个单词。星号:匹配一个单词。
重点end
安装
首先,打开我们的虚拟机,用SecureCRT连接我们的虚拟机,我用的SecureCRT,至于你用的啥连接虚拟机,随便你。
然后,用docker安装带manager版本的rabbit,带manager的版本自带图形化界面,容易操作。从docker hub上面搜索,我安装的是
1 | docker pull rabbitmq:3.8.1-management |
接着,新建容器,记得带端口号,-d后台运行,映射两个端口号,起自己的名字,加上镜像id
1 | docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 镜像id |
接下来,就可以访问了,通过虚拟机的ip地址加上端口号,账号密码都是guest
1 | 虚拟机ip地址:15672 |
我们在图上,就能看到我们前面说过的:Connection,Channel,Exchange,Queue等等。
最后面那个admin,我们能够设置用户名和密码,就是我们前面登录的guest,并且,能够设置访问的Virtual Hosts。
我们看一下最上面的流程图和消息发送流程
,我就举一个例子:
- 首先,我们先创建一个交换器名字叫:haichenyi
- 其次,我们再创建一个队列,名字也叫:haichenyi
- 接着,我们将这交换器和队列绑定到一起
然后,我们随便发送一条消息
最后,查看消息队列
PS:
- 我们在创建Exchange和Queue的时候,有一个选项:Durability,意思是是否可持久化,也就是,服务器重启之后这个东西是否还存在。就选默认的durable就行了,可持久化的
- 我们在Exchange和Queue绑定的时候,发送消息的时候,都要填一个Routing key,就是上文我们说的绑定规则。
这就是整个流程,这都是页面操作,下面说一下代码里面怎么写,很简单。
用法
首先,添加依赖:
1 | <dependency> |
然后,就是配置:两个可能出错的位置,我已经注释标明了
1 | spring.rabbitmq.host=192.168.113.22 |
代码怎么写呢?
1 | @SpringBootTest |
可以向上面这样测试,发送和接收。实际应用中,我们要向下面这样写:
- 启动类上添加@EnableRabbit注释,开启Rabbit监听功能
- 在我们接收的方法添加@RabbitListener注解,queues是一个数组,方法的参数是发送的数据类型。
1 | @Service |
以上,就是RabbitMQ的简单使用了,上面的Exchange,Queue都是在管理界面创建绑定的,代码里面怎么创建绑定呢?
1 | @Autowired |
然后,发消息的操作就跟前面写的一样了