RabbitMQ
demo地址:https://github.com/zackzhangCN/RabbitMQDemo.git
-
特点 :
RabbitMQ是一个开源的在AMQP基础上完整点,可复用的企业消息系统.
支持主流操作系统: Windows, Linux, MacOS
多种开发语言支持: Java, Python, Ruby, .Net, PHP, C++等
RabbitMQ使用Erlang语言开发, 对大规模并发提供了友好的支持.
-
RabbitMQ的执行流程:
- 消息生产者连接到RabbitMQ服务器, 建立一个TCP连接, 开启一个信道Channel
- 消息生产者声明一个交换器Exchange, 并设置属性
- 消息生产者声明一个队列Queue, 并设置相关属性
- 消息生产者通过路由键RoutingKey将交换器和队列绑定起来
- 消息生产者将消息Message(包含路由键和交换器等信息)发送到RabbitMQ消息队列服务器
- RabbitMQ服务器Broker收到消息之后, 相应的交换器根据路由键去查找队列
- 如果找到了队列, 交换器将消息路由到相应的队列, 队列存储消息等待消费
- 如果找不到队列, 根据生产者的配置, 将消息丢弃或者回退给生产者
- 消息生产者关闭信道和TCP连接
- 消息消费者连接到RabbitMQ服务器, 建立一个TCP连接, 开启一个信道
- 消息消费者向RabbitMQ服务器请求消费相应队列中的消息
- 消息消费者等待RabbitMQ服务器的回应和相应队列的消息投递, 接收消息
- 消息消费者确认(ack)接收到消息
- RabbitMQ服务器从相应队列中删除已被确认的消息
- 消息消费者关闭信道和TCP连接
RabbitMQ的消息路由
RabbitMQ在AMQP中增加了交换器Exchange和绑定Binding角色, 消息生产者需要把消息发布到Exchange上, 消息最终到达队列并被消息消费者接收到, 而Binding决定了交换器上的消息应该被发送到哪个队列中
RabbitMQ的交换器
在RabbitMQ中,有四种交换器:
-
Direct Exchange
直连型交换器,根据消息携带的路由键找到此路由键所绑定的交换器和队列, 通过交换器将消息投递给队列。
执行流程:
有一个队列绑定到一个直连交换器上,同时赋予一个路由键 RoutingKey, 当一个消息携带着路由键AA通过生产者发送给RabbitMQ服务器时,路由键AA所绑定的交换器就会根据这个路由键AA去寻找绑定的路由键也是AA的队列。
-
Topic Exchange
主题交换器,它的路由键和绑定键之间有一定的规则:
*
(星号)用来表示一个单词 (必须出现的)
#
(井号) 用来表示任意数量(零个或多个)单词\n通配的绑定键是跟队列进行绑定的:
队列Queue1 绑定键为 *.test.*
队列Queue2 绑定键为 test.#
如果一条消息携带的路由键为 A.test.B,那么队列Queue1将会收到;
如果一条消息携带的路由键为 test.A.B,那么队列Queue2将会收到;
当一个队列的绑定键为 #
,那么它会无视路由键, 接收所有的消息
-
Fanout Exchange
扇型交换器,没有路由键概念,即使绑了路由键也会无视。 这个交换器在接收到消息后,会直接转发到绑定到它上面的所有队列。
-
Headers
Headers交换器匹配AMQP消息的Header, 而不是路由键.
Headers交换器和DirectExchange交换器完全一致, 但是性能低下, 几乎不再使用.
安装RabbitMQ(Windows直接安装)
-
安装Erlang环境
RabbitMQ由Erlang语言开发,运行rabbitmq需要先安装对应版本的Erlang环境
-
RabbitMQ安装成功后,在系统服务中可以看到rabbitmq服务
-
安装rabbitmq管理插件
进入rabbitmq安装目录,使用rabbitmq-plugins enable rabbitmq_management命令安装此插件
-
中文主机名可能遇到的问题
rabbitmq服务默认在系统服务目录下,若系统用户名为中文,会导致rabbitmq服务启动失败。
解决方案一:控制面板修改主机名,改为英文
解决方案二:修改rabbitmq服务目录(https://blog.csdn.net/u012211603/article/details/88537382)
解决方案三:容器化,使用docker for windows创建一个rabbitmq容器
本机用户名也是中文名,遇到了这个问题,最终卸载了Erlang环境和rabbitmq,采用docker容器
-
通过15672端口,进入rabbitmq管理页面
默认账号密码为guest
安装RabbitMQ(通过Docker容器)
-
安装Docker For Windows
Docker for windows默认只支持win10专业版和企业版
其他版本无法直接使用docker,可以通过docker toolbox来使用
win10家庭版则可以通过伪装专业版的方式来安装docker for windows
https://www.cnblogs.com/samwu/p/10360943.html
-
Windows下Docker的使用
Docker for windows安装完成之后并运行
在cmd命令行或者windows power shell可以查看docker版本信息
其他操作基本与linux下docker操作一致
Windows PowerShell
版权所有 (C) Microsoft Corporation。保留所有权利。
尝试新的跨平台 PowerShell https://aka.ms/pscore6
PS C:\\WINDOWS\\system32> docker --version
Docker version 19.03.5, build 633a0ea
PS C:\\WINDOWS\\system32>
-
拉取带有rabbitmq_management插件的rabbitmq镜像
PS C:\\WINDOWS\\system32>docker pull rabbitmq:3.7-management
......
PS C:\\WINDOWS\\system32> docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq 3.7-management 4eaadc0cb14d 3 days ago 179MB
nginx latest 2073e0bcb60e 3 weeks ago 127MB
-
创建rabbitmq容器
容器的5672端口为rabbitmq消息通信端口,映射的宿主机的5672端口
容器的15672端口为rabbitmq的管理端口,也映射到宿主机的15672端口
容器的其他端口暂时无需映射
PS C:\\WINDOWS\\system32>docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq:3.7-management
......
PS C:\\WINDOWS\\system32> docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e23698ac46b4 rabbitmq:3.7-management "docker-entrypoint.s…" 15 hours ago Up About an hour 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp myrabbitmq
-
访问宿主机的15672端口,进入rabbitmq管理页面
默认账号密码为guest
Springboot整合RabbitMQ
创建项目
-
创建一个Springboot项目
-
编辑项目的pom文件,引入相关maven依赖
<!--springboot父依赖-->
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.0.2.RELEASE</version>
</parent>
<dependencies>
<!--springMvc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--Rabbit MQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
(一)直连型交换机Direct Exchange
消息生产者
-
创建直连型消息生产者模块RabbitMQ_Provider_DirectExchange
-
编辑直连型消息生产者的配置文件application.yml
server:
port: 8080
spring:
# 模块名
application:
name: RabbitMQ_Provider
#配置rabbitmq
rabbitmq:
# rabbitmq服务器地址
host: 127.0.0.1
# rabbitmq消息通信端口
port: 5672
# 账号和密码
username: guest
password: guest
-
编辑直连型消息生产者的启动类Provider_DirectExchange_Application
package cn.zack;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Provider_DirectExchange_Application {
public static void main(String[] args) {
SpringApplication.run(Provider_DirectExchange_Application.class, args);
}
}
-
编辑直连型交换器配置类ProviderMyDirectExchangeConfig
package cn.zack.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProviderMyDirectExchangeConfig {
/**
* 创建队列,名为MyDirectExchangeQueue
* @return
*/
@Bean
public Queue MyDirectExchangeQueue(){
return new Queue("MyDirectExchangeQueue",true);
}
/**
* 创建直连型交换机,名为MyDirectExchange
* @return
*/
@Bean
DirectExchange MyDirectExchange(){
return new DirectExchange("MyDirectExchange");
}
/**
* 绑定,将直连队列和直连交换器绑定,并设置绑定键值为MyDirectExchangeRouting
* @return
*/
@Bean
Binding MyDirectExchangeBinding(){
return BindingBuilder.bind(MyDirectExchangeQueue()).to(MyDirectExchange()).with("MyDirectExchangeRouting");
}
}
-
编辑直连型消息生产者消息推送类ProviderMyDirectExchangeController
package cn.zack.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.HashMap;
@RestController
@RequestMapping(path = "directExchange")
public class ProviderMyDirectExchangeController {
@Autowired
// 注入rabbitTemplate
private RabbitTemplate rabbitTemplate;
@GetMapping(path = "send/{messageData}")
public String sendDirectExchangeMsg(@PathVariable("messageData") String messageData){
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("createTime",new Date().toString());
hashMap.put("messageData",messageData);
// 将消息携带绑定键值MyDirectExchangeRouting,发送到MyDirectExchange交换机
rabbitTemplate.convertAndSend("MyDirectExchange","MyDirectExchangeRouting",hashMap);
return "OK";
}
}
-
推送消息
启动直连型消息生产者模块\n访问http://localhost:8081/directExchange/send/hello
推送一个hello
-
查看rabbitmq管理页
可以看到,此时消息队列已经成功收到一条消息,此消息还处于ready状态,未被消费
消息消费者
-
创建直连型消息消费者模块
-
编辑直连型消息消费者的配置文件application.yml
server:
port: 8091
spring:
# 应用名
application:
name: RabbitMQ_Consumer_DirectExchange
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
-
编辑直连型消息消费者的启动类Consumer_DirectExchange_Application
package cn.zack;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Consumer_DirectExchange_Application {
public static void main(String[] args) {
SpringApplication.run(Consumer_DirectExchange_Application.class, args);
}
}
-
和生产者一样, 编辑直连型消息消费者的配置类ConsumerMyDirectExchangeConfig
package cn.zack.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerMyDirectExchangeConfig {
/**
* 创建队列,名为MyDirectExchangeQueue
*
* @return
*/
@Bean
public Queue MyDirectExchangeQueue() {
return new Queue("MyDirectExchangeQueue", true);
}
/**
* 创建直连型交换机,名为MyDirectExchange
*
* @return
*/
@Bean
DirectExchange MyDirectExchange() {
return new DirectExchange("MyDirectExchange");
}
/**
* 绑定,将直连队列和直连交换器绑定,并设置绑定键值为MyDirectExchangeRouting\n *
* @return
*/
@Bean
Binding MyDirectExchangeBinding() {
return BindingBuilder.bind(MyDirectExchangeQueue()).to(MyDirectExchange()).with("MyDirectExchangeRouting");
}
}
-
编辑消息消费者的消息监听类ConsumerMyDirectExchangeReceiver
package cn.zack.controller;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 指定监听的队列名称
*/
@RabbitListener(queues = "MyDirectExchangeQueue")
@Component
public class ConsumerMyDirectExchangeReceiver {
@RabbitHandler
public void process(Map hashMap){
System.out.println("ConsumerDirectExchange接收到消息: " + hashMap.get("messageData"));
System.out.println("消息发送时间是: " + hashMap.get("createTime"));
}
}
-
监听消息
启动直连型消息消费者模块
可以看到控制台输出了监听到的MyDirectExchangeQueue队列的消息
此时rabbitmq控制台可以看到MyDirectExchangeQueue队列未被消费(ready状态)的消息已经为0
再次访问直连型消息生产者的消息推送地址
http://localhost:8081/directExchange/send/helloworld
推送一条helloword
可以看到,消息消费者监听到了helloworld这条消息
(二)主题型交换机Topic Exchange
// todo
(三)消息的持久化
// todo
(四)消息的确认和应答
-
消息确认模式
默认情况下, 生产者把消息发送出去之后, RabbitMQ服务器(broker)不会返回任何信息给消息生产者.
也就是说消息生产者不知道消息有没有正确到达broker.
如果在消息到达broker之前发生了宕机, 或者broker接收到消息但在写入磁盘时发生了宕机, 这样消息就会丢失.
RabbitMQ提供了两种解决方式:
- 通过AMQP的事务机制:
在AMQP中把信道设置成事务模式之后, 生产者和broker之间会有一种发送/响应机制判断当前命令操作是否可以继续.
由于事务模式需要生产者应用同步等待broker的执行结果, 性能会急剧下降, 解决方案偏重量级. - 把信道设置为确认模式
发送方确认模式是RabbitMQ对AMQP的扩展实现, 把信道设置成确认模式之后, 在该信道上发布的所有消息都会被分配一个唯一id, 一旦消息被投递到匹配的队列中, 该信道就会向生产者发送一个包含了消息唯一id的确认消息, 告知生产者此消息发送成功, 已经投递到目的队列.
发送方确认模式的最大优势是异步, 生产者发送完一条消息之后可以继续发送下一条消息, 当生产者收到确认消息后调用回调方法处理.
以直连模式为例, 开启消息发送方确认模式:
- 修改配置文件
server:
port: 8081
spring:
# 应用名
application:
name: RabbitMQ_Provider_DirectExchange
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 开启消息确认(消息生产者发送消息到rabbitmq服务器后, 会被告知是否发送成功)
publisher-confirms: true
publisher-returns: true
- 配置类实现rabbitmq确认模式接口
package cn.zack.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class ProviderMyDirectExchangeConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
// 指定 ReturnCallback
rabbitTemplate.setReturnCallback(this);
}
/**
* 创建队列,名为MyDirectExchangeQueue
*
* @return
*/
@Bean
public Queue MyDirectExchangeQueue() {
return new Queue("MyDirectExchangeQueue", true);
}
/**
* 创建直连型交换机,名为MyDirectExchange
*
* @return
*/
@Bean
DirectExchange MyDirectExchange() {
return new DirectExchange("MyDirectExchange");
}
/**
* 绑定,将直连队列和直连交换器绑定,并设置绑定键值为MyDirectExchangeRouting
*
* @return
*/
@Bean
Binding MyDirectExchangeBinding() {
return BindingBuilder.bind(MyDirectExchangeQueue()).to(MyDirectExchange()).with("MyDirectExchangeRouting");
}
/**
* 实现Rabbitmq的ConfirmCallback接口, 开启消息发送成功确认
*
* @param correlationData 消息标识
* @param b 是否成功
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("唯一消息标识: " + correlationData);
System.out.println("确认结果: " + b);
System.out.println("失败原因: " + s);
}
/**
* 实现Rabbitmq的ReturnCallback接口, 开启消息发送失败返回
*
* @param message 消息主体
* @param i 消息主体
* @param s 描述
* @param s1 交换器
* @param s2 路由键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息主体: " + message);
System.out.println("消息主体: " + i);
System.out.println("描述: " + s);
System.out.println("消息使用的交换器: " + s1);
System.out.println("消息使用的路由键: " + s2);
}
}
- 重启直连交换器的消息生产者
调用发送消息的方法, 发送一次消息, 控制台可以看到消息确认成功的信息
-
消息应答模式
// todo