RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

Broker接收消息的处理流程?

既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?

那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!

那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg) ,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()

在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()

在这里真正的通过 Netty 去发送消息到 Broker 中去:

  1. 通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数

    这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2

    在这里插入图片描述

  2. this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) 方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数

在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2 ,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F 全局搜索这个标识即可:

在这里插入图片描述

可以发现有三个进行 case 判断的地方:

  • 第一个在 PlainAccessResource 类中
  • 第二个在 SendMessageActivity 类中
  • 第三个在 SendMessageRequestHeader 类中

这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)

那么我们就在第三个 case 判断的位置打上断点

在这里插入图片描述

接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:

在这里插入图片描述

根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?

就是在 SendMessageProcessor # processRequest 方法中(也就是堆栈顶第3个方法),在这个方法中:

  1. 通过 parseRequestHeader(request) 先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2 的类型做一些相应的处理
  2. 接下来通过 buildMsgContext(ctx, requestHeader, request) 创建消息的上下文对象
  3. this.executeSendMessageHookBefore(sendMessageContext) 执行一些消息发送前的钩子(扩展点)
  4. 核心:this.sendMessage() 真正去发送消息

那么在 this.sendMessage() 中就是真正发送消息的逻辑了:

  1. 首先是 preSend(ctx, request, requestHeader) 进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)

  2. 如果 queueIdInt < 0 是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数 来选择一个队列发送

  3. 将超过最大重试次数的消息发送到 DLQ 死信队列中去

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {return response;
    }
    
  4. 接下来判断 Broker 是否开启了 异步模式,如果开启的话,通过 asyncPutMessage() 处理

    如果没有开启 异步模式,通过 putMessage() 处理,这里其实还是调用了 asyncPutMessage(),只不过通过 get() 阻塞等待结果(复用代码)

那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage() 方法中,我们就点进去看看进行了哪些处理:

  1. 执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)

  2. 提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)

    在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中

    1. 获取文件对象:this.mappedFileQueue.getLastMappedFile()
    2. 追加写文件的操作: mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
    3. 最后进行刷盘以及高可用的一些处理:handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
  3. 打印写文件消耗的时间 this.getSystemClock().now() - beginTime

那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/221399.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Codeforces Round 916 (Div. 3)(G未补)

目录 A. Problemsolving Log B. Preparing for the Contest C. Quests D. Three Activities E1.E2. Game with Marbles F. Programming Competition A. Problemsolving Log 题意&#xff1a;A任务需要一分钟完成&#xff0c;B任务需要两分钟完成&#xff0c;……以此类推…

ASP.NET MVC实战之权限拦截Authorize使用

1&#xff0c;具体的实现方法代码如下 public class CustomAuthorizeAttribute : FilterAttribute, IAuthorizationFilter{/// <summary>/// 如果需要验证权限的时候&#xff0c;就执行进来/// </summary>/// <param name"filterContext"></par…

代码随想录算法训练营第四十一天|198.打家劫舍 ,213.打家劫舍II ,337.打家劫舍III

198. 打家劫舍 - 力扣&#xff08;LeetCode&#xff09; 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#…

设计模式(三)-结构型模式(5)-外观模式

一、为何需要外观模式&#xff08;Facade&#xff09;? 要实现一个大功能&#xff0c;我们需要将它拆分成多个子系统。然后每个子系统所实现的功能&#xff0c;就由一个称为外观的高层功能模块来调用。这种设计方式就称为外观模式。该模式在开发时常常被使用过&#xff0c;所…

HTML CSS 进度条

1 原生HTML标签 <meter>&#xff1a;显示已知范围的标量值或者分数值<progress>&#xff1a;显示一项任务的完成进度&#xff0c;通常情况下&#xff0c;该元素都显示为一个进度条 1.1 <meter> <html><head><style>meter{width:200px;}…

飞天使-k8s知识点1-kubernetes架构简述

文章目录 名词功能要点 k8s核心要素CNCF 云原生框架简介k8s组建介绍 名词 CI 持续集成, 自动化构建和测试&#xff1a;通过使用自动化构建工具和自动化测试套件&#xff0c;持续集成可以帮助开发人员自动构建和测试他们的代码。这样可以快速检测到潜在的问题&#xff0c;并及早…

解决 Hbuilder打包 Apk pad 无法横屏 以及 H5 直接打包 成Apk

解决 Hbuilder打包 Apk pad 无法横屏 前言云打包配置 前言 利用VUE 写了一套H5 想着 做一个APP壳 然后把 H5 直接嵌进去 客户要求 在pad 端 能够操作 然后页面风格 也需要pad 横屏展示 云打包 配置 下面是manifest.json 配置文件 {"platforms": ["iPad"…

Axure中如何使用交互样式交互事件交互动作情形

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《产品经理如何画泳道图&流程图》 ⛺️ 越努力 &#xff0c;越幸运 目录 一、Axure中交互样式 1、什么是交互样式&#xff1f; 2、交互样式的作用&#xff1f; 3、Axure中如何…

Postman使用总结--生成测试报告

1.执行生成的命令格式 newman run 用例集文件 .json -e 环境文件 .json -d 数据文件 .json/.csv -r htmlextra --reporter- htmlextra-export 测试报告名 .html -e 和 -d 是 非必须的。 如果没有使用 环境&#xff0c;不需要指定 -e 如果没有使用 数据…

Educational Codeforces Round 160 (Div. 2) A~E

A.Rating Increase&#xff08;思维&#xff09; 题意&#xff1a; 给出一个仅包含数字的字符串 s s s&#xff0c;要求将该字符串按以下要求分成左右两部分 a , b a,b a,b&#xff1a; 两个数字均不包含前导 0 0 0 两个数字均大于 0 0 0 b > a b > a b>a 如果…

董宇辉“回归”成为东方甄选高级合伙人,尘埃落地后是谁赢了?

董宇辉“回归”成为东方甄选高级合伙人&#xff0c;尘埃落地后是谁赢了&#xff1f; 董宇辉的“小作文事件”“CEO摔手机事件”迎来大结局了&#xff01; 就在12月18日&#xff0c;董宇辉被任命为新东方教育科技集团董事长文化助理&#xff0c;兼任新东方文旅集团副总裁。有朋…

java中常用的加密算法总结

目前在工作中常用到加密的一些场景&#xff0c;比如密码加密&#xff0c;数据加密&#xff0c;接口参数加密等&#xff0c;故通过本文总结以下常见的加密算法。 1. 对称加密算法 对称加密算法使用相同的密钥进行加密和解密。在Java中&#xff0c;常见的对称加密算法包括&…

网络基础介绍

1.网线制作 1.1 网线制作需要的工具 网线 网线钳 水晶头 测试仪 ​编辑 1.2 网线的标准 1.3 网线的做法 2.集线器&交换机&路由器的介绍 3.OSI七层模型 4.路由器的设置 4.1 常见的路由器设置地址 4.2 常见的路由器账号密码 4.3 登录路由器 设置访客网…

【Axure RP9】中继器应用及相关案例

一 中继器简介 1.1 中继器是什么 中继器&#xff08;Repeater&#xff09;是一种高级的组件&#xff08;Widget&#xff09;&#xff0c;用于显示文本、图像和其他元素的重复集合。它是一个容器&#xff0c;容器中的每一个项目称作“item”&#xff0c;由于“item”中的数据由…

【Spark精讲】Spark五种JOIN策略

目录 三种通用JOIN策略原理 Hash Join 散列连接 原理详解 Sort Merge Join 排序合并连接 Nested Loop 嵌套循环连接 影响JOIN操作的因素 数据集的大小 JOIN的条件 JOIN的类型 Spark中JOIN执行的5种策略 Shuffle Hash Join Broadcast Hash Join Sort Merge Join C…

【面试】Java最新面试题资深开发-微服务篇(1)

问题九&#xff1a;微服务 什么是微服务架构&#xff1f;它与单体架构相比有哪些优势和劣势&#xff1f;解释一下服务发现和服务注册是什么&#xff0c;它们在微服务中的作用是什么&#xff1f;什么是API网关&#xff08;API Gateway&#xff09;&#xff1f;在微服务中它有何…

issue阶段的选择电路的实现

1-of-M的仲裁电路 为什么要实现oldest-first 功能的仲裁呢&#xff1f; 这是考虑到越是旧的指令&#xff0c;和它存在相关性的指令也就越多&#xff0c;因此优先执行最旧的指令&#xff0c;则可以唤醒更多的指令&#xff0c;能够有效地提高处理器执行指令的并行度,而且最旧的指…

德人合科技 | 公司电脑文件加密系统

公司电脑文件加密系统是一种可以对电脑文件进行加密的保护机制。它使用驱动层透明加密技术&#xff0c;能够在用户无感知的情况下对文件进行加密&#xff0c;从源头上保障数据安全和使用安全。 PC端访问地址&#xff1a; www.drhchina.com 此类系统主要有以下几个特点和功能&a…

免 费 搭 建 小程序商城,打造多商家入驻的b2b2c、o2o、直播带货商城

在数字化时代&#xff0c;电商行业正经历着前所未有的变革。鸿鹄云商的saas云平台以其独特的架构和先进的理念&#xff0c;为电商行业带来了全新的商业模式和营销策略。该平台涉及多个平台端&#xff0c;包括平台管理、商家端、买家平台、微服务平台等&#xff0c;涵盖了pc端、…

基于RocketMQ实现分布式事务

前言 在上一篇文章Spring Boot自动装配原理以及实践我们完成了服务通用日志监控组件的开发&#xff0c;确保每个服务都可以基于一个注解实现业务功能的监控。 而本文我们尝试基于RocketMQ实现下单的分布式的事务。可能会有读者会有疑问&#xff0c;之前我们不是基于Seata完成了…