Pulsar服务端处理消费者请求以及源码解析

文章目录

  • 引言
  • 正文
  • 处理消费请求
  • 回调处理
  • 总结

引言

处理读写是Pulsar服务端最基本也是最重要的逻辑,今天就重点看看服务端是如何处理的读请求也就是消费者请求

正文

Pulsar服务端处理消费者请求的流程大致如下图所示
在这里插入图片描述

  1. 消费者通过TCP向服务端发起消息拉取请求
  2. Broker会根据请求中携带的ID来获取在服务端对应的Consumer对象,每个Consumer对象都有一个对应的游标对象,这个游标对象会调用Dispatcher来做数据查询的操作
  3. Dispatcher会先尝试读取缓存,这个缓存是个跳表结构并且节点数据是存在堆外内存中的,如果命中则直接返回
  4. 未命中缓存的话会通过Bookkeeper客户端去读取Bookkeeper中的数据,读取到后会通过跟客户端所建立的TCP连接将查到的数据发送过去

整体流程就是这四步,接下来就让咱们看看Pulsar的代码实现吧

处理消费请求

Broker处理的请求基本都是从ServerCnx这里开始的,因为它实现了Netty的ChannelInboundHandlerAdapter类,因此所有TCP的数据写进来时最终都是ServerCnx进行处理的,处理消费的请求时从handleFlow方法开始,因此从这里进行跟踪

    protected void handleFlow(CommandFlow flow) {....//从当前Broker维护的Consumer列表中获取客户端对应服务端的Consumer对象CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {Consumer consumer = consumerFuture.getNow(null);if (consumer != null) {//传入客户端配置的拉取条数,最大默认不会超过1000consumer.flowPermits(flow.getMessagePermits());} else {log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());}}}public void flowPermits(int additionalNumberOfMessages) {....// 处理消息拉取请求,继续跟进去看看subscription.consumerFlow(this, additionalNumberOfMessages);}public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {this.lastConsumedFlowTimestamp = System.currentTimeMillis();//最终调用者是dispatcherdispatcher.consumerFlow(consumer, additionalNumberOfMessages);}

Dispatcher是个接口,在这里选择PersistentDispatcherSingleActiveConsumer的实现进行跟踪

    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {//作为一个任务交给线程池处理executor.execute(() -> internalConsumerFlow(consumer));}private synchronized void internalConsumerFlow(Consumer consumer) {//进行消息的读取readMoreEntries(consumer);}private void readMoreEntries(Consumer consumer) {....//通过游标进行数据的读取cursor.asyncReadEntriesOrWait(messagesToRead,bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());}

PersistentDispatcherSingleActiveConsumer最终会调用ManagedCursorImpl进行数据的读取,这里要注意PersistentDispatcherSingleActiveConsumer实现了回调接口,也就是它自身实现了数据读取成功的处理逻辑。这里它将自己作为参数传给下一层用于在读取成功后进行回调处理,这也是最常见的异步回调设计方式。

继续跟踪ManagedCursorImpl的数据读取逻辑

    public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);}public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,Object ctx, PositionImpl maxPosition,Predicate<PositionImpl> skipCondition) {....// 读取数据asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,maxPosition, skipCondition);}public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {// 封装第二层回调OpReadEntry op =OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);//核心方法,从这里进去读取ledger.asyncReadEntries(op);
}void asyncReadEntries(OpReadEntry opReadEntry) {....internalReadFromLedger(currentLedger, opReadEntry);....}private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {....// 进行数据读取asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);}protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,Object ctx) {if (config.getReadEntryTimeoutSeconds() > 0) {....// 封装第三层回调ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,opReadEntry, readOpCount, createdTime, ctx);lastReadCallback = readCallback;// 尝试从缓存中读取数据,继续跟踪进去entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),readCallback, readOpCount);} else {entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,ctx);}}

entryCache有RangeEntryCacheImpl和EntryCacheDisabled两种实现,EntryCacheDisabled相当于不走缓存直接查Bookkeeper,而RangeEntryCacheImpl是会尝试去读取Broker自身的缓存,这里跟着RangeEntryCacheImpl看看实现

    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final ReadEntriesCallback callback, Object ctx) {//跟进去看asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);}void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final ReadEntriesCallback callback, Object ctx) {//一样,继续跟踪看asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null);}void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {....// 缓存实现是ConcurrentSkipListMap value是堆外内存Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);....//如果全部命中缓存则直接返回,否则往下走// 从bookkeeper读pendingReadsManager.readEntries(lh, firstEntry, lastEntry,shouldCacheEntry, callback, ctx);}void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {....//从Bookkeeper进行数据的读取CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,lastEntry, shouldCacheEntry);}CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,long firstEntry, long lastEntry, boolean shouldCacheEntry) {....//这里的lh其实就是Bookkeeper的客户端对象LedgerHandleCompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)....}

到这里基本就到了Bookkeeper的内部逻辑了,Bookkeeper相关的后面在单独进行分析。读取逻辑基本就到这了,肯定会有伙伴疑惑🤔,读到数据后怎么将数据发给客户端/消费者呢?请继续往下看

回调处理

刚刚进行代码跟踪的时候应该都看到流程中封住了好几个回调函数,这里就拎最重要的也就是PersistentDispatcherSingleActiveConsumer进行讨论,这里直接从它的回调方法readEntriesComplete进行跟踪

    public void readEntriesComplete(final List<Entry> entries, Object obj) {//作为任务放到线程池去执行executor.execute(() -> internalReadEntriesComplete(entries, obj));}private synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {....//分派数据到消费者dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo, epoch);}protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,SendMessageInfo sendMessageInfo, long epoch) {//将查到的消息通过TCP写到消费者端currentConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),redeliveryTracker, epoch)....}public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,EntryBatchIndexesAcks batchIndexesAcks,int totalMessages, long totalBytes, long totalChunkedMessages,RedeliveryTracker redeliveryTracker, long epoch) {....//通过PulsarCommandSenderImpl进行消息发送,继续跟踪进去Future<Void> writeAndFlushPromise = cnx.getCommandSender().sendMessagesToConsumer(....);....}public ChannelPromise sendMessagesToConsumer(....) {....//通过Netty的TCP将查到的消息数据写到客户端ctx.write(....);....}

到这里基本上服务端的事情就结束了,剩余的其他几个回调函数感兴趣的伙伴可以自行跟踪。

总结

可以看到Pulsar里大量使用了异步回调处理,这样的设计在高并发场景大幅提升服务的性能,尽可能的避免了存在瓶颈的地方。不过带来的另一影响是,代码跟踪起来相对来说容易“迷路”,因此掌握好异步设计的逻辑是很有必要的,可以帮助我们更好的跟踪Pulsar的代码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/302330.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idea开发 java web 高校学籍管理系统bootstrap框架web结构java编程计算机网页

一、源码特点 java 高校学籍管理系统是一套完善的完整信息系统&#xff0c;结合java web开发和bootstrap UI框架完成本系统 &#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 前段主要技术 css jq…

使用git 和 github协作开发

文章目录 github浏览器汉化插件github新建仓库git安装以及ssh配置团队创建及基本命令的使用创建团队基本命令 分支管理快速切换远程仓库地址 如何使用git && github进行协作开发&#xff0c;包括git常见基础命令 github浏览器汉化插件 在刚开始使用github的时候&#…

openGauss 5.0 单点企业版部署_Centos7_x86(上)

背景 通过openGauss提供的脚本安装时&#xff0c;只允许在单台物理机部署一个数据库系统。如果您需要在单台物理机部署多个数据库系统&#xff0c;建议您通过命令行安装&#xff0c;不需要通过openGauss提供的安装脚本执行安装。 本文档环境&#xff1a;CentOS7.9 x86_64 4G1…

物联网数据服务平台

随着物联网技术的迅猛发展&#xff0c;海量数据的产生和应用成为推动工业数字化转型的核心动力。在这个数据为王的时代&#xff0c;如何高效地收集、处理、分析并应用这些数据&#xff0c;成为了企业关注的焦点。物联网数据服务平台应运而生&#xff0c;为企业提供了全面、高效…

HTML - 请你说一下如何阻止a标签跳转

难度级别:初级及以上 提问概率:55% a标签的默认语义化功能就是超链接,HTML给它的定位就是与外部页面进行交流,不过也可以通过锚点功能,定位到本页面的固定id区域去。但在开发场景中,又避免不了禁用a标签的需求,那么都有哪些方式可以禁用…

IEC101、IEC103、IEC104、Modbus报文解析工具

一、概述 国际电工委员会第57技术委员会&#xff08;IEC TC57&#xff09;1995年出版IEC 60870-5-101后&#xff0c;得到了广泛的应用。为适应网络传输&#xff0c;2000年IEC TC57又出版了IEC 60870-5-104&#xff1a;2000《远东设备及系统 第5-104部分&#xff1a;传输规约-采…

基于SpringBoot+Vue+Mysql的图书管理系统

博主介绍&#xff1a; 大家好&#xff0c;本人精通Java、Python、C#、C、C编程语言&#xff0c;同时也熟练掌握微信小程序、Php和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我有丰富的成品Java、Python、C#毕设项目经验&#xff0c;能够为学生提供各类…

C++手撕红黑树

文章目录 红黑树概念性质&#xff08;条件限制&#xff09;节点的定义红黑树的结构红黑树的插入cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u存在且为红cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u不存在或u为黑&#xff0c;插入到p对应的一边cur为红…

深度评测2024年热门婴儿洗衣机,鲸立、希亦、小吉等品牌一网打尽!

为人父母&#xff0c;是一件非常美妙的事情&#xff0c;在养育新生命的过程中&#xff0c;细心的照顾是非常重要的&#xff0c;而最小的细节&#xff0c;就是让婴儿的衣服保持最温和、最有效的清洁。而婴儿洗衣机是当今不少家庭的福音&#xff0c;它给家长们带来了巨大的方便&a…

R语言数据可视化:ggplot2绘图系统

ggpolt2绘图系统被称为R语言中最高大上的绘图系统&#xff0c;使用ggplot2绘图系统绘图就像是在使用语法创造句子一样&#xff0c;把数据映射到几何客体的美学属性上。因此使用ggplot2绘图系统的核心函数ggplot来绘图必须具备三个条件&#xff0c;数据data&#xff0c;美学属性…

力扣HOT100 - 560. 和为k的子数组

解题思路&#xff1a; 方法一&#xff1a;枚举 class Solution {public int subarraySum(int[] nums, int k) {int cnt 0;for (int start 0; start < nums.length; start) {int sum 0;//注意开始位置for (int end start; end < nums.length; end) {sum nums[end];…

软件设计师知识点-1

串行的计算公式为&#xff1a;(取值时间分析时间执行时间) x 指令的个数 流水线的计算公式为&#xff1a;单条指令的执行时间 (n-1) x 流水线周期 n的意思为指令的个数&#xff0c;流水线周期的意思为取值&#xff0c;分析&#xff0c;执行三条执行过程中花费时间最多的那条…

VSCode配置AI自动补全插件Tabnine

面向软件开发人员的 AI 助手 使用 AI 代码完成更快地编写代码 什么是Tabnine Tabnine 是一款 AI 代码助手&#xff0c;可让您成为更好的开发人员。Tabnine 将通过所有最流行的编码语言和 IDE 的实时代码完成、聊天和代码生成来提高您的开发速度。 无论您将其称为 IntelliSens…

代码随想录阅读笔记-二叉树【二叉搜索树转换为累加树】

题目 给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下&#xff0c;二叉搜索树满足下列约束条件&…

Java绘图坐标体系

一、介绍 下图说明了Java坐标系。坐标原点位于左上角&#xff0c;以像素为单位。在Java坐标系中&#xff0c;第一个是x坐标&#xff0c;表示当前位置为水平方向&#xff0c;距离坐标原点x个像素&#xff1b;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐…

西门子PLC(S7-200 SMART)学习笔记1:初识PLC可编程逻辑器件

今日开始我的西门子PLC学习之路&#xff0c;学习的型号以S7-200 SMART为主 主要认识一下PLC是什么、型号怎么看、 通信相关、编程软件、构造及工作原理 目录 西门子官方PLC手册获取&#xff1a; 1、PLC可编程逻辑器件的基本认识&#xff1a; PLC的结构及各部分的作用&#xff…

threejs 基础知识点汇总

threejs 基础知识点汇总 之前写了几篇博文&#xff0c;但是我觉得写的不好&#xff0c;我今天再补充一篇还不好的&#xff0c;把基础知识点汇总一下&#xff0c;不写运行的代码了&#xff0c;只写关键代码&#xff0c;但是看了之前我写的那几篇&#xff0c;看这篇的话问题其实不…

群晖NAS使用Docker部署Potopea在线图片编辑工具并实现公网访问

文章目录 1. 部署Photopea2. 运行Photopea3. 群晖安装Cpolar4. 配置公网地址5. 公网访问测试6. 固定公网地址 本文主要介绍如何在群晖NAS使用Docker部署Potopea在线图片编辑工具&#xff0c;并结合cpolar内网穿透实现公网环境可以远程访问本地部署的Potopea. Photopea是一款强大…

伺服电机的惯性

一、伺服电机的惯性 伺服电机的惯性主要指电机及其连接的负载的惯性。它是通过将物体的质量与其距离旋转轴的平方相乘得到的。对于伺服电机来说&#xff0c;惯性体现了电机和负载对速度和加速度变化的阻力程度&#xff0c;即其惯性越大&#xff0c;对速度和加速度变化的阻…

人工智能_大模型023_AssistantsAPI_01_OpenAI助手的创建_API的调用_生命周期管理_对话服务创建---人工智能工作笔记0159

先来说一下一些问题: 尽量不要微调,很麻烦,而且效果需要自己不断的去测试. 如果文档中有图表,大量的图片去分析就不合适了. 是否用RAG搜索,这个可以这样来弄,首先去es库去搜能直接找到答案可以就不用去RAG检索了,也可以设置一个分,如果低于60分,那么就可以去进行RAG检索 微…