RocketMQ快速实战以及集群架构原理详解

RocketMQ快速实战以及集群架构原理详解

组成部分

  • 启动Rocket服务之前要先启动NameServer

image.png

  • NameServer
    • 提供轻量级Broker路由服务,主要是提供服务注册
  • Broker
    • 实际处理消息存储、转发等服务的核心组件
  • Producer
    • 消息生产者集群,通常为业务系统中的一个功能模块
  • Consumer
    • 消息消费者集群,通常是业务系统中的一个功能模块
  • Topic
    • 区分消息的种类,生产端可以发送消息给一个或多个topic,消费端可以进行一个或多个消息进行消费

集群中的角色

  • Producer
    • 消息发送者(寄信人),在生产者中会把同一类生产者组成一个集合,称之为生产者组,这类生产者发送同一类消息且发送逻辑一致,如果发送的是事务消息是原始生产者在发送之后崩溃,则Broker服务会联系同一生产组的其它生产者来提交或回溯消费
  • Consumer
    • 消息接受者(收信人),消费者同样会把一类消费者组成一个集合,称之为消费者组,这类消费者消费同一类消息且消费逻辑一致,消费者组在消息消费方面,实现负载均衡和容错非常容易,消费组中的消费者必须订阅相同的topic
    • RocketMQ支持两种消息模式
      • 集群消费模式
        • 相同消费者组下的每个消费者平摊消息
      • 广播消费模式
        • 相同消费者组的每个消费者接受全量消息
  • Broker Server
    • 暂存和传输消息(邮局),也存储消息相关的元数据信息(包括消费者组、消费进度偏移、主题、队列消息等),Broker Server是RocketMQ真正的业务核心
      • 子模块
        • Remoting Module
          • 整个Broker的实体,负责处理来自Client端的请求
        • Client Manager
          • 负责管理客户端以及维护消费者订阅的topic信息
        • Store Service
          • 提供方便简单的API接口处理消息存储到物理硬盘的查询功能
        • HA Service
          • 高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能
        • Index Service
          • 根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询
      • Broker架构模式
        • 普通集群
          • 每个节点分配一个固定的角色,master负责响应客户端的写以及存储消息,slave只负责对master的消息进行同步以及响应客户端的读
            • 消息同步方式分为同步和异步
        • Dledger高可用集群
          • 基于Raft协议随机选举出一个master,而master挂了之后,会从slage中自动选举出一个节点作为新master
          • Dledger的职责
            • 接管Broker的Commitlog的消息存储
            • 从集群中选举出master节点
            • 完成master节点往slave节点的消息同步
  • Name Server
    • 管理Broker(邮局的管理机构),Broker Server启动时会向所有的Name Server注册自己的服务信息,并且后续通过心跳来保证服务信息的实时性,生产者或消费者可以通过名称服务查找各个topic响应的Broker IP列表,多个Name Server实例组成集群(AP模式),但相互独立,没有信息互换,意味着Name Server中任意的节点挂了,只要有一台服务节点正常,整个路由服务不会受影响
  • Topic
    • 区分消息的种类,一个发送者可以发送消息给一个或多个topic,一个消息的接受者可以订阅一个或多个topic消息,同一个topic下的数据,会分片存储到不同的Broker上,而每一个分片单位MessageQueue(类似于Kafka中的Partition)
  • MessageQueue
    • 相当于Topic中的分区,用于并行发送和接收消息,每个Topic默认有4个MessageQueue

消息确认机制

  • 消息生产端采用消息确认多次重试的机制来保证消息能发送到MQ

    • 3种发送消息的方式

      • 同步发送

        • 必须等到Broker反馈之后才能继续发,安全性最高但发消息最慢
      • 单向发送

        • 不管消息是否发成功都能继续发,所以吞吐量最高,但是安全性低,容易丢消息
      • 异步发送

        • 发送消息的同时回注册一个回调去处理响应,安全性低,容易丢消息
  • 消息消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息

    • Broker会通过记录重试次数,为了不影响topic下其它正常的消息,会给每个消费组设计对应的重试topic,在消息重试时,会将原topic的消息移动到对应的重试topic中去,当重试达到一定阈值会将失败的消息推入到死信topic中
    • 消费者组由多个消费者实例组成,Broker只需要向某一个实例推送消息即可,保障消息重试机制正常运行,并且Broker只通过消费者返回的状态来判断是否处理成功,但是业务执行是否正确是无法知道的
  • 消费者也可以⾃⾏指定起始消费位点

    • Broker通过消费者返回的状态来推进消费者组对应的消息offset,虽然offset是Broker来维护,但是消费者可以自己指定offset进行消费

消息模型

顺序消息

  • 只能保证局部消息有序,不能保证全局有序,要保证全局有序需要从生产端、Broker、消费端三个角度同时考虑才行
    • 生产端
      • 默认情况下,生产端采用轮询将消息投递到不同的MessageQueue种,而消费端会从多个MessageQueue中拉取消息,所以这种情况下是无法保证顺序的,所以只有让一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这组消息有序
    • Broker
      • Broker中的一个MessageQueue是可以保证有序的
    • 消费端
      • 消费端会从多个MessageQueue上拉取消息,此时每个MessageQueue的消息是有序的,但是多个MessageQueue直接混合到一起却是乱序的,如果想要保证消费有序,可以通过锁MessageQueue的方式,消费完一个MessageQueue再去消费下一个来保证
        • MessageListenerOrderly会锁队列,取完一个才能下一个
        • MessageListenerConcurrently不会锁队列,每次从多个MessageQueue取出一批数据(默认不超过32条)
  • 实现思路简概
    • 生产者只有将一批有序的消息放到同一个MessageQueue上,Broker才有可能保持这一批消息的顺序
    • 消费者只有一次锁定一个MessageQueue,拿到MessageQueue上消息
  • 注意点
    • 大部分业务场景下只要保证局部有序,如果要保证全局有序,只能保留一个MessageQueue,性能较低
    • 生产者端尽可能将有序消息打散到不同的MessageQueue上,避免数据热点竞争
    • 消费者端只能使用同步方式处理消息,不要使用异步处理,更不能自行批量处理
    • 消费者端只进行有限次数的重试,如果一条消息处理失败,RocketMQ会将后续消息阻塞,让消费者进行重试,但是如果消费者一直处理失败,超过最大重试次数,RocketMQ会跳过这条消息,直接处理后面的消息,导致消费乱序
    • 消费者端如果处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代

广播消息

  • 广播消息并没有特定的消费者,因为这涉及到消费者的集群消费模式,默认是集群模式
  • 实现思路简概
    • 集群模式
      • 一个消息只会被一个消费组中的多个消费者共同处理一次
        • Broker端会给每个消费者组维护一个统一的offset来保证同一个消费组内只会被消费一次
    • 广播模式
      • 一个消息会被推送给所有消费者消费,不再关心消费组
        • Broker端只管推消息,消费端自己维护offset
  • 注意点
    • Broker端不维护消费进度,如果消费者处理消息失败了,将⽆法进⾏消息重试
    • 消费端自己维护offset可以在服务重启后继续之前的进度,消息丢了也不影响服务稳定性

延迟消息

  • RocketMQ给消息定制了18个默认的延迟级别,延迟消息的难点其实是性能,需要不断进⾏定时轮询,全部扫描所有消息是不可能的
  • 实现思路简概
    • RocketMQ预设一个系统topic(SCHEDULE_TOPIC_XXXX),在这个topic下有18个延迟队列,每次只针对这些队列里的消息进行延迟操作
  • 注意点
    • 预设延迟时间导致不灵活,后续版本已经支持预设一个具体的时间戳,不建议调整延迟级别对应的延迟时间

批量消息

  • 生产者端发送的消息过多时,可以将多条消息合并进行批量发送,减少网络IO,提升消息发送的吞吐量

  • 注意点

    • 只能对同一topic下的消息进行批量发送,不支持延迟消息,以及批量消息的大小不超过1MB(超过了自行拆分)

过滤消息

  • 同一topic下不同的消息,消费者只关注某一类消息,有简单过滤和SQL过滤方式
  • 实现思路简概
    • 简单过滤
      • ⽣产者端需要在发送消息时,增加Tag属性,消费者端/Broker端就可以通过这个Tag属性过滤出需要的消息
    • SQL过滤
      • ⽣产者端需要在发送消息时,增加Tag属性以及自定义的属性,消费者端/Broker端可以指定一个SQL进行过滤
  • 注意点
    • 消息过滤在消费者端和Broker端都可以做,消费者端进行过滤可以保障消息过滤的可控性,而Broker端过滤可以减少不必要数数据网络IO(只把消费者端需要的消息发送出去就行)
    • 在实际生产中,被过滤的消息并不会直接丢弃,会交给其它需要的消费者进行消费,如果一直没有消费者进行消费,Broker端会继续推进offset

事务消息

  • 通过RocketMQ的事务机制,来保障本地事务(比如数据库)与MQ消息发送的事务一致性(上下游的数据一致性)

  • 实现思路简概

    • 生产者端将消息发送到MQ服务端
    • MQ服务端将消息持久化成功之后,向生产者端反馈已发送成功,此时消息处于半事务消息状态(暂不能投递)
    • 生产者端开始执行本地事务逻辑
    • 生产者端根据本地事务执行结果向MQ服务端提交二次确认结果来判断是否提交或回滚
      • 提交
        • 服务端将半事务消息标记为可投递,将半事务消息转交给消费端
      • 回滚
        • 服务端将回滚事务,放弃将半事务消息转交给消费端
    • 当出现网络故障或生产者端重启时,若果MQ服务端未收到二次确认消息结果或收到的结果为未知状态,经过一定时间后,MQ服务端将对生产者组的任一生产者发送消息回查,生产者收到消息回查后,需要检查对应消息的本地事务执行最终结果,然后生产者端根据检查到的最终结果再次提交二次确认来判断是否提交或回滚
  • 注意点

    • 半消息是对消费者不可⻅的⼀种消息,RocketMQ的做法是将消息转到了⼀个系统Topic(RMQ_SYS_TRANS_HALF_TOPIC)
    • 事务消息中,本地事务默认回查次数15次,本地事务回查的默认间隔60秒,超过回查次数后,消息将会被丢弃
    • 事务消息不支持延迟消息和批量消息

最佳实战注意点

  • 合理分配Topic、Tag
    • ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识,tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤
  • 使⽤Key加快消息索引
    • 分配好Topic和Tag之后,⾃然就需要优化Key属性了,因为Key也可以参与消息过滤,通常建议每个消息要分配 ⼀个在业务层⾯的唯⼀标识码,设置到Key属性中
      • 作用
        • 可以配合Tag进⾏更精确的消息过滤
        • Broker端会为每个消息创建⼀个hash索引,应⽤可以通过topic、key来查询某⼀条历史的消息内容,以及消息在集群内的处理情况,为了避免哈希冲突问题,客户端要尽量保证key的唯⼀性
  • 关注错误消息重试
    • RocketMQ消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送,⽽在重试时,每个消费者组创建⼀个对应的重试队列(“%RETRY%”+ConsumeGroup),多关注重试队列,可以及时了解消费者端的运⾏情况,如果这个队列中出现了⼤量的消息,就意味着消费者的运⾏出现了问题,要及时跟踪进⾏⼲预
    • RocketMQ默认允许每条消息最多重试16次(可以定制),如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列
  • ⼿动处理死信队列
    • 当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。⽽如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题,RocketMQ不会⽴刻将这个有问题的消息丢弃,⽽会将其发送到这个消费者组对应的死信队列,此时需要人工去查看死信队列(%DLQ%+ConsumGroup)中的消息,对错误原因进行排查以及对死信进行处理(转发到正常的tipic进行重新消费或者丢弃)
    • 死信队列的特征
      • ⼀个死信队列对应⼀个ConsumGroup,⽽不是对应某个消费者实例
      • 如果⼀个ConsumeGroup没有产⽣死信,RocketMQ就不会为其创建相应的死信队列
      • 死信队列中的消息不会再被消费者正常消费
      • 死信队列的有效期跟正常消息相同,默认3天(可配置),超过这个最⻓时间的消息都会被删除,⽽不管消息是否消费过
  • 消费者端进⾏幂等控制
    • 在MQ系统中,对于消息幂等有三种实现语义
      • at most once 最多⼀次:每条消息最多只会被消费⼀次
        • 可以⽤异步发送、sendOneWay等⽅式就可以保证
      • at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
        • 可以⽤同步发送、事务消息等很多⽅式能够保证
      • exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次
        • RocketMQ只能保证at least once,保证不了exactly once
          • 云上版本支持
    • 消息幂等的必要性
      • 出现重复的情况
        • 发送时消息重复
          • 当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端
            应答失败, 如果此时⽣产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
        • 投递时消息重复
          • 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断,为
            了保证消息⾄少被消费⼀次,Broker端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且消息ID也相同的消息
        • 负载均衡时消息重复(不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
          • 当 Broke端 或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息
      • 处理方式
        • 在RocketMQ中,是⽆法保证每个消息只被投递⼀次的,所以要在业务上⾃⾏来保证消息消费的幂等性,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据,但是最好使用分布式ID来避免出现冲突

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/263798.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华清远见作业第四十一天——Qt(第三天)

思维导图: 编程 完善对话框,点击登录对话框,如果账号和密码匹配,则弹出信息对话框,给出提示”登录成功“,提供一个Ok按钮,用户点击Ok后,关闭登录界面,跳转到其他界面 如…

利用iSCSI服务部署IP SAN网络存储服务

一、配置环境(Vmware WorkStation虚拟环境) 服务端与客户端OS:openEuler 22.03-LTS CPU:1U1C 内存:2G 硬盘:5个SCSI磁盘,其中一个作为系统盘,另外四个配置为RAID5阵列 服务器IP…

【黑马程序员】1、TypeScript介绍_黑马程序员前端TypeScript教程,TypeScript零基础入门到实战全套教程

课程地址:【黑马程序员前端TypeScript教程,TypeScript零基础入门到实战全套教程】 https://www.bilibili.com/video/BV14Z4y1u7pi/?share_sourcecopy_web&vd_sourceb1cb921b73fe3808550eaf2224d1c155 目录 1、TypeScript介绍 1.1 TypeScript是什…

React 模态框的设计(三)拖动组件的完善

我在上次的Draggable组件的设计中给了一个简化的方法,今天我来完善一下这个组件,可用于任何可移动组件的包裹。完善后的效果如下所示: 这个优化中,增加了一个注目的效果,还增加了触发可拖动区域的指定功能,…

pikachu靶场-RCE

介绍: RCE(remote command/code execute)概述 RCE漏洞,可以让攻击者直接向后台服务器远程注入操作系统命令或者代码,从而控制后台系统。 远程系统命令执行 一般出现这种漏洞,是因为应用系统从设计上需要给用户提供指定的远程命…

2024“薪”风口、学习鸿蒙开发就业钱景如何?

随着华为的鸿蒙系统从诞生之初就备受关注,对于那些对鸿蒙开发感兴趣并希望在这一领域寻找职业发展的人来说,这是一个非常重要的问题。 那么,2024年学鸿蒙开发的就业前景如何呢? 一、彻底摆脱“安卓套壳”! HarmonyO…

我用Python写了一个倒计时软件

人过中年,每天都觉得时间过得很快,忙活了一天却发现自己很多时候是瞎忙,似乎没有什么成效,匆忙中一天就过去了。 后来,我想想可能是我没有时间的紧迫感,或者说没有明确的目标和执行力,所以才会…

LeetCode94.二叉树的中序遍历

题目 给定一个二叉树的根节点 root ,返回 它的 中序 遍历 。 示例 : 输入:root [1,null,2,3] 输出:[1,3,2] 思路 中序遍历的顺序是左子树 -> 根节点 -> 右子树。因此,我们可以通过递归的方式遍历二叉树&…

人工智能在测绘行业的应用与挑战

目录 一、背景 二、AI在测绘行业的应用方向 1. 自动化特征提取 2. 数据处理与分析 3. 无人机测绘 4. 智能导航与路径规划 5. 三维建模与可视化 6. 地理信息系统(GIS)智能化 三、发展前景 1. 技术融合 2. 精准测绘 3. 智慧城市建设 4. 可…

window: C++ 获取自己写的dll的地址

我自己用C写了一个插件,插件是dll形式的,我的插件式在dll的目录下有个config文件夹,里面是我用json写的插件配置文件,当插件运行的时候我需要读取到json配置文件,所有最重要的就是如何获取dll的路径. 大概就是这么个结构, 我自己封装了一个函数.只适用于window编程,因为里面用…

TCP/IP协议详解

文章目录 TCP/IP协议概述基于TCP/IP协议的应用工具协议协议的必要性 TCP/IP协议TCP/IP协议族协议的分层 传输方式的分类报文、帧、数据包等的区别TCP 和 UDP的区别 TCP/IP协议概述 TCP/IP(Transmission Control Protocol/Internet Protocol)是一组通信协…

Chrome插件精选 — 缓存清理

Chrome实现同一功能的插件往往有多款产品,逐一去安装试用耗时又费力,在此为某一类型插件挑选出比较好用的一款或几款,尽量满足界面精致、功能齐全、设置选项丰富的使用要求,便于节省一个个去尝试的时间和精力。 1. Chrome清理大师…

maven 打包命令

Maven是基于项目对象模型(POM project object model),可以通过一小段描述信息(配置)来管理项目的构建,报告和文档的软件项目管理工具。 Maven的核心功能便是合理叙述项目间的依赖关系,通俗点讲,就是通过po…

【程序员必备技能】Git入门

目录 🌈前言🌈 📁 Git的概念 📂 版本控制 📂 集中式 和 分布式 ​ 📁 创建和配置本地仓库 📁 理解工作区,暂存区,版本库 📁 Git的基本操作 📂…

论文阅读:How Do Neural Networks See Depth in Single Images?

是由Technische Universiteit Delft(代尔夫特理工大学)发表于ICCV,2019。这篇文章的研究内容很有趣,没有关注如何提升深度网络的性能,而是关注单目深度估计的工作机理。 What they find? 所有的网络都忽略了物体的实际大小,而关注他们的垂直…

Seata分布式事务实战AT模式

目录 分布式事务简介 典型的分布式事务应用场景 两阶段提交协议(2PC) 2PC存在的问题 什么是Seata? Seata的三大角色 Seata AT模式的设计思路 一阶段 二阶段 Seata快速开始 Seata Server(TC)环境搭建 db存储模式Nacos(注册&配…

【C++】多态概念(入门)

介绍: 多态的概念:通俗来说,多态就是多种形态,具体点就是去完成某个行为,当不同的对象去完成时会产生出不同的状态。比如扫红包操作,同样是扫码动作,不同的用户扫 得到的不一样的红包&#xff0…

限流算法

下面对常见的限流算法进行讨论。目前,常用的限流算法主要有三种:计数器法、滑动窗口算法、漏桶算法和令牌桶算法。下面分别介绍其原理。 1. 计数器法 计数器法是通过计数对到来的请求进行选择性处理。如系统限制一秒内最多有X个请求,则在该…

042 继承

代码实现 首先定义Person类(人类) /*** 人的基础特征** author Admin*/ public class Person {/*** 姓名*/String name;/*** 生日*/Date birthday;/*** 手机号码*/String tel;/*** 身份证号码*/String idCode;public Person() {}public Person(String …

关于添加第三方jar包到SpringBoot工程中的一些问题

1&#xff1a;如果是多级module工程的情况下&#xff0c;将jar包添加到当前module中&#xff1b; 2&#xff1a;在当前需要依赖的maven工程中添加 外部jar包路径进行引入 <dependency><groupId>kuaishou</groupId><artifactId>kuaishou-merchant-ope…