RabbitMQ队列及交换机的使用

目录

一、简单模型

1、首先控制台创建一个队列

2、父工程导入依赖 

3、生产者配置文件

 4、写测试类

5、消费者配置文件

6、消费者接收消息

二、WorkQueues模型

1、在控制台创建一个新的队列

2、生产者生产消息

3、创建两个消费者接收消息

4、能者多劳充分利用每一个消费者的能力

三、交换机

四、Fanout交换机

1、 声明队列

2、 创建交换机

​编辑 3、 绑定交换机

4、示例 

五、Diect交换机

1、 声明队列

2、创建交换机

 3、绑定交换机

 4、示例

六、Topic交换机

1、创建队列

2、创建交换机 

3、绑定队列

4、示例

7、、声明队列交换机

1、SpringAMQP提供的类声明

2、基于注解声明

七、消息转换器

配置JSON转换器


一、简单模型

创建一个父工程和两个子工程consumer和publisher

1、首先控制台创建一个队列

命名为simple.queue

 

2、父工程导入依赖 

 <dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

3、生产者配置文件

spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码

 4、写测试类

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, rabbitmq!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

 查看消息

 

5、消费者配置文件

spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码

6、消费者接收消息

@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

 结果

二、WorkQueues模型

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了 

1、在控制台创建一个新的队列

命名为work.queue

2、生产者生产消息

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/@Testpublic void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "work.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}

3、创建两个消费者接收消息

  @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】");}

结果 

 

 如果消费者睡眠时间不同

 @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】");Thread.sleep(200);}
  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

 

消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 

4、能者多劳充分利用每一个消费者的能力

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

三、交换机

在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

四、Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。 在广播模式下,消息发送流程是这样的:

 

  • 1)  可以有多个队列
  • 2)  每个队列都要绑定到Exchange(交换机)
  • 3)  生产者发送的消息,只能发送到交换机
  • 4)  交换机把消息发送给绑定过的所有队列
  • 5)  订阅队列的消费者都能拿到消息

1、 声明队列

 创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout

2、 创建交换机

创建一个名为fanout的交换机,类型是Fanout

 3、 绑定交换机

4、示例 

 生产者

/*** Fanout交换机*/@Testpublic void testFanoutExchange() {// 交换机名称String exchangeName = "mq.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);}

消费者

@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}

 

五、Diect交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息 

1、 声明队列

首先在控制台声明两个队列direct.queue1direct.queue2

 

2、创建交换机

 

 3、绑定交换机

 一个队列绑定两个RoutingKey

 4、示例

生产者:RoutingKey为red

  /*** Direct交换机*/@Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "mq.direct";// 消息String message = "hello direct red";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}

消费者

@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}

 两个队列都收到消息

RoutingKey为blue

 /*** Direct交换机*/@Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "mq.direct";// 消息String message = "hello direct blue";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}

 只有队列2收到消息

六、Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

 

 示例

publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;
  • china.weather 代表中国的天气消息;
  • japan.news 则代表日本新闻
  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news

1、创建队列

2、创建交换机 

3、绑定队列

 

4、示例

生产者:RoutingKey为china.news

 /*** topicExchange*/@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "mq.topic";// 消息String message = "hello topis china.news";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);}

消费者

  @RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");}

 

 RoutingKey为china.people

 /*** topicExchange*/@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "mq.topic";// 消息String message = "hello topis china.people";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.people", message);}

只有消费者1收到消息 

7、、声明队列交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

1、Queue:用于声明队列,可以用工厂类QueueBuilder构建
2、Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3、Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

1、SpringAMQP提供的类声明

示例:创建Fanout交换机队列

@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange2(){return new FanoutExchange("mq.fanout2");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue3(){return new Queue("fanout.queue3");}/*** 绑定队列和交换机1*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}/*** 绑定队列和交换机2*/@Beanpublic Binding bindingQueue2(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2());}
}

 direct示例

@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("mq.direct2").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

 direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding

2、基于注解声明

/*** 注解声明交换机* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue3"),//队列名称exchange = @Exchange(name = "mq.direct", //交换机名称type = ExchangeTypes.DIRECT),//交换机类型key = {"red", "blue"}//RoutingKey))public void listenDirectQueue3(String msg){System.out.println("消费者3接收到direct.queue3的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue4"),exchange = @Exchange(name = "mq.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue4(String msg){System.out.println("消费者4接收到direct.queue4的消息:【" + msg + "】");}

 队列

 交换机

七、消息转换器

 @Testpublic void testSendMap() throws InterruptedException {// 准备消息Map<String,Object> msg = new HashMap<>();msg.put("name", "张三");msg.put("age", 21);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);}

当发送的数据为Objiet类型时会出现乱码现象,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化

配置JSON转换器

publisherconsumer两个服务中都引入依赖:

 <dependencies><!--mq消息转换为json--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency></dependencies>

注意:如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

结果

 消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

  @RabbitListener(queues = "object.queue")public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到object.queue消息:【" + msg + "】");}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/166963.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试官心声:个个都说会自动化,结果面试一问细节全露馅了

今年我们部门计划招聘几名自动化测试工程师&#xff0c;为此我进行了面试和培训&#xff0c;发现了一个让我感到担忧的趋势&#xff0c;许多候选人可以轻松地回答有关脚本编写、元素定位、框架API等问题。然而一问到实际项目&#xff0c;比如“如何从0开始搭建自动化体系”、“…

集成学习方法之随机森林-入门

1、 什么是集成学习方法 集成学习通过建立几个模型组合的来解决单一预测问题。它的工作原理是生成多个分类器/模型&#xff0c;各自独立地学习和作出预测。这些预测最后结合成组合预测&#xff0c;因此优于任何一个单分类的做出预测。 2、 什么是随机森林 在机器学习中&…

如何借助边缘智能网关打造智慧城市便民驿站

智慧城市驿站是一类提供多样化便利服务的新型智能公共设施&#xff0c;通过融合物联网技术、边缘智能技术、新能源技术等&#xff0c;为城市居民整合提供休闲、购物、卫生、广告、安全等公共服务&#xff0c;进一步提升日常生活体验。本篇就为大家介绍如何基于边缘智能网关&…

FPGA【紫光语法】

寄存器数据类型&#xff1a; reg 默认为 1 bit wide&#xff0c;如果超过 1 bit&#xff0c;则需要 range declaration 设置 reg 的位宽integer 默认位宽为 32 bit&#xff0c;不允许有 range declarationtime 默认位宽为 64 bit&#xff0c;不允许有 range declarat…

Redis常用配置详解

目录 一、Redis查看当前配置命令二、Redis基本配置三、RDB全量持久化配置&#xff08;默认开启&#xff09;四、AOF增量持久化配置五、Redis key过期监听配置六、Redis内存淘汰策略七、总结 一、Redis查看当前配置命令 # Redis查看当前全部配置信息 127.0.0.1:6379> CONFIG…

python:遗传算法(Genetic Algorithm,GA)求解23个测试函数

一、遗传算法 遗传算法&#xff08;Genetic Algorithm&#xff0c;GA&#xff09;起源于对生物系统所进行的计算机模拟研究&#xff0c;是一种随机全局搜索优化方法&#xff0c;它模拟了自然选择和遗传中发生的复制、交叉(crossover)和变异(mutation)等现象&#xff0c;从任一…

Android切换主题生命周期流程与onSaveInstanceState和onRestoreInstanceState,Kotlin

Android切换主题生命周期流程与onSaveInstanceState和onRestoreInstanceState&#xff0c;Kotlin import android.os.Bundle import android.util.Log import androidx.appcompat.app.AppCompatActivityclass MainActivity : AppCompatActivity() {private val TAG "fly&…

面试二总结

bean的生命周期&#xff1a; 数据库采用行级锁索引&#xff08;使用排他锁&#xff09;&#xff1a; mysql事务隔离级别 未提交读(Read uncommitted)是最低的隔离级别。通过名字我们就可以知道&#xff0c;在这种事务隔离级别下&#xff0c;一个事务可以读到另外一个事务未提交…

【C语言】#define宏与函数的优劣对比

本篇文章目录 1. 预处理指令#define宏2. #define定义标识符或宏&#xff0c;要不要最后加上分号&#xff1f;3.宏的参数替换后产生的运算符优先级问题3.1 问题产生3.2 不太完美的解决办法3.3 完美的解决办法 4.#define的替换规则5. 有副作用的宏参数6. 宏与函数的优劣对比6.1 宏…

【Linux】进程间通信——共享内存

目录 一、什么是共享内存 二、共享内存的原理 三、使用共享内存实现进程间通信 3.1 shmget接口 3.1.1 key形参详解 3.2 释放共享内存 3.2.1 ipcs指令 3.2.2 ipcrm指令 3.2.3 shmctl接口 3.3 关联共享内存 3.4 去关联共享内存 3.5 使用共享内存进行进程间通信实例 …

Java基础-IO流

目录 1 File 类的使用 1.1 File类的概念 1.2 构造方法 1.3 常用方法 1.4 课后练习 2 IO流原理及流的分类 2.1 IO原理 2.2 流的分类 2.3 IO流体系 2.4 接口方法 2.4.1 InputStream & Reader相同点 2.4.2 InputStream方法详解 2.4.3 Reader方法详解 2.4.4 Outp…

ant javac任务的fork和executable属性

ant javac任务是用于编译源文件的。 它的fork属性表示是否用JDK编译器在外部执行javac&#xff0c;取值可以为"yes"、“no”&#xff0c;默认值为"no"。 当fork属性的取值为"yes"时&#xff0c;可以用executable属性指明javac可执行文件的完全…

sql高级教程-索引

文章目录 架构简介1.连接层2.服务层3.引擎层4.存储层 索引优化背景目的劣势分类基本语法索引结构和适用场景 性能分析MySq| Query Optimizerexplain 索引优化单表优化两表优化三表优化 索引失效原因 架构简介 1.连接层 最上层是一些客户端和连接服务&#xff0c;包含本地sock通…

汽车屏类产品(五):中控IVI车载信息娱乐系统

前言: 车载信息娱乐系统(IVI)的起源可以追溯到20世纪,按钮调幅收音机被认为是第一个功能。从那以后,IVI系统在创造壮观的车内体验方面变得不可或缺,以至于汽车被称为“车轮上的智能手机”。但随着包括自动驾驶汽车在内的汽车技术的进步,以及对个性化体验的需求不断增长…

Leetcode1839. 所有元音按顺序排布的最长子字符串

Every day a Leetcode 题目来源&#xff1a;1839. 所有元音按顺序排布的最长子字符串 解法1&#xff1a;滑动窗口 要找的是最长美丽子字符串的长度&#xff0c;我们可以用滑动窗口解决。 设窗口内的子字符串为 window&#xff0c;每当 word[right] > window.back() 时&…

最短路相关笔记

Floyd Floyd 算法&#xff0c;是一种在图中求任意两点间最短路径的算法。 Floyd 算法适用于求解无负边权回路的图。 时间复杂度为 O ( n 3 ) O(n^3) O(n3)&#xff0c;空间复杂度 O ( n 2 ) O(n^2) O(n2)。 对于两点 ( i , j ) (i,j) (i,j) 之间的最短路径&#xff0c;有…

算法刷题-链表

算法刷题-链表 203. 移除链表元素 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,6,3,4,5,6], val 6 输出&#xff1a;[1,2,3,4,5]…

asp.net社区医疗辅助诊断网站系统VS开发sqlserver数据库web结构c#编程

一、源码特点 asp.net社区医疗辅助诊断网站系统 是一套完善的web设计管理系统&#xff0c;系统采用mvc模式&#xff08;BLLDALENTITY&#xff09;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver200…

基于白鲸优化的BP神经网络(分类应用) - 附代码

基于白鲸优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于白鲸优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.白鲸优化BP神经网络3.1 BP神经网络参数设置3.2 白鲸算法应用 4.测试结果&#xff1a;5.M…

C语言指针

指针 文章目录 指针1.指针概念2.指针变量2.1 定义指针变量2.2 引用指针变量2.3 指针变量作为函数参数 3.通过指针引用数组3.1数组元素的指针3.2 在引用数组元素时指针的运算3.3通过指针引用数组元素3.4用数组名作函数参数3.5 通过指针引用多维数组 4.通过指针引用字符串4.1字符…