手搭手RocketMQ重试机制

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- 排除logback依赖 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--Log4j2场景启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version><exclusions><exclusion><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.14</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1</version></dependency><dependency><groupId>p6spy</groupId><artifactId>p6spy</artifactId><version>3.9.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency></dependencies>

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

Rocket重试机制

RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。

自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。

消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。

灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:

生产者重试

生产者设置消息失败后重试次数

//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);

Int 重试的次数

//重试

@Test
void retryProducerTest()throws Exception{//创建生产者DefaultMQProducer producer = new DefaultMQProducer("retryGroup");//连接namesrvproducer.setNamesrvAddr("192.168.68.133:9876");//启动producer.start();producer.setDefaultTopicQueueNums(1);//自身业务key唯一String Key = UUID.randomUUID().toString();System.out.println(Key);//重试//同步producer.setRetryTimesWhenSendFailed(3);//异步//producer.setRetryTimesWhenSendAsyncFailed(2);//创建消息Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());//发送消息producer.send(message);System.out.println("发送成功");//关闭生产者producer.shutdown();
}

消费者重试

设置自定义最大重试

consumer.setMaxReconsumeTimes(6);

死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry

@Test
void retryConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅主题   *表示该主题的所有消息consumer.subscribe("retry","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("消息内容"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

死信处理方案

死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)

通过存入单独数据库表,业务发送短信等方式通知人工处理

1、单独订阅死信主题

单独订阅监听主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅死信主题   *表示该主题的所有消息consumer.subscribe("%DLQ%retryConsumerTest","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

2、监听死信主题(业务流程控制)

通过业务流程监听多个主题

//死信处理方案二、监听死信主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//设置每次重试次数consumer.setMaxReconsumeTimes(3);// 订阅需要的多个主题列表List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");// 订阅主题列表中的所有主题for (String topic : topics) {consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息}//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {try{//业务代码System.out.println("业务代码");System.out.println("消息内容"+new String(messageExt.getBody()));int i =1/0;//模拟代码出错}catch (Exception e){//获取重试次数int reconsumeTimes = messageExt.getReconsumeTimes();String key = messageExt.getKeys();if (reconsumeTimes > 2){OrderLog orderLog = new OrderLog();orderLog.setType(2);orderLog.setOrderid(key);orderLog.setUsername("重试超过2次,死信消息");orderMapper.insert(orderLog);System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));//发送短信通知人工处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/277270.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Poi-tl Documentation】自定义占位符来设置图片大小

前置说明&#xff1a; <dependency><groupId>com.deepoove</groupId><artifactId>poi-tl</artifactId><version>1.12.1</version> </dependency>模板文件&#xff1a; image_test.docx package run.siyuan.poi.tl.policy;imp…

调皮的String及多种玩法(下部)

&#x1f468;‍&#x1f4bb;作者简介&#xff1a;&#x1f468;&#x1f3fb;‍&#x1f393;告别&#xff0c;今天 &#x1f4d4;高质量专栏 &#xff1a;☕java趣味之旅 欢迎&#x1f64f;点赞&#x1f5e3;️评论&#x1f4e5;收藏&#x1f493;关注 &#x1f496;衷心的希…

某赛通电子文档安全管理系统 DecryptApplication 任意文件读取漏洞复现

0x01 产品简介 某赛通电子文档安全管理系统(简称:CDG)是一款电子文档安全加密软件,该系统利用驱动层透明加密技术,通过对电子文档的加密保护,防止内部员工泄密和外部人员非法窃取企业核心重要数据资产,对电子文档进行全生命周期防护,系统具有透明加密、主动加密、智能…

国创证券策略:股指预计维持震荡格局 关注汽车、通信设备等板块

国创证券指出&#xff0c;近期两市指数持续反弹创新高&#xff0c;但沪指现已率先出现滞涨状况&#xff0c;一起均已进入阻力重压区。不过当时技术形状上坚持较好&#xff0c;可持续做多&#xff0c;一旦跌破重要支撑如沪指的3030点&#xff0c;则需降仓防卫&#xff0c;防止指…

Python 导入Excel三维坐标数据 生成三维曲面地形图(面) 1、线条折线曲面

环境和包: 环境 python:python-3.12.0-amd64包: matplotlib 3.8.2 pandas 2.1.4 openpyxl 3.1.2 代码: import pandas as pd import matplotlib.pyplot as plt import numpy as np from mpl_toolkits.mplot3d import Axes3D from matplotlib.colors import ListedColor…

释放人工智能的力量:GPU服务器托管和高电机柜托管的关键作用

随着人工智能技术的不断发展&#xff0c;GPU服务器托管和高电机柜托管也变得愈发重要。这些技术在人工智能领域发挥着关键作用&#xff0c;为AI算法的训练和推理提供了强大的计算支持。 GPU服务器托管是指将GPU服务器放置在专门的数据中心中&#xff0c;通过云服务提供商提供的…

网络学习:BGP路径属性分类

目录 前言&#xff1a; 路径属性分类 公认必遵 公认任意 可选过渡 可选非过渡 前言&#xff1a; 在默认情况下&#xff0c;到达同一目的地&#xff0c;BGP只走单条路径&#xff0c;并不会在多条路径之间执行负载均衡。对于IGP路由协议&#xff0c;当有多条路径可以到达同…

RPC通信原理(一)

RPC通信原理 RPC的概念 如果现在我有一个电商项目&#xff0c;用户要查询订单&#xff0c;自然而然是通过Service接口来调用订单的实现类。 我们把用户模块和订单模块都放在一起&#xff0c;打包成一个war包&#xff0c;然后再tomcat上运行&#xff0c;tomcat占有一个进程&am…

Netty线程模型详解

文章目录 概述单Reactor单线程模型单Reactor多线程模型主从Reactor多线程模型 概述 Netty的线程模型采用了Reactor模式&#xff0c;即一个或多个EventLoop轮询各自的任务队列&#xff0c;当发现有任务时&#xff0c;就处理它们。Netty支持单线程模型、多线程模型和混合线程模型…

chown: changing ownership of ‘.‘: Permission denied 的一种解法

前言 最近在新电脑用 colima docker 启动服务遇到了这样的报错 chown: changing ownership of .: Permission denied在网上搜索了很久&#xff0c;不管是google还是stack overflow都没有突破口&#xff0c;只要绑定了 volumes 就会报错&#xff0c;按照网上说的方法&#xff…

【Unity】读取Json的三种方法(JsonUtility,LitJson,Newtonsoft)

介绍 在Unity开发过程中&#xff0c;Json是比较常用的一种数据存储文本&#xff0c;尤其是在和第三方交互中&#xff0c;基本都是json格式。 先给出一个Json示例&#xff0c;我们来看看是如何解析的。 {"Player": [{"id": 1001,"name": "…

【JVM】GCRoot

GC root原理 通过对枚举GCroot对象做引用可达性分析&#xff0c;即从GC root对象开始&#xff0c;向下搜索&#xff0c;形成的路径称之为 引用链。如果一个对象到GC roots对象没有任何引用&#xff0c;没有形成引用链&#xff0c;那么该对象等待GC回收。 可以作为GC Roots的对…

云原生基础知识:容器技术的历史

容器化的定义&#xff1a; 容器化是一种轻量级的虚拟化技术&#xff0c;将应用程序及其所有依赖项&#xff08;包括运行时、系统工具、系统库等&#xff09;打包到一个称为容器的单独单元中。容器提供了一种隔离的执行环境&#xff0c;使得应用程序可以在不同的环境中运行&…

居民健康监测小程序|基于微信小程序的居民健康监测小程序设计与实现(源码+数据库+文档)

居民健康监测小程序目录 目录 基于微信小程序的居民健康监测小程序设计与实现 一、前言 二、系统设计 三、系统功能设计 1、用户信息管理 2、健康科普管理 5.3公告类型管理 3、论坛信息管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推…

LeetCode每日一题——移除元素

移除元素OJ链接&#xff1a;27. 移除元素 - 力扣&#xff08;LeetCode&#xff09; 题目&#xff1a; 思路&#xff1a; 题目给定要求只能使用O(1)的额外空间并且原地修改输入数组&#xff0c;然后返回移除后的数组行长度。那 么我们就可以确我没有办法建立临时的数组存放我…

Arduino IDE的下载和安装

一、Arduino的介绍 Arduino是一款开源电子原型平台&#xff0c;主要包含两部分&#xff1a;硬件&#xff08;各种型号的Arduino板&#xff09;和软件&#xff08;Arduino IDE&#xff09;。这个平台由意大利的Massimo Banzi、David Cuartielles等人共同开发设计&#xff0c;并于…

基于springboot实现小区物业管理系统项目【项目源码+论文说明】

基于springboot实现小区物业管理系统演示 摘要 随着城镇人口居住的集中化加剧 &#xff0c;传统人工小区管理模式逐渐跟不上时代的潮流。这就要求我们提供一个专门的管理系统。来提高物管的工作效率、为住户提供更好的服务。 物业管理系统运用现代化的计算机管理手段,使物业的…

导入空管基础数据

1、首先将data.tar.gz解压到自定义目录中 注意&#xff1a;由于数据文件的压缩包比较大&#xff0c;解压过程可能会持续3~5分钟&#xff0c;请耐心等待。 [rootnode3 ~]# cd /opt/software/ [rootnode3 software]# tar -xzf data.tar.gz -C /opt/ 2、利用SQLyog或者其他数据库…

力扣L10--- 3. 无重复字符的最长子串--2024年3月14日

1.题目 2.知识点 注1&#xff1a;containsKey 是 Java 中 HashMap 类的一个方法&#xff0c;用于检查哈希表中是否包含指定的键。 注2&#xff1a;在哈希表&#xff08;HashMap)中&#xff0c;每个键对应着唯一的值&#xff0c;因此键不能重复&#xff0c;但值可以重复。 (1)创…

【 c 语言 】指针入门

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;C语言 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&…