概念
主题模式(Topic Exchange)是 RabbitMQ 中一种灵活且强大的消息传递模式,它允许生产者根据消息的特定属性将消息发送到一个交换机,并且消费者可以根据自己的需求来接收感兴趣的消息。主题交换机根据消息的路由键和绑定队列的路由键进行模糊匹配,支持通配符
*
和#
,从而实现了更灵活的消息路由和分发。
工作流程
-
生产者发送消息: 生产者将消息发送到一个主题交换机,并指定一个特定的路由键。
-
交换机根据路由键路由消息: 主题交换机根据消息的路由键和绑定队列的路由键进行模糊匹配。路由键可以包含多个单词,以
.
分隔,例如stock.usd.nyse
、weather.usa.ca.sunny
等。 -
消息发送到匹配的队列: 如果消息的路由键与绑定队列的路由键完全匹配,则将消息发送到对应的队列中。如果路由键中包含通配符
*
或#
,则可以匹配多个单词或多个级别的单词,从而实现更灵活的匹配规则。 -
消费者接收消息: 消费者可以根据自己的需求来选择监听匹配的队列,从而接收感兴趣的消息。消费者可以使用通配符
*
匹配一个单词,或使用#
匹配零个或多个单词。
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开
优点
- 灵活性:生产者可以根据消息的特定属性来发送消息,消费者可以根据自己的需求来接收感兴趣的消息。
- 精确匹配:支持精确的路由键匹配和模糊匹配,可以根据实际需求定义复杂的路由规则。
- 扩展性:可以根据需要动态地添加和修改绑定规则,而不需要停止消息传递服务。
主题模式适用于需要根据消息的特定属性进行灵活路由和分发的场景,例如事件处理、消息过滤、数据分析等。
Springboot集成
示例: 系统应用程序测试的时候,会有不同的BUG,测试人员会将不同的BUG按照规范打上标签(相当于routingKey),然后发送到mq中,然后通过主题模式分发;
标签内容:bug归属.模块.等级 例如: back.order.severity
分发规则如下:
第一个消费者是前端的开发人员:处理所有严重的前端BUG:front.#
第二个消费者是后端负责订单模块开发人员:处理所有的后端order模块BUG:back.order.*
另外还有多个消费者处理不同的BUG,这里只用两个做示例
1.创建队列和交换机并绑定
在TopicConfig中配置
package com.model.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: Haiven* @Time: 2024/4/22 11:55* @Description: TODO*/
@Configuration
public class TopicConfig {/*** 主题模式交换机* @return exchange*/@Bean(name = "topicExchange")public Exchange getTopicExchange(){return ExchangeBuilder.topicExchange("exchange_topic").build();}/*** 主题队列 01* @return queue*/@Bean(name = "topicQueue01")public Queue getTopicQueue01(){return QueueBuilder.durable("queue_topic_01").build();}/*** 主题队列 02* @return queue*/@Bean(name = "topicQueue02")public Queue getTopicQueue02(){return QueueBuilder.durable("queue_topic_02").build();}/*** 绑定队列 01* @return binding*/@Beanpublic Binding getTopicBinding01(){return BindingBuilder.bind(getTopicQueue01()).to(getTopicExchange())//路由键 队列1接收debug级别的消息.with("front.#").noargs();}/*** 绑定队列 02* @return binding*/@Beanpublic Binding getTopicBinding02(){return BindingBuilder.bind(getTopicQueue02()).to(getTopicExchange())// 路由键 队列2接收info级别的消息.with("back.order.*").noargs();}
}
主题模式的交换机类型为TopicExchange
2.创建消费者
TopicConsumer
package com.model.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Author: Haiven* @Time: 2024/4/22 10:08* @Description: TODO*/
@Component
public class TopicConsumer {@RabbitListener(queues = {"queue_topic_01"})public void topicConsumer01(String msg){System.out.println("消费者 -01- 接收消息:" + msg);}@RabbitListener(queues = {"queue_topic_02"})public void topicConsumer02(String msg){System.out.println("消费者 -02- 接收消息:" + msg);}
}
3.创建生产者并发送消息
package com.model.controller;import com.code.domain.Response;
import com.model.service.RabbitService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** @Author: Haiven* @Time: 2024/4/19 9:46* @Description: TODO*/
@RestController
@RequestMapping("/producer")
public class ProducerController {@Resourceprivate RabbitService rabbitService;@GetMapping("/simple")public Response<Void> simple(String msg){boolean res = rabbitService.simple(msg);return res ? Response.success() : Response.fail();}@GetMapping("/work")public Response<Void> work(String msg){boolean res = rabbitService.work(msg);return res ? Response.success() : Response.fail();}@GetMapping("/sub")public Response<Void> sub(String msg){boolean res = rabbitService.sub(msg);return res ? Response.success() : Response.fail();}@GetMapping("/routing")public Response<Void> routing(String msg, String type){boolean res = rabbitService.routing(msg, type);return res ? Response.success() : Response.fail();}@GetMapping("/topic")public Response<Void> topic(String msg, String type){boolean res = rabbitService.topic(msg, type);return res ? Response.success() : Response.fail();}
}
package com.model.service.impl;import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** @Author: Haiven* @Time: 2024/4/19 10:51* @Description: TODO*/
@Service
@Slf4j
public class RabbitServiceImpl implements RabbitService {@Resourceprivate RabbitTemplate rabbitTemplate;@Value("${rabbitmq.simple.queue}")private String simpleQueue;@Value("${rabbitmq.work.queue}")private String workQueue;@Overridepublic boolean simple(String msg) {try {rabbitTemplate.convertAndSend(simpleQueue, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean work(String msg) {try {rabbitTemplate.convertAndSend(workQueue, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean sub(String msg) {try {//路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""rabbitTemplate.convertAndSend("exchange_sub","", msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean routing(String msg, String type) {System.out.println("理由模式发送消息:msg="+msg+",type="+type+"");try {//路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""rabbitTemplate.convertAndSend("exchange_routing",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean topic(String msg, String type) {System.out.println("主题模式发送消息:msg="+msg+",type="+type+"");try {//主题模式会根据 type的通配符进行分发rabbitTemplate.convertAndSend("exchange_topic",type, msg);return true;}catch (Exception e){e.printStackTrace();return false;}}
}
4.发送消息
接口调用发送消息, type字段为消息的级别
后台接收