消息队列-RabbitMQ(二)

接上文《消息队列-RabbitMQ(一)》

在这里插入图片描述

@Configuration
public class RabbitMqConfig {// 消息的消费方json数据的反序列化@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}// 定义使用json的方式转换数据@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate amqpTemplate = new RabbitTemplate();amqpTemplate.setConnectionFactory(connectionFactory);amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return amqpTemplate;}}
@Service
public class TestConsumer {@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})public void simpleModel(User user) {System.out.println("接收消息message=" + user);}// value=@Queue 创建临时队列// exchange创建交换机@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage1(User user) {System.out.println(String.format("消费者 【one】: %s", user));}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage2(User user) {System.out.println(String.format("消费者 【two】: %s", user));}}
@RestController
@RequestMapping("/rabbitMqReq")
public class RabbitMqController {@Autowiredprivate TestProducer testProducer;@PostMapping("/simple")public void simple(@RequestBody User user) {testProducer.simpleMessageSend();}@PostMapping("/fanoutMessageSend")public void fanoutMessageSend(@RequestBody User user) {testProducer.fanoutMessageSend();}
}
@Data
public class User {private String userId; //用户编号private String userName; //用户姓名
}
@Service
public class TestProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void simpleMessageSend() {System.out.println("simpleMessageSend");User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("simpleQueue", user);}// fanout模型public void fanoutMessageSend() {for (int i = 0; i < 5; i++) {User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("fanout-ex", "", user);}try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class App
{public static void main(String[] args){SpringApplication.run(App.class, args);System.out.println("(♥◠‿◠)ノ゙  启动成功   ლ(´ڡ`ლ)゙ ");}
}
# 开发环境配置
server:# 服务器的HTTP端口,默认为8080port: 8899servlet:# 应用的访问路径context-path: /tomcat:# tomcat的URI编码uri-encoding: UTF-8# tomcat最大线程数,默认为200max-threads: 800# Tomcat启动初始化的线程数,默认值25min-spare-threads: 30
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQDemo</artifactId><version>1.0-SNAPSHOT</version><name>RabbitMQDemo</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath /></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><fastjson.version>1.2.70</fastjson.version></properties><dependencies><!-- SpringBoot 核心包 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- AMQP客户端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 阿里JSON解析器 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency></dependencies><repositories><!--阿里云镜像仓库--><repository><id>public</id><name>aliyun nexus</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases></repository><!--oracle驱动没有发布到中央仓库,只能从此仓库下载--><repository><id>jeecg</id><name>jeecg Repository</name><url>http://maven.jeewx.com/nexus/content/repositories/jeecg</url><snapshots><enabled>false</enabled></snapshots></repository></repositories></project>

什么是事务?
数据库事务。原子性、一致性、隔离性、持久性。

本地事务?
整个服务操作只能涉及一个单一的数据库资源。

分布式事务?
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是为了保证不同数据库的数据一致性。

分布式事务应用架构?
微服务架构的分布式应用环境下,越来越多的应用要求对多个数据库资源,多个服务的访问都能纳入到同一个事务当中,分布式事务应运而生。

分布式事务有几种解决方案: 两阶段提交/XA MQ
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者™确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

RabbitMQ与springboot整合
1、添加依赖 spring-boot-starter-amqp
2、配置RabbitConfig,包含RabbitListenerContainerFactory、RabbitTemplate

简单模型
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void simpleMessageSend() {rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
}

}

@RabbitListener(queuesToDeclare = {@Queue(“simpleQueue”)})
public void simpleModel(User user) {
log.info(“message: {}”, user);
}

发布订阅模型
// fanout模型
@Test
public void fanoutMessageSend() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(“fanout-ex”, “”, new User(i, “张三”));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

直连模式(direct)
@Test
public void directMessageSend() {
//rabbitTemplate.convertAndSend(“direct-ex”, “success”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“direct-ex”, “error”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”, “success”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

topic模型
@Test
public void topicMessageSend() {
//rabbitTemplate.convertAndSend(“topic-ex”, “company”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“topic-ex”, “company.java”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.java.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.html.*”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format(“消费者 【three】: %s”, user));
}

消息的手动确认
spring:
rabbitmq:
listener:
simple:
# 提交方式为手动
acknowledge-mode: MANUAL

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

RabbitMQ如何做到消息不丢失?

1.持久化

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

2.确认机制

消费者通过basic.ack命令向RabbitMQ服务器确认已经消费了消息。如果消费者处理消息时发生错误或宕机,RabbitMQ会重新将消息发送给其他消费者。RabbitMQ在接收到消费者确认消息前会将消息保存在内存中,在确认后才会删除消息。

3.发布者确认

RabbitMQ支持发布者确认机制,即发布者在将消息发送到队列后,等待RabbitMQ服务器的确认消息。成功保存到队列的消息会返回确认消息给发布者,如果无法保存则返回Nack(Negative Acknowledgement)消息。通过发布者确认机制,可以确保消息成功发送到RabbitMQ服务器。

4.备份队列

RabbitMQ支持备份队列(Alternate Exchange)机制,即在消息发送到队列之前,先将消息发送到备份队列。如果主队列无法接收消息,RabbitMQ会将消息发送到备份队列中。备份队列通常是一个交换机,在创建队列时可以通过x-dead-letter-exchange属性指定备份队列。

三、注意事项 在使用RabbitMQ消息确认机制时,需注意以下几点:

1、确认模式的选择: 根据具体业务需求选择合适的确认模式。如果对消息传递的可靠性要求较高,建议使用手动确认模式或批量确认模式。

2、设置消息持久化: 在发布消息时,通过设置消息的持久化属性,可以确保即使RabbitMQ服务器意外关闭或重启,消息仍然能够被保存下来。

3、处理未确认消息: 如果消费者在处理消息时发生异常,导致无法发送确认信号给RabbitMQ,那么这些消息将保持在队列中,并可能被重新投递。需要设定适当的重试策略和错误处理机制来处理这些未确认的消息。

4、监听确认超时: 在使用批量确认模式时,如果长时间没有收到确认信号,需要设置合理的超时时间,并采取相应措施来处理超时的情况。

RabbitMQ消息确认机制详解:确保交付成功

消息确认机制是通过发布者(Producer)和消费者(Consumer)之间的交互来实现的。
当发布者发送消息到RabbitMQ后,会等待确认结果。如果消息成功被消费者接收并处理,消费者会发送一个确认信号给RabbitMQ,告知消息已经处理完成。而RabbitMQ则会根据接收到的确认信号,判断消息是否成功交付。
消息确认机制一般有两种模式:简单模式和批量模式。在简单模式中,每发送一条消息后就等待确认;而在批量模式中,可以一次发送多条消息,然后等待批量确认。无论是简单模式还是批量模式,都可以提高消息传递的可靠性。

https://blog.csdn.net/weixin_52278591/article/details/128841155 RabbitMq 消息确认机制详解

https://blog.csdn.net/u011942456/article/details/128198956 总结RabbitMq消息丢失和消息重复消费问题

https://www.zhihu.com/question/54756562?utm_id=0 rabbitmq是什么,主要用于哪些方面?

https://zhuanlan.zhihu.com/p/583520436?utm_id=0 用了8年MQ!聊聊消息队列的技术选型,哪个最香!

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

https://zhuanlan.zhihu.com/p/590948541 绝对详细的 RabbitMQ入门,看完本系列就够了

MQ的优势和劣势

https://cloud.tencent.com/developer/article/2113514 面试百问:使用MQ的优势、劣势以及问题

rabbitmq basicAck

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/146763.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何用画图将另一个图片中的成分复制粘贴?

一、画图是什么&#xff1f; 画图是Windows自带的一个附件&#xff0c;可于菜单中的Windows附件文件夹中找到&#xff08;自带的为2D画图&#xff0c;有需要的可以下载3D画图&#xff09;&#xff0c;可以用来编辑或查看图片&#xff0c;也可以用来绘制图片&#xff0c;并将图…

扫地机器人经营商城小程序的作用是什么

扫地机器人对人们生活大有帮助&#xff0c;近些年也有不少企业开创品牌&#xff0c;在电商平台每年销量也非常高&#xff0c;同行竞争激烈及私域化程度加深情况下&#xff0c;虽然第三方平台或线下方式也有生意&#xff0c;但互联网电商发展也为商家们带来了诸多痛点。 那么通…

Ant-Design-Vue:a-range-picker组件国际化配置

在使用Ant-Design-Vue中的时间范围选择器开发个人项目时&#xff0c;发现默认显示为英文。如何解决呢&#xff1f; date-picker分类 Antd-Vue提供了DatePicker、MonthPicker、RangePicker、WeekPicker 几种类型的时间选择器&#xff0c;分别用于选择日期、月份、日期范围、周范…

TensorFlow入门(一、环境搭建)

一、下载安装Anaconda 下载地址:http://www.anaconda.comhttp://www.anaconda.com 下载完成后运行exe进行安装 二、下载cuda 下载地址:http://developer.nvidia.com/cuda-downloadshttp://developer.nvidia.com/cuda-downloads 下载完成后运行exe进行安装 安装后winR cmd进…

快速开发微信小程序之一登录认证

一、背景 记得11、12年的时候大家一窝蜂的开始做客户端Android、IOS开发&#xff0c;我是14年才开始做Andoird开发&#xff0c;干了两年多&#xff0c;然后18年左右微信小程序火了&#xff0c;我也做了两个小程序&#xff0c;一个是将原有牛奶公众号的功能迁移到小程序&#x…

CSS基础语法第二天

目录 一、复合选择器 1.1 后代选择器 1.2 子代选择器 1.3 并集选择器 1.4 交集选择器 1.4.1超链接伪类 二、CSS特性 2.1 继承性 2.2 层叠性 2.3 优先级 基础选择器 复合选择器-叠加 三、Emmet 写法 3.1HTML标签 3.2CSS 四、背景属性 4.1 背景图 4.2 平铺方式 …

(Note)机器学习面试题

机器学习 1.两位同事从上海出发前往深圳出差&#xff0c;他们在不同时间出发&#xff0c;搭乘的交通工具也不同&#xff0c;能准确描述两者“上海到深圳”距离差别的是&#xff1a; A.欧式距离 B.余弦距离 C.曼哈顿距离 D.切比雪夫距离 S:D 1. 欧几里得距离 计算公式&#x…

2023.10.02 win7x64sp1下Navicat_Premium15_x86连接Oracle_10g(安装在win2003x86)

Oracle_10g安装在这个版本的系统里: Microsoft Windows [版本 5.2.3790] 这个win2003_x86(分配内存1G)安装在vmware虚拟机里. 安装包文件名为:oracle 10g_win32.zip 大小约624 MB (655,025,354 字节) 安装完毕后,tcp1521端口应该开放: Microsoft Windows [版本 5.2.3790]…

网络工程师就业和身高有关系吗

大家好&#xff0c;我是网络工程师成长日记实验室的郑老师&#xff0c;您现在正在查看的是网络工程师成长日记专栏&#xff0c;记录网络工程师日常生活的点点滴滴 一个哥们问我说&#xff0c;他说他现在准备学网络工程师&#xff0c;但是他男生&#xff0c;他说他个子不是太高。…

IDEA的使用

文章目录 1.IDEA配置1.1 idea界面说明1.2 git1.3 JDK1.4 maven1.5 Tomcat1.6 idea设置编码格式1.7 vscodenodejs1.8 windows下安装redis 2. IDEA问题2.1 setAttribute方法爆红2.2 idea cannot download sources解决办法2.3 springboot项目跑起来不停run 3. vscode3.1 vscode显示…

ThreeJS - 封装一个GLB模型展示组件(TypeScript)

一、引言 最近基于Three.JS&#xff0c;使用class封装了一个GLB模型展示&#xff0c;支持TypeScript、支持不同框架使用&#xff0c;具有多种功能。 &#xff08;下图展示一些基础的功能&#xff0c;可以自行扩展&#xff0c;比如光源等&#xff09; 二、主要代码 本模块依赖…

博客无限滚动加载(html、css、js)实现

介绍 这是一个简单实现了类似博客瀑布流加载功能的页面&#xff0c;使用html、css、js实现。简单易懂&#xff0c;值得学习借鉴。&#x1f44d; 演示地址&#xff1a;https://i_dog.gitee.io/easy-web-projects/infinite_scroll_blog/index.html 代码 index.html <!DOCT…

Mac上如何修复损坏的音频?试试iZotope RX 10,对音频进行处理,提高音频质量!

iZotope RX 10是一款由iZotope公司开发的音频修复和编辑软件。它被广泛用于电影、电视、音乐和游戏等行业的音频后期制作&#xff0c;以及声音设计和修复工作。 在RX 10中&#xff0c;iZotope从头开始重新设计了全新的Repair Assistant修复助手&#xff0c;并且推出了相应的修…

Echarts 教程一

Echarts 教程一 可视化大屏幕适配方案可视化大屏幕布局方案Echart 图表通用配置部分解决方案1. titile2. tooltip3. xAxis / yAxis 常用配置4. legend5. grid6. series7.color Echarts API 使用全局echarts对象echarts实例对象 可视化大屏幕适配方案 rem flexible.js 关于flex…

蓝桥杯每日一题2023.10.1

路径 - 蓝桥云课 (lanqiao.cn) 题目分析 求最短路问题&#xff0c;有多种解法&#xff0c;下面介绍两种蓝桥杯最常用到的两种解法 方法一 Floyd&#xff08;求任意两点之间的最短路&#xff09;注&#xff1a;不能有负权回路 初始化每个点到每个点的距离都为0x3f这样才能对…

Netgear R6700v3 1.0.4.102(CVE-2021-27239)

漏洞信息 此漏洞允许网络相邻攻击者在受影响的NETGEAR R6400和R6700固件版本1.0.4.98路由器上执行任意代码。利用此漏洞不需要身份验证。该漏洞存在于upnpd服务中&#xff0c;upnpd服务默认监听UDP 1900端口。SSDP消息中精心制作的MX报头字段可能会触发固定长度的基于堆栈的缓…

经典算法-----迷宫问题(栈的应用)

目录 前言 迷宫问题 算法思路 1.栈的使用方法 ​编辑2.方向的定义 代码实现 栈的cpp代码&#xff1a; 栈的头文件h代码: 走迷宫代码&#xff1a; 前言 今天学习一种算法问题&#xff0c;也就是我们常见的迷宫问题&#xff0c;本期我们通过前面学习过的数据结构---栈来…

(四)动态阈值分割

文章目录 一、基本概念二、实例解析 一、基本概念 基于局部阈值分割的dyn_threshold()算子&#xff0c;适用于一些无法用单一灰度进行分割的情况&#xff0c;如背景比较复杂&#xff0c;有的部分比前景目标亮&#xff0c;或者有的部分比前景目标暗&#xff1b;又比如前景目标包…

【C++进阶之路】C++11(上)

文章目录 一、列表初始化1.{}2.initializer_list 二、声明1.auto2.deltype 三、右值与左值1.基本概念2.应用场景1.左值引用2.右值引用3.完美转发4.万能引用 四、新增默认成员函数五、lambda表达式1.基本语法1.1捕捉列表1.2参数列表1.3返回类型1.4函数体 2.底层原理 总结 一、列…

一个高精度24位ADC芯片ADS1222的使用方法及参考电路程序成都控制器定制

前一段时间&#xff0c;在做单片机、PLC、电路板、控制器/箱、仪器仪表、机电设备或系统、自动化、工控、传感、数据采集、自控系统、控制系统&#xff0c;物联网&#xff0c;电子产品&#xff0c;软件、APP开发设计定制定做开发项目时&#xff0c;有要求用到24位的高精度ADC&a…