【SpringCloud】RabbitMQ——五种方式实现发送和接收消息

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

1.Basic Queue 简单队列模型

1.1引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2消息发送

在publisher服务的application.yml中添加配置

spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: haojiale # 用户名password: 123321 # 密码

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

package com.example.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

1.3消息接收

在consumer服务的application.yml中添加的配置内容同消息发送一样

在consumer服务中编写监听器,消费消息

package com.example.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

2.WorkQueue

WorkQueue也被称为任务模型。就是让多个消费者绑定到一个队列,共同消费队列中的消息
在这里插入图片描述

当消息处理比较耗时的时候,可能生产消息的速度远远大于消息的消费速度,消息就会堆积越来越多,无法及时处理,使用work模型,多个消费者共同处理消息提升消费速度

2.1消息发送

循环发送消息,模拟大量消息堆积现象

在publisher服务中添加一个测试方法:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

2.2消息接收

模拟多个消费者绑定同一个队列,在consumer服务中添加2个新的方法:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

执行测试方法结果是消费者1很快处理完了自己的25条消息,消费者2却在缓慢处理自己的25条消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力是存在问题的。

解决

修改consumer服务的application.yml文件配置,通过设置prefetch来控制消费者预取的消息数量

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.发布/订阅

发布订阅的模型图如图:
在这里插入图片描述

Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.1 Fanout

在这里插入图片描述

3.1.1声明队列和交换机

在这里插入图片描述

在consumer服务中声明队列和交换机

package com.example.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("example.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

3.1.2消息发送

在publisher服务中编写测试方法发送消息

@Test
public void testFanoutExchange() {// 队列名称String exchangeName = "example.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.1.3消息接收

在consumer服务中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

3.1.4总结

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败情况下消息丢失
  • FanoutExchange会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

3.2 Direct

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不会再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RountingKey与消息的RoutingKey完全一致,才会收到消息

3.2.1消息发送

在这里插入图片描述

在publisher服务中添加测试方法:

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.2.2消息接收——基于注解声明交换机和队列

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

3.2.3总结

Direct交换机与Fanout交换机的区别?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.3 Topic

Topic类型的Exchang可以让队列在绑定Routing key的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词
在这里插入图片描述

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

图示:

3.3.1消息发送

在这里插入图片描述

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.3.2消息接收

在consumer服务中添加方法:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/401411.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CONDA】库冲突解决办法

如今&#xff0c;使用PYTHON作为开发语言时&#xff0c;或多或少都会使用到conda。安装Annaconda时一般都会选择在启动终端时进入conda的base环境。该操作&#xff0c;实际上是在~/.bashrc中添加如下脚本&#xff1a; # >>> conda initialize >>> # !! Cont…

Java基础之循环嵌套

循环嵌套 在一个循环内部可以嵌套另一个或多个循环。 外部循环每执行1次&#xff0c;内层循环会执行1轮(全部)。 案例1&#xff1a; 连续3天&#xff0c;每天都要表白5次。 package com.briup.chap03;public class Test03_Nest {public static void main(String[] args) {…

XMGoat:一款针对Azure的环境安全检测工具

关于XMGoat XMGoat是一款针对Azure的环境安全检测工具&#xff0c;XM Goat 由 XM Cyber Terraform 模板组成&#xff0c;可帮助您了解常见的 Azure 安全问题。每个模板都是一个用于安全技术学习的靶机环境&#xff0c;包含了一些严重的配置错误。 在该工具的帮助下&#xff0c…

File的概述和构造方法

一.路径&#xff1a; 相对路径开头不带盘符。 二.File&#xff1a; 1.File对象&#xff1a; File对象就表示一个路径&#xff0c;可以是文件的路径&#xff0c;也可以是文件夹的路径&#xff0c; 这个路径可以是存在的&#xff0c;也可以是不存在的。 2.File对象常见的构造…

SpringCloud完整教程

一下内容为本人在听黑马程序员的课程时整理的 微服务技术栈 ⎛⎝≥⏝⏝≤⎛⎝ ⎛⎝≥⏝⏝≤⎛⎝ ⎛⎝≥⏝⏝≤⎛⎝ ⎛⎝≥⏝⏝≤⎛⎝ 1、微服务框架 1.1、认识微服务 1.1.1、服务架构演变 **单体架构&#xff1a;**将业务的所有功能集中在一个项目中开发&#xff0c;打包成…

华为云Api调用怎么生成Authorization鉴权信息,StringToSign拼接流程

请求示例 Authorization 为了安全&#xff0c;华为云的 Api 调用都是需要在请求的 Header 中携带 Authorization 鉴权的&#xff0c;这个鉴权15分钟内有效&#xff0c;超过15分钟就不能用了&#xff0c;而且是需要调用方自己手动拼接的。 Authorization的格式为 OBS 用户AK:…

Linux系统移植——开发板烧写

目录&#xff1a; 目录&#xff1a; 一、什么是EMMC分区&#xff1f; 1.1 eMMC分区 1.2 分区的管理 二、相关命令介绍&#xff1a; 2.1 mmc 2.1.1 主要功能 2.1.2 示例用法 2.2 fdisk 2.2.1 基本功能 2.2.2 交互模式常用命令 2.2.3 注意事项 三、U-BOOT烧写 3.1 mmc命令 3.2 f…

【Linux入门】Linux环境搭建

目录 前言 一、发行版本 二、搭建Linux环境 1.Linux环境搭建方式 2.虚拟机安装Ubuntu 22.02.4 1&#xff09;安装VMWare 2&#xff09;下载镜像源 3&#xff09;添加虚拟机 4&#xff09;换源 5&#xff09;安装VM Tools 6)添加快照 总结 前言 Linux是一款自由和开放…

JAVA集中学习第五周学习记录(二)

系列文章目录 第一章 JAVA集中学习第一周学习记录(一) 第二章 JAVA集中学习第一周项目实践 第三章 JAVA集中学习第一周学习记录(二) 第四章 JAVA集中学习第一周课后习题 第五章 JAVA集中学习第二周学习记录(一) 第六章 JAVA集中学习第二周项目实践 第七章 JAVA集中学习第二周学…

RCE远程命令执行

命令执行的常用函数 system()&#xff1a;能将字符串作为系统命令执行&#xff0c;且返回命令执行结果。 #system(string $command, int &$result_code null): string|false system(whoami); exec()&#xff1a;能将字符串作为系统命令执行&#xff0c;但是只返回执行结果…

MySQL 的 InnoDB 缓冲池里有什么?--InnoDB存储梳理(二)

文章目录 缓冲池的配置介绍一张表 INNODB_BUFFER_POOL_PAGES字段解释 缓冲池的配置 以下配置的意思&#xff0c;缓冲池在内存中的大小为20M&#xff1b;只有1个缓冲池实例&#xff1b;每一块的大小&#xff0c;插入缓冲占的百分比 # InnoDB 缓存池配置 innodb_buffer_pool_si…

Python之循环语句

这是《Python入门经典以解决计算问题为导向的Python编程实践》中58-65的内容&#xff0c;主要将了while循环语句和for循环语句。 循环 一、while循环语句语法&#xff1a;工作原理&#xff1a;案例解读要点 二、for循环语句语法工作原理、案例&#xff1a;寻找完全数 三、whil…

学习记录——day30 网络编程 端口号port 套接字socket TCP实现网络通信

目录 一、端口号 port 二、套接字 socket 1、原理 2、socket函数介绍 三、TCP实现网络通信 1、原理 2、TCP通信原理图 3、TCP相关函数 1&#xff09;bind 绑定 2&#xff09;listen 监听 3&#xff09;accept 接收连接请求 4&#xff09;recv 接收 5&#xff09;sen…

Ubuntu系统中安装ffmpeg工具(详细图文教程)

&#x1f4aa; 专业从事且热爱图像处理&#xff0c;图像处理专栏更新如下&#x1f447;&#xff1a; &#x1f4dd;《图像去噪》 &#x1f4dd;《超分辨率重建》 &#x1f4dd;《语义分割》 &#x1f4dd;《风格迁移》 &#x1f4dd;《目标检测》 &#x1f4dd;《暗光增强》 &a…

RAG:系统评估,以RAGAS为例

面试的时候经常会问到&#xff0c;模型和系统是怎么评估的&#xff0c;尤其是RAG&#xff0c;这么多组件&#xff0c;还有端到端&#xff0c;每部分有哪些指标评估&#xff0c;怎么实现的。今天整理下 目前最通用的是RAGAS框架&#xff0c;已经在langchain集成了。在看它之前&…

Java面试--设计模式

设计模式 目录 设计模式1.单例模式&#xff1f;2.代理模式&#xff1f;3.策略模式&#xff1f;4.工厂模式&#xff1f; 1.单例模式&#xff1f; 单例模式是Java的一种设计思想&#xff0c;用此模式下&#xff0c;某个对象在jvm只允许有一个实例&#xff0c;防止这个对象多次引…

文本分类任务算法演变(一)

文本分类任务算法演变 1.简介和应用场景1.1使用场景-打标签1.2使用场景-电商评论分析1.3使用场景-违规检测1.4使用场景-放开想象空间 2贝叶斯算法2.1预备知识-全概率公式2.2贝叶斯公式2.3文本分类中的应用2.3.1任务如下 2.4贝叶斯的优缺点 3.支持向量机3.1支持向量机-决策函数3…

libnl教程(2):发送请求

文章目录 前言示例示例代码构造请求创建套接字发送请求 简化示例 前言 前置阅读要求&#xff1a;libnl教程(1):订阅内核的netlink广播通知 本文介绍&#xff0c;libnl如何向内核发送请求。这包含三个部分&#xff1a;构建请求&#xff1b;创建套接字&#xff1b;发送请求。 …

Web开发:web服务器-Nginx的基础介绍(含AI文稿)

目录 一、Nginx的功能&#xff1a; 二、正向代理和反向代理的区别 三、Nginx负载均衡的主要功能 四、nginx安装目录下的各个文件&#xff08;夹&#xff09;的作用&#xff1a; 五、常用命令 一、Nginx的功能&#xff1a; 1.反向代理&#xff1a;例如我有三台服务器&#x…

大数据项目——实战项目:广告数仓(第二部分)集群环境部署

目录 第4章 广告数仓架构设计 第5章 集群环境准备 5.1 服务器准备 5.1.1 创建3台虚拟机 5.1.2 SSH无密登录配置 5.1.3 编写集群分发脚本xsync 5.1.4 JDK准备 5.1.5 环境变量配置说明 5.2 Hadoop部署 5.2.1 完全分布式运行模式&#xff08;开发重点&#xff09; 5.2…