# 消息中间件 RocketMQ 高级功能和源码分析(四)

消息中间件 RocketMQ 高级功能和源码分析(四)

一、 消息中间件 RocketMQ 源码分析:回顾 NameServer 架构设计。

1、RocketMQ 架构设计

消息中间件的设计思路一般是基于主题订阅发布的机制,消息生产者(Producer)发送某一个主题到消息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?

NameServer 就是为了解决以上问题设计的。

在这里插入图片描述

2、Broker 消息服务器

Broker 消息服务器在启动的时向所有 NameServer注册,消息生产者(Producer)在发送消息时之前先从 NameServer 获取 Broker 服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送。NameServer与每台Broker保持长连接,并间隔30S检测 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中删除。但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低 NameServer 实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。

3、NameServer 的高可用

NameServer 本身的高可用是通过部署多台 NameServer 来实现,但彼此之间不通讯,也就是 NameServer 服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是 NameServer 设计的一个亮点,总之,RocketMQ 设计追求简单高效。

二、 消息中间件 RocketMQ 源码分析:NameServer 启动步骤一。

1、RocketMQ 源码分析: NameServer 启动流程

步骤一:

解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。

步骤二

根据启动属性创建NamesrvController实例,并初始化该实例。NameServerController实例为NameServer核心控制器。

步骤三

在JVM进程关闭之前,先将线程池关闭,及时释放资源。

在这里插入图片描述

2、RocketMQ 源码 启动类:org.apache.rocketmq.namesrv.NamesrvStartup

步骤一:

解析配置文件,填充 NameServerConfig、NettyServerConfig 属性值,并创建 NamesrvController。

代码:NamesrvController#createNamesrvController


//创建NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//创建NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//设置启动端口号
nettyServerConfig.setListenPort(9876);
//解析启动-c参数
if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}
}
//解析启动-p参数
if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);
}
//将启动参数填充到namesrvConfig,nettyServerConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//创建NameServerController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

2、 NamesrvConfig 属性


private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;

3、 rocketmqHome: rocketmq 主目录

kvConfig: NameServer 存储KV配置属性的持久化路径

configStorePath: nameServer 默认配置文件路径

orderMessageEnable: 是否支持顺序消息

4、NettyServerConfig 属性


private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean useEpollNativeSelector = false;

listenPort: NameServer监听端口,该值默认会被初始化为9876
serverWorkerThreads: Netty业务线程池线程个数
serverCallbackExecutorThreads: Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行。
serverSelectorThreads: IO线程池个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
serverOnewaySemaphoreValue: send oneway消息请求并发读(Broker端参数);
serverAsyncSemaphoreValue: 异步消息发送最大并发度;
serverChannelMaxIdleTimeSeconds : 网络连接最大的空闲时间,默认120s。
serverSocketSndBufSize: 网络socket发送缓冲区大小。
serverSocketRcvBufSize: 网络接收端缓存区大小。
serverPooledByteBufAllocatorEnable: ByteBuffer是否开启缓存;
useEpollNativeSelector: 是否启用Epoll IO模型。

三、 消息中间件 RocketMQ 源码分析:NameServer 启动步骤二。

1、步骤二

根据启动属性创建 NamesrvController 实例,并初始化该实例。NameServerController 实例为 NameServer 核心控制器

2、代码:NamesrvController#initialize

public boolean initialize() {//加载KV配置this.kvConfigManager.load();//创建NettyServer网络处理对象this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//开启定时任务:每隔10s扫描一次Broker,移除不活跃的Brokerthis.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));this.registerProcessor();this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);//开启定时任务:每隔10min打印一次KV配置this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);return true;
}

四、 消息中间件 RocketMQ 源码分析:NameServer 启动步骤三。

1、步骤三

在 JVM 进程关闭之前,先将线程池关闭,及时释放资源。

2、代码:NamesrvStartup#start

//注册JVM钩子函数代码
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {//释放资源controller.shutdown();return null;}
}));

# 消息中间件 RocketMQ 高级功能和源码分析(三)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/353792.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

9.华为交换机telnet远程管理配置aaa认证

目的&#xff1a;telnet远程管理设备 LSW1配置 [Huawei]int Vlanif 1 [Huawei-Vlanif1]ip add 1.1.1.1 24 [Huawei-Vlanif1]q [Huawei]user-interface vty 0 4 [Huawei-ui-vty0-4]authentication-mode aaa [Huawei-ui-vty0-4]q [Huawei]aaa [Huawei-aaa]local-user admin pass…

Webmin在EPICS IOC启动中的应用

本文使用webmin启动远程工控机中的EPICS IOC&#xff0c;受控设备使用PI公司的六轴台以及相应的控制器C-887&#xff1a; 1&#xff09;控制器C-887 2) 六轴台&#xff1a; 3&#xff09;在工控机上安装用于与C-887控制器进行通信的EPICS IOC程序&#xff0c;安装结束后&#…

聊一聊生成式AI

生成式AI&#xff08;Generative AI&#xff09;是指一类能够自主创造新内容的人工智能技术&#xff0c;这些内容可以是文本、图像、音频、视频等。与传统的分析性或分类性AI系统不同&#xff0c;生成式模型的主要任务不是对现有数据进行分类或预测&#xff0c;而是生成全新的、…

光电液位传感器在净水器领域的应用优势有哪些?

光电液位传感器作为一种先进的液位检测技术&#xff0c;在净水器领域有着显著的应用优势。具有高精度的特点&#xff0c;能够精确地检测水位变化&#xff0c;保证水处理过程的稳定性和效率。 传统的浮球式传感器可能存在精度偏差或者在长期使用中需要维护和更换的问题&#xf…

内网穿透方法有哪些?路由器端口映射外网和软件方案步骤

公网IP和私有IP不能互相通讯。我们通常在局域网内部署服务器和应用&#xff0c;当需要将本地服务提供到互联网外网连接访问时&#xff0c;由于本地服务器本身并无公网IP&#xff0c;就无法实现。这时候就需要内网穿透技术&#xff0c;即内网映射&#xff0c;内网IP端口映射到外…

从网络配置文件中提取PEAP凭据

我的一位同事最近遇到了这样一种情况&#xff1a;他可以物理访问使用802.1X连接到有线网络的Windows计算机&#xff0c;同时保存了用于身份验证的用户凭据&#xff0c;随后他想提取这些凭据&#xff0c;您可能认为这没什么特别的&#xff0c;但是事情却有点崎岖波折…… 如何开…

屏蔽房是做什么用的?为什么需要定期检测?

屏蔽房对于不了解的人来说&#xff0c;可能光看名字不知道是做什么的&#xff0c;但是对于一些企业或者机构&#xff0c;却是再熟悉不过的了。和名字一样&#xff0c;屏蔽房是对空间内的信号以及一些外界环境条件进行隔绝&#xff0c;在一些有特殊要求的企业机构中&#xff0c;…

人生的乐趣,在于对真知的追求

子曰&#xff1a;朝闻道&#xff0c;夕死可矣&#xff01; 孔子说&#xff1a;早上听到关于世界的真理&#xff0c;哪怕晚上就die了都可以。 这句话很有力量而经常被人引用&#xff0c;表达出我们如何看待沉重的肉身和精神世界。 我们的生活目的&#xff1a;道。 —— 要了解…

陶建辉入选 2023 年度“中国物联网行业卓越人物榜”

在这个技术飞速发展的时代&#xff0c;物联网行业作为推动社会进步的重要力量&#xff0c;正在不断地演化和革新。近日&#xff0c;中国智联网生态大会暨“2023 物联之星”年度榜单颁奖典礼在上海浦东举行。现场公布了拥有物联网行业奥斯卡奖之称的 ——“物联之星 2023 中国物…

鸿蒙 Text文本过长超出Row的范围问题

代码如下: 可以发现随着文本内容的增加, 第二个组件test2明显被挤出了屏幕外, 感觉像是Row自己对内容的约束没做好一样, 目前没看到官方的推荐解决方法, 机缘巧合下找到了个这种的办法, 给内容会增加的组件设置layoutWeight(), 借助layoutWeight的特性来解决该问题, 改动后代码…

探究 IP 地址被网站封禁的原因

在我们登录各种网站、APP浏览时&#xff0c;可能会遇到 IP 地址被某些网站封禁的情况。很多人奇怪这是为什么呢&#xff1f; 首先&#xff0c;违反网站的使用规则是比较常见的原因之一。比如&#xff0c;频繁发送垃圾邮件、恶意评论、进行网络攻击或试图破解网站的安全机制等不…

餐饮点餐系统小程序(ThinkPHP+FastAdmin+UniApp)

便捷美食新体验&#x1f354;&#x1f4f1; 基于ThinkPHPFastAdminUniApp开发的餐饮点餐系统&#xff0c;主要应用于餐饮&#xff0c;例如早餐、面馆、快餐、零食小吃等快捷扫码点餐需求&#xff0c;标准版本仅支持先付款后就餐模式&#xff0c;高级版本支持先付后就餐和先就餐…

【Docker系列】深入解析 Docker 容器部署脚本

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

git的分支管理

✨前言✨ &#x1f4d8; 博客主页&#xff1a;to Keep博客主页 &#x1f646;欢迎关注&#xff0c;&#x1f44d;点赞&#xff0c;&#x1f4dd;留言评论 ⏳首发时间&#xff1a;20246月19日 &#x1f4e8; 博主码云地址&#xff1a;博主码云地址 &#x1f4d5;参考书籍&#x…

003.Linux SSH协议工具

我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448; 入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448; 虚 拟 环 境 搭 建 &#xff1a;&#x1f449;&…

【Linux 基础】目录结构

Linux 的目录结构&#xff08;也称为文件系统结构&#xff09;是组织文件和目录的一种逻辑方式。每个文件和目录在文件系统中都有一个唯一的位置或路径。 Linux文件系统是整个操作系统的基础架构&#xff0c;对于系统的稳定运行、数据安全以及用户操作便捷性至关重要&#xff0…

k8s快速上手实操

前言 Kubernetes&#xff08;简称K8s&#xff09;是由Google开源的一个用于自动化部署、扩展和管理容器化应用程序的系统。自2014年发布以来&#xff0c;Kubernetes已经迅速成长为容器编排领域的标准&#xff0c;并在全球范围内得到了广泛的采用和认可。 Kubernetes作为现代容…

全新量子计算技术!在硅中可以生成大规模量子比特

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 文丨沛贤/浪味仙 排版丨沛贤 深度好文&#xff1a;1600字丨6分钟阅读 摘要&#xff1a;研究人员利用气体环境在硅中形成被称为“色心”的可编程缺陷&#xff0c;首次利用飞秒激光&#xff0c;…

有关排序的算法

目录 选择法排序 冒泡法排序 qsort排序&#xff08;快速排序&#xff09; qsort排序整型 qsort排序结构体类型 排序是我们日常生活中比较常见的问题&#xff0c;这里我们来说叨几个排序的算法。 比如有一个一维数组 arr[8] {2,5,3,1,7,6,4,8},我们想要把它排成升序&#…

力扣1901.寻找峰值II

力扣1901.寻找峰值II 二分每一行 并用函数找出每一行中最大值的下标若最大值比其下面相邻的元素大 则上方一定存在峰值若最大值比其下面相邻的元素小 则下方一定存在峰值 class Solution {int indexmax(vector<int> &a){return max_element(a.begin(),a.end()) - …