“RabbitMQ入门指南:从入门到起飞,这一篇就够!打造高效消息通信系统的第一步“。

1.前言

        RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)的标准,并用Erlang语言编写。作为消息代理,RabbitMQ接收、存储和转发消息,帮助应用程序之间实现异步通信。它提供了一个强大而灵活的消息传递机制,可以在分布式系统中可靠地传递消息,确保消息的顺序性和可靠性。

        RabbitMQ的核心概念包括生产者、消费者、交换机、队列和绑定。生产者负责发送消息,消费者负责接收消息,交换机负责接收来自生产者的消息并将它们路由到一个或多个队列,队列存储消息直到消费者准备接收它们,而绑定则定义了交换机和队列之间的关系。

        RabbitMQ具有许多特性,包括可靠性、灵活的路由、集群和高可用性、可扩展性、管理界面、多种协议支持和可编程性。它被广泛应用于构建分布式系统中的消息队列、异步任务处理、日志收集、事件驱动架构等场景,是一个强大而受欢迎的消息中间件解决方案。

                1.1 前置知识

1. 同步通信 和 异步通信

        微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢? 

解读:

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。

  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步通信:服务返回响应后才可以进行后续的操作。

存在的问题:

  • 扩展性差
    • 随着业务规模扩大,产品的功能也在不断完善。每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则(面向修改关闭,面向拓展开放),拓展性不好。

  • 性能下降
    • 我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和。

  • 级联失败 
    • 由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

      这其实就是同步调用的级联失败问题。

    • 比如:比如说支付成功,短信发送出现问题了,就给我们退款了。

    • 级联失败雪崩:由于一系列问题或错误的积累,最终导致系统或项目崩溃或失败的现象。

异步调用: 只发送通知,发送完就可以结束了,具体你有没有收到,什么时候收到,我不关心。

介绍:

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

优势:

  • 耦合度更低

  • 性能更好

  • 业务拓展性强

  • 故障隔离,避免级联失败

  • 消峰

    • 消峰的原理就是全部都放在消息队列,里面后续的业务慢慢的取

缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

        1.2 不同MQ之间的对比 

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

几种常见MQ的对比:

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ的安装

        2.1 执行Docker命令

docker run -d \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_VHOST=default_vhost \-e RABBITMQ_DEFAULT_USER=default_user \-e RABBITMQ_DEFAULT_PASS=default_pass \--hostname my_rabbitmq \--name rabbitmq \rabbitmq
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

参数说明:

  • docker run: 运行 Docker 容器的命令。

  • -d: 在后台运行容器,即以守护进程的方式运行容器。

  • -p 15672:15672 -p 5672:5672: 将容器的端口 15672(RabbitMQ 控制台 Web 界面的端口)和 5672(RabbitMQ 应用访问的端口)映射到主机的对应端口。这样可以通过主机的这些端口来访问 RabbitMQ。

  • -e RABBITMQ_DEFAULT_VHOST=my_vhost: 设置 RabbitMQ 默认的虚拟机名为 my_vhost。虚拟机(vhost)是 RabbitMQ 中用于隔离不同应用程序或服务的逻辑隔离单元。

  • -e RABBITMQ_DEFAULT_USER=admin: 设置 RabbitMQ 默认的用户名为 admin

  • -e RABBITMQ_DEFAULT_PASS=123456: 设置 RabbitMQ 默认的用户密码为 123456。

  • --hostname myRabbit: 指定容器的主机名为 myRabbit。在 RabbitMQ 中,节点名称被用于存储数据,而默认情况下会使用主机名。因此,在此设置了容器的主机名。

  • --name rabbitmq: 设置容器的名称为 rabbitmq

  • rabbitmq: 指定要使用的容器镜像为 rabbitmq,这是 RabbitMQ 官方提供的 Docker 镜像。

综上所述,该命令的作用是以后台方式启动一个 RabbitMQ 容器,配置了默认的虚拟机名、用户名和密码,并将容器的端口映射到主机上,使得可以通过主机访问 RabbitMQ 控制台和应用服务。

        2.2 设置开机自启动

docker update rabbitmq --restart=always

         2.3 启动 rabbitmq_management

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

 为什么要启动这个:

这个命令的作用是在名为“rabbitmq”的Docker容器中启用RabbitMQ管理插件。具体来说,它启用了RabbitMQ的管理插件,这个插件提供了一个Web界面,可以通过浏览器来管理RabbitMQ服务器。
通常情况下,启用管理插件是为了方便地监控和管理RabbitMQ服务器,可以通过浏览器访问http://<RabbitMQ服务器的IP地址>:15672来打开RabbitMQ的管理界面。

3. RabbitMQ 核心模块介绍

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方 (发送给交换机)

  • consumer:消费者,也就是消费消息的一方(和队列进行绑定(监听))

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

    • 交换机只能路由消息,无法存储消息
    • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定 
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离(和容器概念一样)

4. 数据隔离 

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

  • Nameitheima,也就是用户名

  • Tagsadministrator,说明itheima用户是超级管理员,拥有所有权限

  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。

  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

5.SpringAMQP 

        Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 框架的一个模块,用于与 AMQP(Advanced Message Queuing Protocol)兼容的消息中间件进行集成。AMQP 是一种消息协议,用于在分布式应用程序之间传递消息。

        将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

        但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

5. 1 导入依赖

<!--    引入SpringAMQP的依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

5. 2 控制台新建一个队列

5. 3 消息发送

首先配置MQ的地址,在配置文件中添加配置

spring:rabbitmq:username: windStop  # RabbitMQ用户名host: 8.130.10.216  # RabbitMQ主机地址password: 123       # RabbitMQ密码port: 5672          # RabbitMQ端口号virtual-host: /windStop  # RabbitMQ虚拟主机

然后编写测试类ConsumerApplicationTest,并利用RabbitTemplate 实现消息发送。

RabbitTemplate 是一个RabbitMQ的模板类,用于发送消息到RabbitMQ队列或者交换机。

@SpringBootTest
public class ConsumerApplicationTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void ConsumerTest(){// 定义要发送的队列String queue = "simple.queue";// 定义要发送的信息String message = "你好啊, spring AMPQ";rabbitTemplate.convertAndSend(queue,message);}
}

 5.4  消息接收

 上同,首先配置MQ的地址。

spring:
  rabbitmq:
    username: windStop  # RabbitMQ用户名
    host: 8.130.10.216  # RabbitMQ主机地址
    password: 123       # RabbitMQ密码
    port: 5672          # RabbitMQ端口号
    virtual-host: /windStop  # RabbitMQ虚拟主机
 

listener包中新建一个类SpringRabbitListener

@Component
@Slf4j
public class SpringRabbitListener {/*** 接收到的消息会以String类型的msg参数传入方法中* @param msg*/@RabbitListener(queues = "simple.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列public void listenSimpleQueueMessage(String msg){log.info("Spring 消费者接收到的消息:{}",msg);}
} 

5.5  总结一下核心步骤 

Spring AMQP收发消息的步骤如下:

  1. 引入spring-boot-starter-amqp依赖。
  2. 配置RabbitMQ服务端信息,包括用户名、密码、主机地址、端口号等。
  3. 使用RabbitTemplate来发送消息到RabbitMQ服务器。
  4. 使用@RabbitListener注解声明要监听的队列,并编写相应的方法来处理接收到的消息内容。

6.  Work Queues

概念:任务模型。简单来说就是让多个消费者绑定到一个队列中,共同消费队列中的信息。

介绍一下:生产者消费者模型

  • 生产者和消费者之间解耦:生产者和消费者之间通过一个共享的缓冲区(队列)来进行通信,彼此不直接依赖。
  • 异步性:生产者可以持续不断地生成数据,而消费者可以独立地处理这些数据,实现异步处理。
  • 实现多线程并发:生产者和消费者可以在不同的线程中运行,提高系统的吞吐量和效率。

     在RabbitMQ中,生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理,实现了生产者消费者模型的应用。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了

存在的问题: 

        发送完消息会被哪个消费者处理呢?还是所有消费者都会处理?怎么分配?

        6.1 入门案例

6.1.1 创建队列

首先,我们在控制台创建一个新的队列,命名为work.queue

6.1.2 定义两个消费者模型
/*** 接收到的消息会以String类型的msg参数传入方法中* @param msg*/
@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage1(String msg){System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now());
}@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage2(String msg){System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now());
}
6.1.3 生产者发送五十条信息            
@Test
public void workQueueTest(){// 定义要发送的队列String queue = "work.queue";//循环发送五十条数据for (int i = 1; i <= 50; i++) {String message = "你好, SpringAMQP" + i;rabbitTemplate.convertAndSend(queue,message);}
}

通过输出结果可以分析答案是:

        消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。每个都是收到一条。

6.1.4 模拟快慢消费者
/*** 接收到的消息会以String类型的msg参数传入方法中* @param msg*/
@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:" + msg + "," + LocalDateTime.now());Thread.sleep(50);//通过睡眠短时间模拟快消费者
}@RabbitListener(queues = "work.queue")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenWorkQueueMessage2(String msg) throws InterruptedException {System.err.println("消费者2接收到消息:" + msg + "," + LocalDateTime.now());Thread.sleep(500);//通过睡眠长时间模拟慢消费者
}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了50毫秒,相当于每秒钟处理20个消息

  • 消费者2 sleep了500毫秒,相当于每秒处理2个消息

        也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 

        6.2 能者多劳 (动态分配权重)

        默认情况下,RabbitMO的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没

有考虑到消费者是否已经处理完消息,可能出现消息堆积。

        默认情况下:无论你有没有处理完都给你分配。设置为1,就是处理完才给你分配。

分配消费者的预取限制

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息 

 这是一个消费者的预取(Prefetch)限制设置。它定义了消费者在从 RabbitMQ 服务器获取消息时一次预取的消息数量。 

 如果不设置。消费者默认的预取限制将会是无限制的,即一次性获取尽可能多的消息。

  1. 资源过度占用: 如果消费者一次性获取大量消息,但处理消息的速度较慢,就会导致大量消息堆积在消费者端,消耗大量内存和其他系统资源。这可能导致系统的负载急剧增加,甚至导致系统崩溃。

  2. 不可控的消费行为: 一次性获取大量消息会导致消费者处理速度不可控,快速消费完部分消息后,可能会因为处理时间长的消息而导致整体处理速度下降。

  3. 不公平的消息分发: 如果一次性获取大量消息,可能会导致消息在消费者之间分布不均匀,一些消费者可能会快速处理完消息而另一些消费者处理速度较慢,从而导致消息处理效率不高。

  4. 消息积压和延迟: 一次性获取大量消息可能导致消息积压,影响系统对消息的实时处理能力,也会增加消息的处理延

综上所述,合适的预取限制可以帮助控制系统资源的使用,确保消息的平稳处理,避免系统崩溃和消息处理效率低下的问题。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

7. 交换机类型 

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机。

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化。当消费者处理完毕后,队列中存储的数据就会被删除。
     

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机。

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

        7.1 Fanout交换机 

在广播模式下,消息发送流程是这样的:复制成n份,发送给每一个队列。 

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,比如:支付后发信息通知,支付后添加积分。 

         7.1.1 代码实现
1. 创建队列:

2. 创建交换机:

3.  绑定队列和交换机之间的关系: 

4. 添加消费者:
@RabbitListener(queues = "fanout.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenFanoutQueueMessage1(String msg){System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now());
}@RabbitListener(queues = "fanout.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenFanoutQueueMessage2(String msg) {System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now());
}
5.  添加生成者:
@Test
public void fanoutTest(){// 定义要发送的队列String exchangeNane = "windStop.fanout";// 定义要发送的信息String message = "大家好啊!";rabbitTemplate.convertAndSend(exchangeNane,null ,message);
}

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列 

7.2 Direct交换机 

        在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。按照规则进行路由。并且一个队列可以绑定多个规则(路由键)。

在Direct模型下:

  • 每一个Queue都与Exchange设置一个BindingKey(路由key)。
  • 发布者发送消息时,指定消息的RoutingKey。
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。

暗号一样,才会给你。 

        作用解析:分布式架构中,每个模块绑定一个队列,然后对于支付完成后,我们就可以广播给多个队列让他们进行处理,也有些操作不需要发给所有,就需要按照键匹配,支付后发信息通知,支付后添加积分。我取消支付就不需要这两种,但我还需要其他的支付的操作。

        7.2.1 代码实现

1. 创建队列

2. 创建交换机

3. 创建交换机和路由之间的关系

         3.1 进入交换机

        3.2 绑定关系并且指定 BindingKey

因为RabbitMQ官网没有设置同时绑定多个BindingKey,所以要想绑定多个BindingKey就要bind多次。

 

绑定成功后的页面

4. 创建消费者

/*** 订阅* @param msg 接收到的内容*/
@RabbitListener(queues = "direct.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenDirectQueueMessage1(String msg){System.err.println("消费者fanout1 接收到消息:" + msg + "," + LocalDateTime.now());
}@RabbitListener(queues = "direct.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenDirectQueueMessage2(String msg) {System.err.println("消费者fanout2 接收到消息:" + msg + "," + LocalDateTime.now());
}

5. 创建生产者

这个时候就需要指定第二个参数:

@Test
public void directTest(){// 定义要发送的队列String exchangeNane = "windStop.direct";// 定义要发送的信息String message = "红色:震惊,李旭居然是人!";rabbitTemplate.convertAndSend(exchangeNane,"red",message);
}

因为二者都绑定了,red这个路由key,所以direct.queue1和direct.queue2都能收到。

@Test
public void directTest2(){// 定义要发送的队列String exchangeNane = "windStop.direct";// 定义要发送的信息String message = "蓝色:明天就要上课了。";rabbitTemplate.convertAndSend(exchangeNane,"blue",message);
}

 因为只有direct.queue1绑定了blue这个路由key,所以只有direct.queue1能收到。

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

7.3. Topic交换机

7.3.1 .说明

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • japan.news 则代表日本新闻

  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • japan.news

接下来,我们就按照上图所示,来演示一下Topic交换机的用法。

首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。创建步骤和上述一样,最终结果如下:

7.3.2 创建消费者 
/*** 通配符订阅* @param msg 接收到的内容*/
@RabbitListener(queues = "topic.queue1")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenTopicQueueMessage1(String msg){System.out.println("消费者topic1 接收到消息:" + msg + "," + LocalDateTime.now());
}@RabbitListener(queues = "topic.queue2")//表示该方法是一个RabbitMQ消息监听器,会监听名为"simple.queue"的队列
public void listenTopicQueueMessage2(String msg) {System.err.println("消费者topic2 接收到消息:" + msg + "," + LocalDateTime.now());
}
7.3.3 创建生产者
@Test
public void testTopicExchange() {// 交换机名称String exchangeName = "windStop.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

 因为二者都匹配了,前者前缀匹配,后者后缀匹配,所以topic.queue1和topic.queue2都能收到。

@Test
public void testTopicExchange2() {// 交换机名称String exchangeName = "windStop.topic";// 消息String message = "今天天气真不错,我的心情好极了";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}

只有topic.queue1会匹配到。.weacher不符合topic.queue2的后缀要求。  

7.3.4 总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

8. 声明队列和交换机

        在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,需要重新创建发布时候的RabiitMQ,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。-> 使用可视化面板创建 

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。-> 代码创建

         8.1 代码创建的基本API

声明队列和交换机:
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  1. Queue:用于声明队列,可以用工厂类QueueBuilder构建。
  2. Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建。
  3. Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建。
 1. 创建队列

2. 创建交换机

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

        3. fanout示例

        基于AMQP协议的消息队列系统,通过声明式的配置方式,RabbitMQ客户端会在应用启动时自动创建交换机和队列,并建立它们之间的对应关系,从而为应用程序提供便捷的消息队列支持。

@Configuration
public class FanoutConfig {// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("windStop.fanout");}// 声明第 1 个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列 1 和 交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明第 2 个队列@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
       4. direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

这种方式,虽然可以实现但是很臃肿,每绑定一个BindingKey就需要多写个路由关系的方法。

8.2 基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是简单多了。

介绍一下:

  1. @RabbitListener(bindings = @QueueBinding(...)): 这里声明了一个RabbitMQ的消息监听器,通过bindings参数指定了队列绑定的相关配置。

  2. value = @Queue(name = "direct.queue1"): 在这里,我们声明了一个名为"direct.queue1"的队列。这表示我们将会监听这个特定的队列。

  3. exchange = @Exchange(name = "windStop.direct", type = ExchangeTypes.DIRECT): 这里声明了一个名为"windStop.direct"的订阅类型的交换机。订阅交换机(Direct Exchange)根据消息的routing key将消息路由到特定的队列。

  4. key = {"red", "blue"}: 这里指定了队列和交换机之间的绑定关系。对于队列"direct.queue1",它将会接收所有routing key为"red"或"blue"的消息。

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

9. 总结

        这段文档内容非常全面地介绍了 RabbitMQ 的核心概念、安装、配置以及各种交换机类型的使用方法。它包括了 RabbitMQ 的前言介绍,不同类型的通信方式,不同 MQ 的对比,以及 RabbitMQ 的安装和核心模块介绍。同时也涵盖了 Spring AMQP 的使用方法和示例,以及 Work Queues、交换机类型(Fanout、Direct、Topic)的详细说明和代码实现。此外祝大家周末愉快!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/346180.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt开发技术:Q3D图表开发笔记(四):Q3DSurface三维曲面图颜色样式详解、Demo以及代码详解

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/139424086 各位读者&#xff0c;知识无穷而人力有穷&#xff0c;要么改需求&#xff0c;要么找专业人士&#xff0c;要么自己研究 红胖子网络科技博…

PawSQL优化 | 分页查询太慢?别忘了投影下推

​在进行数据库应用开发中&#xff0c;分页查询是一项非常常见而又至关重要的任务。但你是否曾因为需要获取总记录数的性能而感到头疼&#xff1f;现在&#xff0c;让PawSQL的投影下推优化来帮你轻松解决这一问题&#xff01;本文以TPCH的Q12为案例进行验证&#xff0c;经过Paw…

Redisson分布式锁原理解析

前言 首先Redis执行命令是单线程的&#xff0c;所以可以利用Redis实现分布式锁&#xff0c;而对于Redis单线程的问题&#xff0c;是其线程模型的问题&#xff0c;本篇重点是对目前流行的工具Redisson怎么去实现的分布式锁进行深入理解&#xff1b;开始之前&#xff0c;我们可以…

Vmess协议是什么意思? VLESS与VMess有什么区别?

VMess 是一个基于 TCP 的加密传输协议&#xff0c;所有数据使用 TCP 传输&#xff0c;是由 V2Ray 原创并使用于 V2Ray 的加密传输协议&#xff0c;它分为入站和出站两部分&#xff0c;其作用是帮助客户端跟服务器之间建立通信。在 V2Ray 上客户端与服务器的通信主要是通过 VMes…

ThinkPHP发邮件配置教程?群发功能安全吗?

ThinkPHP发邮件的注意事项&#xff1f;如何优化邮件发送的性能&#xff1f; 无论是用户注册、密码重置还是消息提醒&#xff0c;发送邮件都是一个常见的需求。AokSend将详细介绍如何在ThinkPHP框架中配置和发送邮件&#xff0c;帮助开发者轻松实现邮件功能。 ThinkPHP发邮件&…

43【PS 作图】颜色速途

1 通过PS让画面细节模糊&#xff0c;避免被过多的颜色干扰 2 分析画面的颜色 3 作图 参考网站&#xff1a; 色感不好要怎么提升呢&#xff1f;分享一下我是怎么练习色感的&#xff01;_哔哩哔哩_bilibili https://www.bilibili.com/video/BV1h1421Z76p/?spm_id_from333.1007.…

【Python教程】3-控制流、循环结构与简单字符串操作

在整理自己的笔记的时候发现了当年学习python时候整理的笔记&#xff0c;稍微整理一下&#xff0c;分享出来&#xff0c;方便记录和查看吧。个人觉得如果想简单了解一名语言或者技术&#xff0c;最简单的方式就是通过菜鸟教程去学习一下。今后会从python开始重新更新&#xff0…

MySQL之查询性能优化(七)

查询性能优化 排序优化 无论如何排序都是一个成本很高的操作&#xff0c;所以从性能角度考虑&#xff0c;应尽可能避免排序或者尽可能避免对大量数据进行排序。前面已经提到了&#xff0c;当不能使用索引生成排序结果的时候&#xff0c;MySQL需要自己进行排序&#xff0c;如果…

人脸考勤项目实训

第一章 Python-----Anaconda安装 文章目录 第一章 Python-----Anaconda安装前言一、Anaconda是什么&#xff1f;二、Anaconda的前世今生二、Windows安装步骤1.官网下载2.安装步骤安装虚拟环境 总结 前言 工欲善其事必先利其器&#xff0c;项目第一步&#xff0c;安装我们的环境…

【Unity UGUI】Screen.safeArea获取异形屏数据失败

Screen.safeArea获取不到异形屏的尺寸位置等数据 检查AndroidManifest.xml文件是否有设置&#xff1a;android:theme"style/UnityThemeSelector"&#xff0c;没有加上即可 android:theme"style/UnityThemeSelector"

第1章Hello world 4/5:对比Rust/Java/C++创建和运行Hello world全过程:运行第一个程序

讲动人的故事,写懂人的代码 1.7 对比Rust/Java/C++创建和运行Hello world全过程 有了会听懂人类的讲话,还能做记录的编程助理艾极思,他们三人的讨论内容,都可以变成一份详细的会议纪要啦。 接下来,我们一起看看艾极思是如何记录下赵可菲创建和运行Java程序Hello world,…

简记:为Docker配置服务代理

简记 为Docker配置服务代理 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_28550263/art…

Leetcode3040. 相同分数的最大操作数目 II

Every day a Leetcode 题目来源&#xff1a;3040. 相同分数的最大操作数目 II 解法1&#xff1a;记忆化搜索 第一步可以做什么&#xff1f;做完后&#xff0c;剩下要解决的问题是什么&#xff1f; 删除前两个数&#xff0c;剩下 nums[2] 到 nums[n−1]&#xff0c;这是一个…

分享一个 .NET Core Console 项目中应用 NLog 写日志的详细例子

前言 日志在软件开发中扮演着非常重要的角色&#xff0c;通常我们用它来记录应用程序运行时发生的事件、错误信息、警告以及其他相关信息&#xff0c;帮助在调试和排查问题时更快速地定位和解决 Bug。 通过日志&#xff0c;我们可以做到&#xff1a; 故障排除和调试&#xff…

4.大模型微调技术LoRA

大模型低秩适配(LoRA)技术 现有PEFT 方法的局限与挑战 Adapter方法,通过增加模型深度而额外增加了模型推理延时。Prompt Tuning、Prefix Tuning、P-Tuning等方法中的提示较难训练,同时缩短了模型可用的序列长度。往往难以同时实现高效率和高质量,效果通常不及完全微调(f…

已解决Error || RuntimeError: size mismatch, m1: [32 x 100], m2: [500 x 10]

已解决Error || RuntimeError: size mismatch, m1: [32 x 100], m2: [500 x 10] 原创作者&#xff1a; 猫头虎 作者微信号&#xff1a; Libin9iOak 作者公众号&#xff1a; 猫头虎技术团队 更新日期&#xff1a; 2024年6月6日 博主猫头虎的技术世界 &#x1f31f; 欢迎来…

基于Java-SpringBoot-VUE-MySQL的高校数字化迎新管理系统

基于Java-SpringBoot-VUE-MySQL的高校数字化迎新管理系统 登陆界面 联系作者 如需本项目源代码&#xff0c;可扫码或者VX:bob1638联系作者。 首页图表 系统功能持续更新中。。。 介绍 这是一款主要用于高校迎新的系统&#xff0c;主要是采用了SpringBoot2.X VUE2.6 ElementUI2.…

mysql 数据库datetime 类型,转换为DO里面的long类型后,只剩下年了,没有了月和日

解决方法也简单&#xff1a; 自定义个一个 Date2LongTypeHandler <resultMap id"BeanResult" type"XXXX.XXXXDO"><result column"gmt_create" property"gmtCreate" jdbcType"DATE" javaType"java.lang.Long&…

软件游戏steam_api.dll丢失的解决方法,总结5种有效的方法

在玩电脑游戏时&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“游戏缺少steam_api.dll”。这个问题可能让很多玩家感到困惑和烦恼。那么&#xff0c;究竟是什么原因导致游戏缺少steam_api.dll呢&#xff1f;又该如何解决这个问题呢&#xff1f;本文将为大家…

Jmeter压测 —— 1秒发送1次请求

场景&#xff1a;有时候测试场景需要设置请求频率为一秒一次&#xff08;或几秒一次&#xff09;实现方法一&#xff1a;1、首先需要在线程组下设置循环次数&#xff08;可以理解为请求的次数&#xff09; 次数设置为请求300次&#xff0c;其中线程数跟时间自行设置 2、在设置…