RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// Map<消费组名称,订阅组配置>private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);// 内存数据版本号private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {// 消费组名称private String groupName;// 是否开启消费private boolean consumeEnable = true;// 是否允许消费最早消息private boolean consumeFromMinEnable = true;// 是否允许广播消费private boolean consumeBroadcastEnable = true;// 重试队列数private int retryQueueNums = 1;// 重试最大次数private int retryMaxTimes = 16;// brokerIdprivate long brokerId = MixAll.MASTER_ID;// 当产生慢消费时,选择第几个brokerprivate long whichBrokerWhenConsumeSlowly = 1;// 是否通知消费者ids变化private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {public SubscriptionGroupManager() {this.init();}public SubscriptionGroupManager(BrokerController brokerController) {this.brokerController = brokerController;this.init();}private void init() {{// 初始化系统消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化过滤服务消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化自测消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化http代理消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PULL消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PERMISSION消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_OWNER消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);}}// 更新订阅配置,且更新内存数据版本号public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);if (old != null) {log.info("update subscription group config, old: {} new: {}", old, config);} else {log.info("create new subscription group, {}", config);}this.dataVersion.nextVersion();this.persist();}// 失效消费组public void disableConsume(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {old.setConsumeEnable(false);this.dataVersion.nextVersion();}}// 查找指定消费组的订阅配置public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}this.dataVersion.nextVersion();this.persist();}}return subscriptionGroupConfig;}// 将内存数据结构编码成字符串@Overridepublic String encode() {return this.encode(false);}// 获取配置文件路径@Overridepublic String configFilePath() {return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 从字符串中恢复数据,写回内存数据结构@Overridepublic void decode(String jsonString) {if (jsonString != null) {SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);this.dataVersion.assignNewOne(obj.dataVersion);this.printLoadDataWhenFirstBoot(obj);}}}// 将内存数据结构编码成字符串public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}// 当第一次启动时,打印加载数据时的日志private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionGroupConfig> next = it.next();log.info("load exist subscription group, {}", next.getValue().toString());}}public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {return subscriptionGroupTable;}public DataVersion getDataVersion() {return dataVersion;}// 删除指定消费组的订阅配置public void deleteSubscriptionGroupConfig(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);if (old != null) {log.info("delete subscription group OK, subscription group:{}", old);this.dataVersion.nextVersion();this.persist();} else {log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/218041.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云服务器Centos中安装Docker

云服务器Centos中安装Docker 1 简介DockerCentosCentos和Ubuntu区别 2 安装3 测试hello-world的镜像测试 1 简介 Docker Docker是一个开源的应用容器引擎&#xff0c;利用操作系统本身已有的机制和特性&#xff0c;可以实现远超传统虚拟机的轻量级虚拟化。它支持将软件编译成…

【AI】如何准备mac开发vue项目的环境

为了在Mac上开发Vue项目&#xff0c;你需要准备一些工具和环境。以下是主要的步骤&#xff1a; 安装Node.js和npm&#xff1a; Vue.js是一个基于JavaScript的框架&#xff0c;因此你需要Node.js环境。访问Node.js官网下载并安装Node.js&#xff0c;这也会自动安装npm&#xff0…

事务的四个特性、四个隔离级别以及数据库的常用锁

事务的四个特性、四个隔离级别以及数据库的常用锁 四大特性 事务的四大特性&#xff0c;通常被称为ACID特性&#xff0c;是数据库管理系统&#xff08;DBMS&#xff09;确保事务处理的关键属性。这四大特性分别是&#xff1a; 原子性&#xff08;Atomicity&#xff09;&#x…

HHDESK个性化脚本功能

HHDESK可以把脚本配置在对话框中&#xff0c;生成按钮&#xff0c;便捷操作。 在界面下方的脚本框中&#xff0c;点击“”&#xff0c;选择新建&#xff1b; 随后在弹出框内填写名称及脚本&#xff0c;按需求选择填写参数&#xff0c;及运行过程中是否弹出参数框&#xff1b;…

Linux实用操作-上篇

Linux实用操作-下篇&#xff1a;Linux实用操作篇-下篇-CSDN博客 一、各类小技巧&#xff08;快捷键&#xff09; 1.1 ctrl c 强制停止 Linux某些程序的运行&#xff0c;如果想要强制停止它&#xff0c;可以使用快捷键ctrl c 命令输入错误&#xff0c;也可以通过快捷键ctr…

如何了解蜘蛛池蚂蚁SEO

蜘蛛池是一种基于搜索引擎优化的技术手段&#xff0c;通过模拟蜘蛛爬行行为来提高网站在搜索引擎中的排名&#xff0c;从而增加网站的流量和曝光率。 编辑搜图 如何联系蚂蚁seo&#xff1f; baidu搜索&#xff1a;如何联系蚂蚁SEO&#xff1f; baidu搜索&#xff1a;如何联…

Shell三剑客:文本过滤工具——grep

一、简介&#xff1a;过滤&#xff0c;查找文档中的内容 二、分类 grepegrep——扩展支持正则\w所有字母与数字&#xff0c;称为字符[a-zA-Z0-9] l[a-zA-Z0-9]*ve l\w*ve\W所有字母与数字之外的字符&#xff0c;称为非字符 love[^a-zA-Z0-9] love\W\b词边界 \<love\>…

优先考虑泛型

Java中的泛型&#xff08;Generics&#xff09;提供了一种参数化类型的机制&#xff0c;使得你可以编写更灵活、类型安全的代码。下面是一个例子&#xff0c;说明在Java中优先考虑泛型的好处&#xff1a; 考虑一个简单的容器类&#xff0c;它可以存储任意类型的元素&#xff0…

机器人、智能小车常用的TT电机/310电机/370电机选型对比

在制作智能小车或小型玩具时&#xff0c;在电机选型上一些到各种模糊混淆的概念&#xff0c;以及各种错综复杂的电机参数&#xff0c;本文综合对比几种常用电机的参数及特性适应范围&#xff0c;以便快速选型&#xff0c;注意不同生产厂家的电机参数规则会有较大差异。 普通TT…

调用win32 api获取电脑名字和系统目录

学习一下几个函数的功能&#xff0c;和调用方式&#xff1b; void CBasenameView::OnDraw(CDC* pDC) {CBasenameDoc* pDoc GetDocument();ASSERT_VALID(pDoc);// TODO: add draw code for native data hereCString str1;TCHAR myname1[50], myname2[50], mydirname1[50], myd…

dp入门:从记忆化搜索到递推 灵神[基础算法精讲17]

198. 打家劫舍 链接 : 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 解决 : 1.记忆化搜索(自顶向下) ; class Solution { public:int rob(vector<int>& nums) {// 记忆化搜索int n nums.size();vector<int> memo(n,-1); //…

Doris学习笔记

目录 简介 特点 MPP数据库 PB和EB都是用来衡量数据存储量的单位。 秒级响应 Google Mesa Apache Impala 支持标准sql且兼容mysql协议 ROLAP OLAP&#xff08;On-Line Analytical Processing&#xff0c;联机分析处理&#xff09; ROLAP&#xff08;Relational On-Line An…

【PWN】学习笔记(三)【返回导向编程】(中)

目录 课程回顾动态链接过程 课程 课程链接&#xff1a;https://www.bilibili.com/video/BV1854y1y7Ro/?vd_source7b06bd7a9dd90c45c5c9c44d12e7b4e6 课程附件&#xff1a; https://pan.baidu.com/s/1vRCd4bMkqnqqY1nT2uhSYw 提取码: 5rx6 回顾 管道符 | 把前一个指令的输出作…

最新盲盒交友脱单系统源码

盲盒交友脱单系统源码&#xff0c;学校 爱好 城市 地区 星座等等&#xff0c;首页轮转广告&#xff0c;页面美化&#xff0c;首页两款连抽高质量底部连抽&#xff0c;后台选择开关&#xff0c;邀请奖励爱心或者&#xff0c;提现达到金额有提成奖励&#xff0c;二级分销&#xf…

canvas基本绘制对象

目录 绘制画布 设置画布 绘制圆形 绘制矩形填充渐变色 绘制文字及文字样式 绘制画布 <canvas id"canvas" width"800" height"600"></canvas> 设置画布 //获得画布元素var canvasdocument.getElementById(canvas);var ctxca…

Python求小于m的最大10个素数

为了找到小于m的最大10个素数&#xff0c;我们首先需要确定m的值。然后&#xff0c;我们可以使用一个简单的算法来检查每一个小于m的数字是否是素数。 下面是一个Python代码示例&#xff0c;可以找到小于m的最大10个素数&#xff1a; def is_prime(n): if n < 1: …

DAP数据集成与算法模型如何结合使用

企业信息化建设会越来越完善&#xff0c;越来越体系化&#xff0c;当今数据时代背景下更加强调、重视数据的价值&#xff0c;以数据说话&#xff0c;通过数据为企业提升渠道转化率、改善企业产品、实现精准运营&#xff0c;为企业打造自助模式的数据分析成果&#xff0c;以数据…

乐益达教育网页

目录 一、网页效果 二、html代码 三、CSS代码 四、JS代码 一、网页效果 二、html代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, in…

excel数据重复率怎么计算【保姆教程】

大家好&#xff0c;今天来聊聊excel数据重复率怎么计算&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff1a; excel数据重复率怎么计算 在Excel中计算数据重复率可以通过以下步骤实现&#xff1a; 1. 确定重复…

priority_queue的实现,容器和仿函数

首先我们要实现priority_queue就必须要了解其底层&#xff0c;本质其实就是堆排序&#xff0c;大根堆就是升序排序&#xff0c;小根堆就是降序排序。 原因是因为&#xff0c;我们堆排序取元素可以将堆顶和最后一个元素交换&#xff0c;然后让堆顶下沉&#xff0c;这样可以维护…