rocketmq源码分析(一)broker启动remoting抽象

1. netty基础

2. broker启动

rocketmq-broker.puml

@startuml
BrokerStartup -> BrokerStartup: createBrokerController
BrokerStartup -> BrokerController : controller.initialize() 初始化BrokerController,new 出各种 NettyRemotingServer
BrokerController -> BrokerController : registerProcessor()  注册所有的processor ,rockt中processor负责处理各种的业务逻辑
BrokerStartup ->  BrokerController:  start() 讲各个nettyserver 启动起来
@enduml

核心逻辑registerProcessor

org.apache.rocketmq.remoting.netty.NettyRemotingServer#registerProcessor

 @Overridepublic void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {ExecutorService executorThis = executor;if (null == executor) {executorThis = this.publicExecutor;}Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);this.processorTable.put(requestCode, pair);}

其中processorTable 是超类NettyRemotingAbstract 的成员属性,用HashMap来维护;

    /*** This container holds all processors per request code, aka, for each incoming request, we may look up the* responding processor in this map to handle the request.*/protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

总结: 从NameServer 的创建启动到Broker的创建启动,整体来上看都是基于RemotingCommand出发来构建一个类命令模式的一个整体框架;  我们如果要使用netty 进行二次开发也建议参考rocketmq;包含netty 相关参数配置等等

3.remoting抽象

remoting包下面的netty 核心包核心类NettyRemotingServer  对所有的netty操作进行了封装;

@Overridepublic void start() {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});prepareSharableHandlers();ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/274406.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Tokeniser估算GPT和LLM服务的查询成本

将LLM集成到项目所花费的成本主要是我们通过API获取LLM返回结果的成本&#xff0c;而这些成本通常是根据处理的令牌数量计算的。我们如何预估我们的令牌数量呢&#xff1f;Tokeniser包可以有效地计算文本输入中的令牌来估算这些成本。本文将介绍如何使用Tokeniser有效地预测和管…

人工智能|机器学习——Canopy聚类算法(密度聚类)

1.简介 Canopy聚类算法是一个将对象分组到类的简单、快速、精确地方法。每个对象用多维特征空间里的一个点来表示。这个算法使用一个快速近似距离度量和两个距离阈值T1 > T2 处理。 Canopy聚类很少单独使用&#xff0c; 一般是作为k-means前不知道要指定k为何值的时候&#…

vue 下载的插件从哪里上传?npm发布插件详细记录

文章参考&#xff1a; 参考文章一&#xff1a; 封装vue插件并发布到npm详细步骤_vue-cli 封装插件-CSDN博客 参考文章二&#xff1a; npm发布vue插件步骤、组件、package、adduser、publish、getElementsByClassName、important、export、default、target、dest_export default…

linux ,Windows部署

Linux部署 准备好虚拟机 连接好查看版本&#xff1a;java -version安装jdk 解压命令&#xff1a;tar -zxvf 加jdk的压缩文件名cd /etc 在编辑vim profile文件 在最底下写入&#xff1a; export JAVA_HOME/root/soft/jdk1.8.0_151&#xff08;跟自己的jdk保持一致&#xff0…

初窥机器学习

人工智能 近几年来&#xff0c;人工智能&#xff08;AI&#xff09;已成为家喻户晓的术语&#xff0c;我们在游戏、电影&#xff08;还记得J.A.R.V.I.S吗&#xff1f;&#xff09;和书籍中经常看到它的提及和描绘&#xff0c;但人工智能究竟是什么呢&#xff1f; 人工智能简单…

go语言添加代理

LiteIDE 工具->管理 https://mirrors.aliyun.com/goproxy/或https://goproxy.cn,direct 命令行 go env -w GOPROXYhttps://goproxy.cn,direct

前端页面访问后台hiveserver2,阶段性报错

1、运行环境 Windows11下安装VMware&#xff0c;VMware下安装CentOS7 Linux系统&#xff0c;三台虚拟机集群部署hadoop&#xff0c;安装hive&#xff1b; 在Linux下安装Eclipse&#xff0c;创建maven工程&#xff0c;使用hive-jdbc-2.3.2访问hiveserver2 2、在windows11下&…

​如何防止网络攻击?

应对不同类型网络攻击的最佳途径是“知己”、“知彼”&#xff0c;在了解它们的工作原理、能够识别其手段、方法及意图的前提下&#xff0c;找出针对性的应对文案。今天&#xff0c;就为大家总结以下防止不同类型网络攻击的有效方法&#xff0c;希望无论是对个人、还是企业和组…

字节跳动也启动春季校园招聘了(含二面算法原题)

字节跳动 - 春招启动 随着各个大厂陆续打响春招的响头炮&#xff0c;字节跳动也官宣了春季校园招聘的正式开始。 还是那句话&#xff1a;连互联网大厂启动校招计划尚且争先恐后&#xff0c;你还有什么理由不马上行动&#xff1f;&#xff01; 先来扫一眼「春招流程」和「面向群…

RabbitMQ - 07 - 通过注解创建队列和交换机

之前消息模型的实现,都是通过rabbitMQ Management 控制台来手动创建 queue 和 exchange 的 在项目开发中有两种方式通过代码声明 创建 一种是通过 Bean 方式,这种代码量较大 稍繁琐 一种是通过注解的方式声明 先编写消费者代码 通过注解绑定了 消息队列,交换机,还有 routin…

​LeetCode解法汇总1261. 在受污染的二叉树中查找元素

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给出一个满足下述规则的二叉树&#xff1…

小程序学习 1

pages/goods/search/home.wxml首页功能设定 1. loading入场 2. 下拉刷新 3. 搜索栏 4. 分类切换 5. 商品列表 6. 规格弹层 7. 加载更多 <view style"text-align: center; color: #b9b9b9" wx:if"{{pageLoading}}"><t-loading theme"circula…

每日一题——LeetCode2129.将标题首字母大写

方法一 个人方法 将字符串转为数组&#xff0c;遍历数组&#xff0c;对数组的每一个元素&#xff0c;先全部转为小写&#xff0c;如果当前元素长度大于2&#xff0c;将第一个字符转为大写形式 var capitalizeTitle function(title) {titletitle.split( )for(let i0;i<tit…

同学,请实现一个扫码登录

大概的流程图如下 主要涉及到的是pc端、手机端和后台服务端。由于听产品同事说手机端由原生端&#xff08;安卓和IOS&#xff09;来实现&#xff0c;因此我这边只需要开发pc端就行&#xff0c;工作量直接减半有没有。做过该功能的小伙伴肯定了解&#xff0c;pc端的实现还是比较…

python淘宝网页爬虫数据保存到 csv和mysql(selenium)

数据库连接设置&#xff08;表和字段要提前在数据库中建好&#xff09; # 数据库中要插入的表 MYSQL_TABLE goods# MySQL 数据库连接配置,根据自己的本地数据库修改 db_config {host: localhost,port: 3306,user: root,password: ma*****6,database: may2024,charset: utf8mb…

一体机电脑辐射超标整改

电脑一体机是目前台式机和笔记本电脑之间的一个新型的市场产物&#xff0c;它将主机部分、显示器部分整合到一起的新形态电脑&#xff0c;该产品的创新在于内部元件的高度集成。随着无线技术的发展&#xff0c;电脑一体机的键盘、鼠标与显示器可实现无线链接&#xff0c;机器只…

云打印下载,云打印怎么使用?

互联网的发展让许多实体业务都受到了强烈的冲击&#xff0c;这其中打印业务也是其中之一。在当前云打印技术的推广下&#xff0c;现在有越来越多有打印需求的用户都开始选择性价比更高、打印更方便的云打印服务了。那么云打印下载&#xff0c;云打印怎么使用&#xff1f;今天小…

AIGC——ComfyUI 安装与基础使用

简介 ComfyUI是一个基于节点流程的稳定扩散操作界面&#xff0c;通过流程实现了更加精准的工作流定制和完善的可复现性。每个模块都有特定的功能&#xff0c;我们可以通过调整模块连接来实现不同的出图效果。然而&#xff0c;节点式的工作流也提高了一定的使用门槛。同时&…

leetcode代码记录(有序数组两数之和

目录 1. 题目&#xff1a;2. 我的代码&#xff1a;小结&#xff1a; 1. 题目&#xff1a; 给定一个已按照 升序排列 的整数数组 numbers &#xff0c;请你从数组中找出两个数满足相加之和等于目标数 target 。 函数应该以长度为 2 的整数数组的形式返回这两个数的下标值。numb…

25.5 MySQL 聚合函数

1. 聚合函数 聚合函数(Aggregate Function): 是在数据库中进行数据处理和计算的常用函数. 它们可以对一组数据进行求和, 计数, 平均值, 最大值, 最小值等操作, 从而得到汇总结果.常见的聚合函数有以下几种: SUM: 用于计算某一列的数值总和, 可以用于整数, 小数或者日期类型的列…