RocketMQ 中如何实现消息的可靠传递?

引言

作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。

在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。

 

生产者端

1. 同步发送消息

生产者使用同步发送模式时,会等待 Broker 返回发送结果,确保消息成功发送到 Broker 才会继续后续操作。若发送失败,生产者可以进行重试。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));try {// 同步发送消息producer.send(msg);} catch (Exception e) {// 发送失败,可进行重试等处理e.printStackTrace();}producer.shutdown();}
}

2. 重试机制

生产者在发送消息失败时,可配置重试次数。RocketMQ 支持自动重试,当遇到网络抖动、Broker 临时不可用等情况时,会自动尝试重新发送消息。

producer.setRetryTimesWhenSendFailed(3); // 设置发送失败时的重试次数为 3 次

3. 消息幂等性处理

为避免因重试导致消息重复发送,生产者可以为每条消息生成唯一的 ID。Broker 在接收消息时,会根据消息 ID 进行去重处理,确保相同 ID 的消息只被处理一次。

Broker 端

1. 刷盘策略

  • 同步刷盘:当 Broker 收到消息后,会先将消息写入磁盘,再返回响应给生产者。这种策略保证了消息不会因 Broker 异常重启而丢失,但会降低系统的吞吐量。
    flushDiskType = SYNC_FLUSH
  • 异步刷盘:Broker 收到消息后,先将消息写入内存缓冲区,然后立即返回响应给生产者,由专门的线程将消息异步写入磁盘。这种策略性能较高,但在 Broker 异常崩溃时,可能会丢失部分内存中的消息。

    2. 主从复制

    RocketMQ 支持主从复制架构,主 Broker 接收消息后,会将消息同步复制到从 Broker。当主 Broker 出现故障时,可以切换到从 Broker 继续提供服务,保证消息的可用性。

    brokerRole = SYNC_MASTER # 主 Broker 配置为同步主节点
    brokerRole = SLAVE # 从 Broker 配置为从节点

    消费者端

    1. 手动提交消费偏移量

    消费者在处理完消息后,手动向 Broker 提交消费偏移量,确保只有在消息处理成功后才更新消费进度。这样,当消费者出现异常时,可以从上次提交的偏移量处继续消费,避免消息丢失。

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class ManualCommitConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");// 手动提交消费偏移量consumer.setAutoCommit(false);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 处理消息System.out.println(new String(msg.getBody()));} catch (Exception e) {// 处理失败,返回重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 手动提交消费偏移量context.setAckIndex(msgs.size() - 1);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
    }

    2. 消费重试机制

    当消费者处理消息失败时,RocketMQ 会自动进行重试。消费者可以根据业务需求,设置重试次数和重试间隔,确保消息能够被成功处理。

    3. 幂等消费

    消费者在处理消息时,要保证消息的幂等性,即多次处理相同的消息不会产生额外的影响。可以通过消息 ID 或业务唯一标识来判断消息是否已经处理过,避免重复处理。

总结

  1. 持久化策略:内存注定是不可靠的,刷盘一定是可靠性首选,但是刷盘导致的IO延时如何优化,是评判中间件性能的关键。
  2. 重试机制:3次重试应该是各个开源框架的默认重试次数。
  3. 集群化策略:单个节点注定不是高可用的最终形态,主从复制多节点可靠是最终态。
  4. 幂等机制:保持消息的重复消费可靠性,幂等键或者其他策略都是可参考的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/9571.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

联想Y7000+RTX4060+i7+Ubuntu22.04运行DeepSeek开源多模态大模型Janus-Pro-1B+本地部署

直接上手搓了&#xff1a; conda create -n myenv python3.10 -ygit clone https://github.com/deepseek-ai/Janus.gitcd Januspip install -e .pip install webencodings beautifulsoup4 tinycss2pip install -e .[gradio]pip install pexpect>4.3python demo/app_januspr…

批量卸载fnm中已经安装的所有版本

直接上代码 fnm list | awk -F NR>1 {print line} {line$2} | xargs -n 1 -I {} fnm uninstall {}原理 fnm list 列出 fnm 中所有已经安装的 node 版本 awk -F NR>1 {print line} {line$2} 以空格分隔-F {line$2}&#xff0c;取从左到右第 2 段&#xff08;v22.11…

(done) MIT6.S081 2023 学习笔记 (Day6: LAB5 COW Fork)

网页&#xff1a;https://pdos.csail.mit.edu/6.S081/2023/labs/cow.html 任务1&#xff1a;Implement copy-on-write fork(hard) (完成) 现实中的问题如下&#xff1a; xv6中的fork()系统调用会将父进程的用户空间内存全部复制到子进程中。如果父进程很大&#xff0c;复制过程…

如何将xps文件转换为txt文件?xps转为pdf,pdf转为txt,提取pdf表格并转为txt

文章目录 xps转txt方法一方法二 pdf转txt整页转txt提取pdf表格&#xff0c;并转为txt 总结另外参考XPS文件转换为TXT文件XPS文件转换为PDF文件PDF文件转换为TXT文件提取PDF表格并转为TXT示例代码&#xff08;部分&#xff09; 本文测试代码已上传&#xff0c;路径如下&#xff…

C++,STL,【目录篇】

文章目录 一、简介二、内容提纲第一部分&#xff1a;STL 概述第二部分&#xff1a;STL 容器第三部分&#xff1a;STL 迭代器第四部分&#xff1a;STL 算法第五部分&#xff1a;STL 函数对象第六部分&#xff1a;STL 高级主题第七部分&#xff1a;STL 实战应用 三、写作风格四、…

[STM32 - 野火] - - - 固件库学习笔记 - - -十三.高级定时器

一、高级定时器简介 高级定时器的简介在前面一章已经介绍过&#xff0c;可以点击下面链接了解&#xff0c;在这里进行一些补充。 [STM32 - 野火] - - - 固件库学习笔记 - - -十二.基本定时器 1.1 功能简介 1、高级定时器可以向上/向下/两边计数&#xff0c;还独有一个重复计…

Mybatis是如何进行分页的?

大家好&#xff0c;我是锋哥。今天分享关于【Mybatis是如何进行分页的&#xff1f;】面试题。希望对大家有帮助&#xff1b; Mybatis是如何进行分页的&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 MyBatis 实现分页的方式有很多种&#xff0c;最常见…

四.3 Redis 五大数据类型/结构的详细说明/详细使用( hash 哈希表数据类型详解和使用)

四.3 Redis 五大数据类型/结构的详细说明/详细使用&#xff08; hash 哈希表数据类型详解和使用&#xff09; 文章目录 四.3 Redis 五大数据类型/结构的详细说明/详细使用&#xff08; hash 哈希表数据类型详解和使用&#xff09;2.hash 哈希表常用指令(详细讲解说明)2.1 hset …

编译dpdk19.08.2中example时一系列报错解决

dpdk19.08编译过程全解 dpdk 介绍问题描述编译过程执行Step 1报错一解决方式 报错二解决方式 继续执行Step 248的时候报错 49没有修改成功输入60退出 使用过程执行make报错一解决方式 继续make报错二解决方式 继续make执行生成文件helloworld报错三解决方式 执行make 完成参考链…

openeuler 22.03 lts sp4 使用 cri-o 和 静态 pod 的方式部署 k8s-v1.32.0 高可用集群

前情提要 整篇文章会非常的长…可以选择性阅读,另外,这篇文章是自己学习使用的,用于生产,还请三思和斟酌 静态 pod 的部署方式和二进制部署的方式是差不多的,区别在于 master 组件的管理方式是 kubectl 还是 systemctl有 kubeadm 工具,为什么还要用静态 pod 的方式部署?…

渗透测试之WAF规则触发绕过规则之规则库绕过方式

目录 Waf触发规则的绕过 特殊字符替换空格 实例 特殊字符拼接绕过waf Mysql 内置得方法 注释包含关键字 实例 Waf触发规则的绕过 特殊字符替换空格 用一些特殊字符代替空格&#xff0c;比如在mysql中%0a是换行&#xff0c;可以代替空格 这个方法也可以部分绕过最新版本的…

C# dataGridView1获取选中行的名字

在视觉项目中编写的框架需要能够选择产品或复制产品等方便后续换型&#xff0c;视觉调试仅需调试相机图像、调试视觉相关参数、标定&#xff0c;再试跑调试优化参数。 C# dataGridView1 鼠标点击某一行能够计算出是那一行 使用CellMouseClick事件 首先&#xff0c;在Form的构造…

Ubuntu介绍、与centos的区别、基于VMware安装Ubuntu Server 22.04、配置远程连接、安装jdk+Tomcat

目录 ?编辑 一、Ubuntu22.04介绍 二、Ubuntu与Centos的区别 三、基于VMware安装Ubuntu Server 22.04 下载 VMware安装 1.创建新的虚拟机 2.选择类型配置 3.虚拟机硬件兼容性 4.安装客户机操作系统 5.选择客户机操作系统 6.命名虚拟机 7.处理器配置 8.虚拟机内存…

Linux基础指令

基本文件操作 补充&#xff1a; “cd -” 可以前往刚才所在目录 “ls 文件路径” 列举指定路径的文件 “ls -a”列出隐藏文件 “ls -l”可以缩写为“ll” 周边概念 读取操作 “cat 文件名”阅读文本文件内容&#xff0c;可以使用Tab键补全文件…

【HarmonyOS之旅】基于ArkTS开发(三) -> 兼容JS的类Web开发(三)

目录 1 -> 生命周期 1.1 -> 应用生命周期 1.2 -> 页面生命周期 2 -> 资源限定与访问 2.1 -> 资源限定词 2.2 -> 资源限定词的命名要求 2.3 -> 限定词与设备状态的匹配规则 2.4 -> 引用JS模块内resources资源 3 -> 多语言支持 3.1 -> 定…

【JavaWeb06】Tomcat基础入门:架构理解与基本配置指南

文章目录 &#x1f30d;一. WEB 开发❄️1. 介绍 ❄️2. BS 与 CS 开发介绍 ❄️3. JavaWeb 服务软件 &#x1f30d;二. Tomcat❄️1. Tomcat 下载和安装 ❄️2. Tomcat 启动 ❄️3. Tomcat 启动故障排除 ❄️4. Tomcat 服务中部署 WEB 应用 ❄️5. 浏览器访问 Web 服务过程详…

C语言练习(29)

13个人围成一圈&#xff0c;从第1个人开始顺序报号1、2、3。凡报到“3”者退出圈子&#xff0c;找出最后留在圈子中的人原来的序号。本题要求用链表实现。 #include <stdio.h> #include <stdlib.h>// 定义链表节点结构体 typedef struct Node {int num;struct Nod…

简要介绍C语言和c++的共有变量,以及c++特有的变量

在C语言和C中&#xff0c;变量是用来存储数据的内存位置&#xff0c;它们的使用方式和特性在两种语言中既有相似之处&#xff0c;也有不同之处。以下分别介绍C语言和C的共有变量以及C特有的变量。 C语言和C的共有变量 C语言和C都支持以下类型的变量&#xff0c;它们在语法和基…

【UE插件】Sphinx关键词语音识别

视频教程&#xff1a; Unreal Engine - Speech Recognition - Free Pluginhttps://www.youtube.com/watch?vKBcXNnSdWog&t622s 官方教程&#xff1a; Sphinx: Speech Recognition Plugin | Unreal Engine Community Wikihttps://unrealcommunity.wiki/speech-recognition…

图漾相机——C++语言属性设置

文章目录 前言1.SDK API功能介绍1.1 Device组件下的API测试1.1.1 相机工作模式设置&#xff08;TY_TRIGGER_PARAM_EX&#xff09;1.1.2 TY_INT_FRAME_PER_TRIGGER1.1.3 TY_INT_PACKET_DELAY1.1.4 TY_INT_PACKET_SIZE1.1.5 TY_BOOL_GVSP_RESEND1.1.6 TY_BOOL_TRIGGER_OUT_IO1.1.…