RocketMQ源码学习笔记:Producer发送消息流程

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、Overview
  • 2、验证消息
  • 3、查找路由
  • 4、选择消息发送队列
    • 4.1、选择队列的策略
    • 4.2、源码阅读
      • 4.2.1、轮询规避
      • 4.2.2、故障延迟规避
        • 4.2.2.1、计算规避时间
        • 4.2.2.2、选择队列
      • 4.2.3、ThreadLocal的使用
  • 5、发送消息
    • 5.1、客户端建立的时间


1、Overview

消息发送主要可以分成下面四个步骤。

  1. 验证消息
  2. 查找路由
  3. 选择队列
  4. 消息发送

之后从源码查看四个步骤的具体内容。

我们建立一个DefaultMQProducer之后,调用DefaultMQProducer#send()方法就可发送信息。

查看send()的代码最终会来到DefaultMQProducerImpl#sendDefaultImpl(),我们从这里开始看源码。

2、验证消息

发送前必然验证一下消息。

主要是检验消息的状态,一些必要的值不能为空等。

this.makeSureStateOK();
// 1、检查消息
Validators.checkMessage(msg, this.defaultMQProducer);

公司内部想设置一些新的规则用来发送前拦截信息就适合放在checkMessage()里。

这部分没有太多内容。


3、查找路由

所谓的路由是指可用的Broker的信息,包括地址,具体的消息队列等。

下面这一句获取到路由。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

DefaultMQProducer内部缓存这路由信息,维护在ConcurrentHashMap<String, TopicPublishInfo>

private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

tryToFindTopicPublishInfo()中会先检查路由信息是否存在,不存在还需要从NameServer中获取路由列表。

this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

更新的时候会加上一个ReentrantLock,更新结束后释放。

获取到路由信息后开始选择队列发送消息。


4、选择消息发送队列

4.1、选择队列的策略

得到路由信息后就开始选择消息队列发送信息。

选择队列有两种策略

  • 轮询规避:轮询选择队列。如果上次发送消息失败,那就消息需要重新发送,这时就需要规避掉上次发送失败的队列,寻找下一个队列发送。
  • 故障延迟策略:在选择队列发送时根据以往发送时长判断该队列的Broker是否可用。对于发送失败的BrokerProducer会规避该Broker一段时间。

这是发送消息的流程图。

假设我们的Broker是集群,有两个Broker。消息会选择其中一个Broker发送消息,如果失败就重试,直到发送成功或者超过重试次数。
在这里插入图片描述


4.2、源码阅读

这里会探索源码如何实现这两种队列选择策略。

选择队列的入口在DefaultMQProducerImpl#sendDefaultImpl -> this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
在这里插入图片描述
从入口进入查看代码,最终在MQFaultStrategy#selectOneMessageQueue,代码通过sendLatencyFaultEnable这个字段来选择不同的选择策略。

在这里插入图片描述

4.2.1、轮询规避

这是轮询规避的源码。
在这里插入图片描述
其中lastBrokerName是上一次消息发送时选择的broker。这代表该消息上一次发送失败了,所以记录着上一次失败的broker以在这次选择Broker时规避他。

所以lastBrokerName==null时该消息是第一次发送,不需要规避,直接随机选择一个队列发送。

如果上一次发送失败,则开始轮询选择一个队列,保证这个新选出的队列和上一个不同后就可以返回。


4.2.2、故障延迟规避

4.2.2.1、计算规避时间

故障延迟规避策略需要记录发送时间并计算。在看选择Broker的代码时需要看看源码如何记录发送时间并计算出规避时间的。

计算规避时间的代码在this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);。这时消息正常发送会调用一次这个方法;如果出现异常在catch快也会调用这个方法计算规避时间。
在这里插入图片描述

进入这个方法,代码如下。

需要注意isolation=true时表示消息发送出现异常,这时便认为延迟时长是30000ms

同时也可以看到sendLatencyFaultEnable==true表示开启故障规避策略,这种情况才需要计算规避时间。选择Broker时也是通过这个属性判断使用过故障规避还是轮询规避。
在这里插入图片描述

规避时间的计算比较简单,阿里根据自己的经验设置了一个对照表来计算时间,如下图所示。比如延迟是550ms以内的Broker不用规避;延迟在550~1000ms的需要规避30s

在这里插入图片描述

这里跳过计算规避时间的代码细节,进入下一行代码this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);

代码如下,它其实就是将不可用的Broker维护在faultItemTable中,并且记录着解禁时间。以后选择Broker会通过这个集合查看Broker是否可用。
在这里插入图片描述

4.2.2.2、选择队列

我们回到选择Broker的代码MQFaultStrategy#selectOneMessageQueue,下图是相关代码。

在这里插入图片描述
它的大概流程是,轮询队列,如果可用就返回。实在找不到可用的就随机选择一个Broker发送。

它通过latencyFaultTolerance.isAvailable(mq.getBrokerName())判断队列是否可用,里面实际就是通过前面讲到的faultItemTable来查看队列是否可用。


4.2.3、ThreadLocal的使用

在选择队列时,无论是轮询规避还是故障延迟规避都需要循环遍历messageQueue找到适合的queue发送信息。

获取下标的方式用到了ThreadLocal。如下图所示,sendWhichQueue本质上就是一个ThreadLocal<Integer>对象。

请添加图片描述
生产者发送信息时可能会有多个线程同时发信息。

这些线程发送信息时应该各自维护一个消息队列的下标,这样每个线程发送信息时才会比较均匀地向每个队列都发送信息。

另外这些线程发送信息时可能会指定消息队列的id,所以线程各自维护一个消息队列的下标是很有必要的。

这个场景就很适合ThreadLocal,选择消息队列时用ThreadLocal来维护下标。



5、发送消息

5.1、客户端建立的时间

客户端发送消息时,建立HTTP连接是在send()方法中而不是在start()方法中。

站在设计者的角度需要考虑到,开发者在start()方法后可能还需要过一段时间才会真正发送信息,甚至不发信息。

那么建立HTTP连接放在start()就比较浪费资源,所以建立HTTP连接放在了send()方法中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/377005.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

土壤分析仪:解密土壤之奥秘的科技先锋

在农业生产和生态保护的道路上&#xff0c;土壤的质量与状况一直是我们关注的焦点。土壤分析仪&#xff0c;作为现代科技在农业和环保领域的杰出代表&#xff0c;以其高效、精准的分析能力&#xff0c;为我们揭示了土壤的奥秘&#xff0c;为农业生产提供了科学指导&#xff0c;…

一个spring boot项目的启动过程分析

1、web.xml 定义入口类 <context-param><param-name>contextConfigLocation</param-name><param-value>com.baosight.ApplicationBoot</param-value> </context-param> 2、主入口类: ApplicationBoot,SpringBoot项目的mian函数 SpringBo…

阿里云搭建vps服务器的过程

最近突发奇想想要搭建一个阿里云的的vps服务器&#xff0c;下面是搭建的过程&#xff1a; 首先&#xff0c;登录阿里云网站&#xff1a; 搜索&#xff0c;esc控制台&#xff1a; 点击创建实例&#xff1a; 选择地区&#xff1a; 选择实例规格&#xff1a; 选择镜像&#x…

adminPage-vue3依赖FormPage说明文档,表单页快速开发,使用思路及范例(Ⅱ)formConfig基础配置项

adminPage-vue3依赖FormPage说明文档&#xff0c;表单页快速开发&#xff0c;使用思路及范例&#xff08;Ⅱ&#xff09;formConfig配置项 属性: formConfig&#xff08;表单项设置&#xff09;keylabelnoLabeldefaultValuebindchildSlottypeString类型数据&#xff08;除 time…

ArcGIS识别不GDB文件地理数据库显示为空?

​ 点击下方全系列课程学习 点击学习—>ArcGIS全系列实战视频教程——9个单一课程组合系列直播回放 点击学习——>遥感影像综合处理4大遥感软件ArcGISENVIErdaseCognition 我们经常会碰到拷贝的GDB文件ArcGIS无法识别&#xff0c;软件只是把他当做普通的文件夹去看待&am…

2024最新Cloudways主机使用教程(含最新Cloudways折扣码)

Cloudways是一家提供云托管服务的公司&#xff0c;可以帮助你轻松管理和运行你的网站。本教程是Cloudways主机注册和使用教程。Cloudways界面简洁&#xff0c;使用方便&#xff0c;不需要复杂的设置&#xff0c;就能快速搭建一个WordPress网站。它的主机功能包括高级缓存和Bree…

[IDEA插件] JarEditor 编辑jar包(直接新增、修改、删除jar包内的class文件)

文章目录 1. 安装插件 JarEditor2. 在IDEA中添加外部JAR包3. JarEditor 使用介绍 之前我们需要修改jar内文件的时候需要解压jar包&#xff0c;反编译class&#xff0c;新建java源文件&#xff0c;修改代码&#xff0c;再编译成class&#xff0c;替换jar包内的class文件。 现在…

C++入门 模仿mysql控制台输出表格

一、 说明 控制台输出表格&#xff0c;自适应宽度 二、 源码 #include <iostream> #include <map> #include <string> #include <vector>using namespace std;void printTable(vector<vector<string>> *pTableData) {int row pTableDa…

C:数据结构---算法

1.1排序算法 稳定排序 不稳定排序 ①冒泡排序&#xff08;稳定&#xff09; 比较相邻的元素。如果第一个比第二个大&#xff0c;就交换他们两个。对每一对相邻元素作同样的工作&#xff0c;从开始第一对到结尾的最后一对 ②选择排序 在未排序序列中找到最小&#xff08;大…

vue2学习笔记3 - 开发环境知识补充:live server简介

学习笔记1搭建开发环境中&#xff0c;在vs code里安装了live server插件&#xff0c;后续多次使用open with live server来打开浏览器&#xff0c;展示代码运行效果。本着知其然也要知其所以然的态度&#xff0c;稍稍了解了一下Live server。 什么是Live Server Live Server是…

MFC Ribbon菜单 - 中英文实时切换方法

简介 最近在搞一个老外的项目&#xff0c;本来谈的好好的&#xff0c;纯英文界面。项目接近尾声了&#xff0c;又提出了中英文实时切换的新需求&#xff0c;没办法就只能想办法&#xff0c;毕竟客户最大嘛。 实现方法 还好本来的ribbon英文菜单不复杂&#xff0c;就用纯C编码…

【两大3D转换SDK对比】HOOPS Exchange VS. CAD Exchanger

在现代工业和工程设计领域&#xff0c;CAD数据转换工具是确保不同软件系统间数据互通的关键环节。HOOPS Exchange和CAD Exchanger是两款备受关注的工具&#xff0c;它们在功能、支持格式、性能和应用场景等方面有着显著差异。 本文将从背景、支持格式、功能和性能、应用场景等…

网络安全设备——EDR

网络安全中的EDR&#xff08;Endpoint Detection and Response&#xff0c;端点检测与响应&#xff09;是一种主动式的端点安全解决方案&#xff0c;它专注于监控、检测和响应计算机和终端设备上的安全威胁。以下是EDR的详细解释&#xff1a; 一、定义与功能 EDR是一种网络安…

第三方配件也能适配苹果了,iOS 18与iPadOS 18将支持快速配对

苹果公司以其对用户体验的不懈追求和对创新技术的不断探索而闻名。随着iOS 18和iPadOS 18的发布&#xff0c;苹果再次证明了其在移动操作系统领域的领先地位。 最新系统版本中的一项引人注目的功能&#xff0c;便是对蓝牙和Wi-Fi配件的配对方式进行了重大改进&#xff0c;不仅…

若依(RuoYi)开源框架-登录

学习目标&#xff1a; 使用&#xff0c;减少自己的工作量 学习优秀开源项目底层的编程思想&#xff0c;设计思路&#xff0c;提高自己的编程能力 环境要求&#xff1a;jdk1.8 MySQL Redis Maven Vue 使用若依&#xff1a; 1.下载并运行 2.看懂业务流程 3.进行二次开发 1.登录 …

【微信小程序开发】如何定义公共的js函数,其它页面可以调用

在微信小程序开发中&#xff0c;可以通过以下步骤定义和使用公共的 JS 函数&#xff0c;使得其它页面可以调用&#xff1a; 1. 创建一个公共的 JS 文件&#xff1a;在项目的 utils 目录下创建一个 JS 文件&#xff0c;例如 utils/util.js。 2. 定义公共函数&#xff1a;在 uti…

QUIC的Stream

QUIC的流介绍 Streams in QUIC provide a lightweight, ordered byte-stream abstraction to an application. Streams can be unidirectional or bidirectional. QUIC allows for an arbitrary number of streams to operate concurrently and for an arbitrary amount of da…

搞定ES6同步与异步机制、async/await的使用以及Promise的使用!

文章目录 同步和异步async/awaitPromisePromise的概念 同步和异步 ​ 同步&#xff1a;代码按照编写顺序逐行执行&#xff0c;后续的代码必须等待当前正在执行的代码完成之后才能执行&#xff0c;当遇到耗时的操作&#xff08;如网络请求等&#xff09;时&#xff0c;主线程会…

MySQL高级面试点

Explain语句结果中各个字段分别代表什么 id&#xff1a;查询语句没出现一个select关键字&#xff0c;MySQL就会给他分配一个唯一id select_type&#xff1a; select关键字对应哪个查询的类型 simple&#xff1a;简单的查询 不包含任何子查询 primary&#xff1a;查询中如果…

TongRDS 2214 docker版指引(by lqw )

文章目录 前言准备工作中心节点服务节点哨兵节点 前言 部署docker版本&#xff0c;建议先参考TongRDS2214手动部署版指引&#xff08;by lqwsy&#xff09; 在本地手动部署了一套适合业务场景的rds 服务后&#xff0c;再通过dockerfile 打镜像。 准备工作 1.准备对应的安装包…