MQ(Message Queue)是一种消息中间件,它可以实现应用程序之间的异步通信。MQ的主要作用是解耦生产者和消费者,提高系统的并发性能。MQ分为同步通讯和异步通讯两种方式。
1. 同步通讯
同步通讯是指在发送消息和接收消息的过程中,生产者和消费者必须保持连接状态。这种方式适用于顺序和一致性要求较高的场景,如电话通话、视频聊天和实时游戏等。然而,同步调用也存在一些问题,如耦合度高、性能下降、资源浪费和级联失败等。
2. 异步通讯
异步通讯是指在发送消息和接收消息的过程中,生产者和消费者不需要保持连接状态。这种方式适用于效率和并发性要求较高的场景,如电子邮件、消息队列和事件驱动模式等。事件驱动模式具有服务解耦、性能提升、吞吐量提高、服务没有强依赖以及不担心级联失败问题和流量削峰等优势。然而,异步通信的缺点是需要Broker来处理消息的存储和转发。
常见的MQ框架有RabbitMQ、ActiveMQ和Kafka等。其中,RabbitMQ是一款基于Erlang语言开发的开源消息队列系统,具有简单易用、功能强大等特点。要快速入门RabbitMQ,可以访问官方网址:https://www.rabbitmq.com/ 。
以下是关于RabbitMQ的快速入门指南:
1. 单机部署
在Centos7虚拟机中使用Docker来安装RabbitMQ:
- 在线拉取:docker pull rabbitmq:3-management
- 本地导入:docker load -i mq.tar
- 执行下面的命令来运行MQ容器:docker run \n -e RABBITMQ_DEFAULT_USER=root \n -e RABBITMQ_DEFAULT_PASS=123456 \n --name mq \n --hostname mq1 \n -p 15672:15672 \n -p 5672:5672 \n -d \n rabbitmq:3-management
2. 常见消息模型
RabbitMQ支持多种消息模型,包括基本队列、交换器、路由键和虚拟主机等。其中,基本队列是最常见的消息模型,用于存储生产者发送的消息。交换器用于将生产者发送的消息路由到相应的队列,路由键用于标识消息所属的队列。虚拟主机用于隔离不同的应用或用户。
3. SpringAMQP
Spring AMQP是一个基于Spring框架的Java实现的AMQP客户端库,它与语言和平台无关。通过使用Spring AMQP,可以方便地在Java应用中集成RabbitMQ作为消息中间件。
. 在consumer服务中编写消费逻辑,监听simple.queue这个队列。
2. Work Queue工作队列模型:提高消息处理速度,避免队列消息堆积prefetch。案例:模拟WorkQueue,实现一个队列绑定多个消费者。消息预取机制没有考虑到不同消费者的消费能力的问题,所以我们可以做一个消费预取限制:preFetch。重启消费者consumer类,并运行生产者测试类。
3. 发布、订阅模型-Fanout:
1. 发布订阅模式:允许将同一消息发送给多个消费者exchange。
1. 交换机的作用:接收publisher发送的消息,将消息按照规则路由到与之绑定的队列。不能缓存消息,路由失败,消息丢失。
2. 常见exchange类型包括:FanoutDirectTopic。exchange负责消息路由。
2. Fanout Exchange:绑定的queue。案例:利用SpringAMQP演示FanoutExchange的使用。
1. 在consumer服务中,利用代码声明队列、交换机,并将两者绑定。
2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2。
3. 在publisher中编写测试方法,向itcast.fanout发送消息。
4. 发布、订阅模型-Direct:规则路BindingKeyRoutingKeyBindingKeyRoutingKey。案例:利用SpringAMQP演示DirectExchange的使用。
1. 利用@RabbitListener声明Exchange.Queue、RoutingKey。
2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2。启动消费者类,可以在MQ服务器上看到新增的队列和路由。
3. 在publisher中编写测试方法,向itcast. direct发送消息。查看控制台信息:队列1确实也是绑定的blue。
描述下Direct交换机与Fanout交换机的差异?
1. Fanout交换机将消息路由给每一个与之绑定的队列。
2. Direct交换机根据RoutingKey判断路由给哪个队列。
3. 如果多个队列具有相同的RoutingKey,则与Fanout功能类似。
5. 发布、订阅模型-Topic:必须是多个单词的列表,并且以.分割。
## Queue与Exchange指定BindingKey时可以使用通配符
### 案例:利用SpringAMQP演示TopicExchange的使用
1. 利用@RabbitListener声明Exchange、Queue、RoutingKey。
2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2。
3. 在publisher中编写测试方法,向itcast. topic发送消息。
4. 消息转换器。
### 案例:测试发送Object类型消息
说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
消息转换器:
```java
org.springframework.amqp.support.converter.MessageConverterSimpleMessageConverterObjectOutputStreamMessageConverter
```
推荐用JSON方式序列化,步骤如下:
consumer模块接收消息的形式:
```java
public void onMessage(Message message){
String content = new String((byte[])message.getBody());
System.out.println("接收到的消息内容为:"+content);
}
```