消息中间件整理(一)RabbitMQ
消息中间件整理(一)RabbitMQ

消息中间件整理(一)RabbitMQ

RabbitMQ

  • 特点 :

RabbitMQ是一个开源的在AMQP基础上完整点,可复用的企业消息系统.
支持主流操作系统: Windows, Linux, MacOS
多种开发语言支持: Java, Python, Ruby, .Net, PHP, C++等
RabbitMQ使用Erlang语言开发, 对大规模并发提供了友好的支持.

  • RabbitMQ的执行流程:
  • 消息生产者连接到RabbitMQ服务器, 建立一个TCP连接, 开启一个信道Channel
  • 消息生产者声明一个交换器Exchange, 并设置属性
  • 消息生产者声明一个队列Queue, 并设置相关属性
  • 消息生产者通过路由键RoutingKey将交换器和队列绑定起来
  • 消息生产者将消息Message(包含路由键和交换器等信息)发送到RabbitMQ消息队列服务器
  • RabbitMQ服务器Broker收到消息之后, 相应的交换器根据路由键去查找队列
  • 如果找到了队列, 交换器将消息路由到相应的队列, 队列存储消息等待消费
  • 如果找不到队列, 根据生产者的配置, 将消息丢弃或者回退给生产者
  • 消息生产者关闭信道和TCP连接
  • 消息消费者连接到RabbitMQ服务器, 建立一个TCP连接, 开启一个信道
  • 消息消费者向RabbitMQ服务器请求消费相应队列中的消息
  • 消息消费者等待RabbitMQ服务器的回应和相应队列的消息投递, 接收消息
  • 消息消费者确认(ack)接收到消息
  • RabbitMQ服务器从相应队列中删除已被确认的消息
  • 消息消费者关闭信道和TCP连接

RabbitMQ的消息路由

RabbitMQ在AMQP中增加了交换器Exchange和绑定Binding角色, 消息生产者需要把消息发布到Exchange上, 消息最终到达队列并被消息消费者接收到, 而Binding决定了交换器上的消息应该被发送到哪个队列中
image.png


RabbitMQ的交换器

在RabbitMQ中,有四种交换器:

  • Direct Exchange

直连型交换器,根据消息携带的路由键找到此路由键所绑定的交换器和队列, 通过交换器将消息投递给队列。
执行流程:
有一个队列绑定到一个直连交换器上,同时赋予一个路由键 RoutingKey, 当一个消息携带着路由键AA通过生产者发送给RabbitMQ服务器时,路由键AA所绑定的交换器就会根据这个路由键AA去寻找绑定的路由键也是AA的队列。

  • Topic Exchange

主题交换器,它的路由键和绑定键之间有一定的规则:
*(星号)用来表示一个单词 (必须出现的)
#(井号) 用来表示任意数量(零个或多个)单词\n通配的绑定键是跟队列进行绑定的:
队列Queue1 绑定键为 *.test.*
队列Queue2 绑定键为 test.#
如果一条消息携带的路由键为 A.test.B,那么队列Queue1将会收到;
如果一条消息携带的路由键为 test.A.B,那么队列Queue2将会收到;
当一个队列的绑定键为 # ,那么它会无视路由键, 接收所有的消息

  • Fanout Exchange

扇型交换器,没有路由键概念,即使绑了路由键也会无视。 这个交换器在接收到消息后,会直接转发到绑定到它上面的所有队列。

  • Headers

Headers交换器匹配AMQP消息的Header, 而不是路由键.
Headers交换器和DirectExchange交换器完全一致, 但是性能低下, 几乎不再使用.


安装RabbitMQ(Windows直接安装)

  • 安装Erlang环境

RabbitMQ由Erlang语言开发,运行rabbitmq需要先安装对应版本的Erlang环境

  • RabbitMQ安装成功后,在系统服务中可以看到rabbitmq服务

image.png

  • 安装rabbitmq管理插件

进入rabbitmq安装目录,使用rabbitmq-plugins enable rabbitmq_management命令安装此插件
image.png

  • 中文主机名可能遇到的问题

rabbitmq服务默认在系统服务目录下,若系统用户名为中文,会导致rabbitmq服务启动失败。
解决方案一:控制面板修改主机名,改为英文
解决方案二:修改rabbitmq服务目录(https://blog.csdn.net/u012211603/article/details/88537382)
解决方案三:容器化,使用docker for windows创建一个rabbitmq容器
本机用户名也是中文名,遇到了这个问题,最终卸载了Erlang环境和rabbitmq,采用docker容器

  • 通过15672端口,进入rabbitmq管理页面

默认账号密码为guest
image.png


安装RabbitMQ(通过Docker容器)

  • 安装Docker For Windows

Docker for windows默认只支持win10专业版和企业版
其他版本无法直接使用docker,可以通过docker toolbox来使用
win10家庭版则可以通过伪装专业版的方式来安装docker for windows
https://www.cnblogs.com/samwu/p/10360943.html

  • Windows下Docker的使用

Docker for windows安装完成之后并运行
在cmd命令行或者windows power shell可以查看docker版本信息
其他操作基本与linux下docker操作一致

Windows PowerShell
版权所有 (C) Microsoft Corporation。保留所有权利。
尝试新的跨平台 PowerShell https://aka.ms/pscore6
PS C:\\WINDOWS\\system32> docker --version
Docker version 19.03.5, build 633a0ea
PS C:\\WINDOWS\\system32>
  • 拉取带有rabbitmq_management插件的rabbitmq镜像
PS C:\\WINDOWS\\system32>docker pull rabbitmq:3.7-management
......
PS C:\\WINDOWS\\system32> docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
rabbitmq            3.7-management      4eaadc0cb14d        3 days ago          179MB
nginx               latest              2073e0bcb60e        3 weeks ago         127MB
  • 创建rabbitmq容器

容器的5672端口为rabbitmq消息通信端口,映射的宿主机的5672端口
容器的15672端口为rabbitmq的管理端口,也映射到宿主机的15672端口
容器的其他端口暂时无需映射

PS C:\\WINDOWS\\system32>docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq:3.7-management
......
PS C:\\WINDOWS\\system32> docker ps -a
CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
e23698ac46b4        rabbitmq:3.7-management   "docker-entrypoint.s…"   15 hours ago        Up About an hour    4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   myrabbitmq
  • 访问宿主机的15672端口,进入rabbitmq管理页面

默认账号密码为guest
image.png


Springboot整合RabbitMQ

创建项目

  • 创建一个Springboot项目

image.png

  • 编辑项目的pom文件,引入相关maven依赖
<!--springboot父依赖-->
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.0.2.RELEASE</version>
    </parent>
    <dependencies>
        <!--springMvc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--Rabbit MQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

(一)直连型交换机Direct Exchange

消息生产者
  • 创建直连型消息生产者模块RabbitMQ_Provider_DirectExchange

image.png

  • 编辑直连型消息生产者的配置文件application.yml
server:
  port: 8080
spring:
  # 模块名
  application:
    name: RabbitMQ_Provider
  #配置rabbitmq
  rabbitmq:
    # rabbitmq服务器地址
    host: 127.0.0.1
    # rabbitmq消息通信端口
    port: 5672
    # 账号和密码
    username: guest
    password: guest
  • 编辑直连型消息生产者的启动类Provider_DirectExchange_Application
package cn.zack;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Provider_DirectExchange_Application {
    public static void main(String[] args) {
        SpringApplication.run(Provider_DirectExchange_Application.class, args);
    }
}
  • 编辑直连型交换器配置类ProviderMyDirectExchangeConfig
package cn.zack.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProviderMyDirectExchangeConfig {

    /**
     * 创建队列,名为MyDirectExchangeQueue
     * @return
     */
    @Bean
    public Queue MyDirectExchangeQueue(){
        return new Queue("MyDirectExchangeQueue",true);
    }

    /**
     * 创建直连型交换机,名为MyDirectExchange
     * @return
     */
    @Bean
    DirectExchange MyDirectExchange(){
        return new DirectExchange("MyDirectExchange");
    }

    /**
     * 绑定,将直连队列和直连交换器绑定,并设置绑定键值为MyDirectExchangeRouting
     * @return
     */
    @Bean
    Binding MyDirectExchangeBinding(){
        return BindingBuilder.bind(MyDirectExchangeQueue()).to(MyDirectExchange()).with("MyDirectExchangeRouting");
    }
}
  • 编辑直连型消息生产者消息推送类ProviderMyDirectExchangeController
package cn.zack.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;

@RestController
@RequestMapping(path = "directExchange")
public class ProviderMyDirectExchangeController {
    @Autowired
    // 注入rabbitTemplate
    private RabbitTemplate rabbitTemplate;

    @GetMapping(path = "send/{messageData}")
    public String sendDirectExchangeMsg(@PathVariable("messageData") String messageData){
        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("createTime",new Date().toString());
        hashMap.put("messageData",messageData);
        // 将消息携带绑定键值MyDirectExchangeRouting,发送到MyDirectExchange交换机
        rabbitTemplate.convertAndSend("MyDirectExchange","MyDirectExchangeRouting",hashMap);
        return "OK";
    }
}
  • 推送消息

启动直连型消息生产者模块\n访问http://localhost:8081/directExchange/send/hello
推送一个hello

  • 查看rabbitmq管理页

可以看到,此时消息队列已经成功收到一条消息,此消息还处于ready状态,未被消费
image.png

消息消费者
  • 创建直连型消息消费者模块

image.png

  • 编辑直连型消息消费者的配置文件application.yml
server:
  port: 8091
spring:
  # 应用名
  application:
    name: RabbitMQ_Consumer_DirectExchange
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
  • 编辑直连型消息消费者的启动类Consumer_DirectExchange_Application
package cn.zack;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Consumer_DirectExchange_Application {
    public static void main(String[] args) {
        SpringApplication.run(Consumer_DirectExchange_Application.class, args);
    }
}
  • 和生产者一样, 编辑直连型消息消费者的配置类ConsumerMyDirectExchangeConfig
package cn.zack.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerMyDirectExchangeConfig {

    /**
     * 创建队列,名为MyDirectExchangeQueue
     *
     * @return
     */
    @Bean
    public Queue MyDirectExchangeQueue() {
        return new Queue("MyDirectExchangeQueue", true);
    }

    /**
     * 创建直连型交换机,名为MyDirectExchange
     *
     * @return
     */
    @Bean
    DirectExchange MyDirectExchange() {
        return new DirectExchange("MyDirectExchange");
    }

    /**
     * 绑定,将直连队列和直连交换器绑定,并设置绑定键值为MyDirectExchangeRouting\n     *
     * @return
     */
    @Bean
    Binding MyDirectExchangeBinding() {
        return BindingBuilder.bind(MyDirectExchangeQueue()).to(MyDirectExchange()).with("MyDirectExchangeRouting");
    }
}
  • 编辑消息消费者的消息监听类ConsumerMyDirectExchangeReceiver
package cn.zack.controller;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * 指定监听的队列名称
 */
@RabbitListener(queues = "MyDirectExchangeQueue")
@Component
public class ConsumerMyDirectExchangeReceiver {
    @RabbitHandler
    public void process(Map hashMap){
        System.out.println("ConsumerDirectExchange接收到消息: " + hashMap.get("messageData"));
        System.out.println("消息发送时间是: " + hashMap.get("createTime"));
    }
}
  • 监听消息

启动直连型消息消费者模块
可以看到控制台输出了监听到的MyDirectExchangeQueue队列的消息
image.png
此时rabbitmq控制台可以看到MyDirectExchangeQueue队列未被消费(ready状态)的消息已经为0
image.png
再次访问直连型消息生产者的消息推送地址
http://localhost:8081/directExchange/send/helloworld
推送一条helloword
可以看到,消息消费者监听到了helloworld这条消息
image.png


(二)主题型交换机Topic Exchange

// todo


(三)消息的持久化

// todo


(四)消息的确认和应答

  • 消息确认模式

默认情况下, 生产者把消息发送出去之后, RabbitMQ服务器(broker)不会返回任何信息给消息生产者.
也就是说消息生产者不知道消息有没有正确到达broker.
如果在消息到达broker之前发生了宕机, 或者broker接收到消息但在写入磁盘时发生了宕机, 这样消息就会丢失.

RabbitMQ提供了两种解决方式:

  1. 通过AMQP的事务机制:
    在AMQP中把信道设置成事务模式之后, 生产者和broker之间会有一种发送/响应机制判断当前命令操作是否可以继续.
    由于事务模式需要生产者应用同步等待broker的执行结果, 性能会急剧下降, 解决方案偏重量级.
  2. 把信道设置为确认模式
    发送方确认模式是RabbitMQ对AMQP的扩展实现, 把信道设置成确认模式之后, 在该信道上发布的所有消息都会被分配一个唯一id, 一旦消息被投递到匹配的队列中, 该信道就会向生产者发送一个包含了消息唯一id的确认消息, 告知生产者此消息发送成功, 已经投递到目的队列.
    发送方确认模式的最大优势是异步, 生产者发送完一条消息之后可以继续发送下一条消息, 当生产者收到确认消息后调用回调方法处理.

以直连模式为例, 开启消息发送方确认模式:

  1. 修改配置文件
server:
  port: 8081
spring:
  # 应用名
  application:
    name: RabbitMQ_Provider_DirectExchange
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 开启消息确认(消息生产者发送消息到rabbitmq服务器后, 会被告知是否发送成功)
    publisher-confirms: true
    publisher-returns: true
  1. 配置类实现rabbitmq确认模式接口
package cn.zack.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class ProviderMyDirectExchangeConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
        // 指定 ReturnCallback
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 创建队列,名为MyDirectExchangeQueue
     *
     * @return
     */
    @Bean
    public Queue MyDirectExchangeQueue() {
        return new Queue("MyDirectExchangeQueue", true);
    }

    /**
     * 创建直连型交换机,名为MyDirectExchange
     *
     * @return
     */
    @Bean
    DirectExchange MyDirectExchange() {
        return new DirectExchange("MyDirectExchange");
    }

    /**
     * 绑定,将直连队列和直连交换器绑定,并设置绑定键值为MyDirectExchangeRouting
     *
     * @return
     */
    @Bean
    Binding MyDirectExchangeBinding() {
        return BindingBuilder.bind(MyDirectExchangeQueue()).to(MyDirectExchange()).with("MyDirectExchangeRouting");
    }

    /**
     * 实现Rabbitmq的ConfirmCallback接口, 开启消息发送成功确认
     *
     * @param correlationData 消息标识
     * @param b 是否成功
     * @param s 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("唯一消息标识: " + correlationData);
        System.out.println("确认结果: " + b);
        System.out.println("失败原因: " + s);
    }

    /**
     * 实现Rabbitmq的ReturnCallback接口, 开启消息发送失败返回
     *
     * @param message 消息主体
     * @param i 消息主体
     * @param s 描述
     * @param s1 交换器
     * @param s2 路由键
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("消息主体: " + message);
        System.out.println("消息主体: " + i);
        System.out.println("描述: " + s);
        System.out.println("消息使用的交换器: " + s1);
        System.out.println("消息使用的路由键: " + s2);
    }
}
  1. 重启直连交换器的消息生产者
    调用发送消息的方法, 发送一次消息, 控制台可以看到消息确认成功的信息
    image.png
  • 消息应答模式

// todo