Apache Pulsar源码解析之Lookup机制

文章目录

  • 引言
  • Lookup是什么
  • 客户端实现原理
  • 服务端实现原理
  • 总结

引言

在学习Pulsar一段时间后,相信大家也或多或少听说Lookup这个词,今天就一起来深入剖析下Pulsar是怎么设计的它吧

Lookup是什么

在客户端跟服务端建立TCP连接前有些信息需要提前获取,这个获取方式就是Lookup机制。所获取的信息有以下几种

  • 应该跟哪台Broker建立连接
  • Topic的Schema信息
  • Topic的分区信息

其中第一个是最重要的,因此今天就针对第一点进行深入剖析,大致流程如下图
在这里插入图片描述

  1. 在创建生产者/消费者时会触发Lookup,一般是通过HTTP请求Broker来获取目标Topic所归属的Broker节点信息,这样才知道跟哪台机器建立TCP连接进行数据交互
  2. Broker接收到Lookup命令,此时会进行限流检查、身份/权限认证、校验集群等检测动作后,根据请求中携带的Namespace信息获取对应的Namespace对象进行处理,这里Namespace会对Topic进行哈希运算并判断它落在数组的哪一个节点,算出来后就根据数组的信息来从Bundle数组中获得对应的Bundle,这个过程其实就是一致性哈希算法寻址过程。
  3. 在获得Bundle后会尝试从本机Cache中查询该Bundle所归属的Broker信息。
  4. 如果在Cache中没有命中,则会去Zookeeper中进行读取,如果发现该Bundle还未归属Broker则触发归属Broker的流程
  5. 获取到该Topic所归属的Broker信息后返回给客户端,客户端解析结果并跟所归属的Broker建立TCP连接,用于后续生产者往Broker节点进行消息写入

补充说明确定Bundle的归属,如果Broker的loadManager使用的是中心化策略,则需要Broker Leader来当裁判决定,否则当前Broker就可当作裁判。虽然Broker是无状态的,但会通过Zookeeper选举出一个Leader用于监控负载、为Bundle分配Broker等事情,裁判Broker通过loadManager查找负载最低的Broker并把Bundle分配给它。

客户端实现原理

Lookup机制是由客户端发起的,在创建生产者/消费者对象时会初始化网络连接,以生产者代码为例进行跟踪看看。无论是创建分区还是非分区生产者,最终都会走到ProducerImpl的构造函数,就从这里开始看吧

   public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,ProducerInterceptors interceptors, Optional<String> overrideProducerName) {....//这里进去就是创建跟Broker的网络连接grabCnx();}void grabCnx() {//实际上是调用ConnectionHandler进行的this.connectionHandler.grabCnx();}protected void grabCnx(Optional<URI> hostURI) {....//这里是核心,相当于最终又调用回PulsarClientImpl类的getConnection方法cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));....}public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {TopicName topicName = TopicName.get(topic);//看到方法名就知道到了Lookup的时候了,所以说好的命名远胜于注释return getLookup(url).getBroker(topicName).thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));}public LookupService getLookup(String serviceUrl) {return urlLookupMap.computeIfAbsent(serviceUrl, url -> {try {//忽略其他的,直接跟这里进去return createLookup(serviceUrl);} catch (PulsarClientException e) {log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());throw new IllegalStateException("Failed to update url " + url);}});}public LookupService createLookup(String url) throws PulsarClientException {//这里可以看到如果咱们在配置客户端的地址是http开头就会通过http方式进行Loopup,否则走二进制协议进行查询if (url.startsWith("http")) {return new HttpLookupService(conf, eventLoopGroup);} else {return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),externalExecutorProvider.getExecutor());}}public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)throws PulsarClientException {//进到可能会误会Pulsar是通过HttpClient工具包进行的HTTP通信,继续看HttpClient构造函数this.httpClient = new HttpClient(conf, eventLoopGroup);this.useTls = conf.isUseTls();this.listenerName = conf.getListenerName();}protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {....//可以看到实际上最终是调用的AsyncHttpClient进行HTTP通信,这是一个封装Netty的async-http-client-2.12.1.jar的外部包httpClient = new DefaultAsyncHttpClient(config);....}

通过上面可以看到Lookup服务已经完成初始化,接下来就来看看客户端如何发起Lookup请求,回到PulsarClientImpl的getConnection方法,可以看到这里是链式调用,上面是从getLookup看到了其实是对Lookup进行初始化的过程,那么接下来就跟踪getBroker方法看看是怎么获取的服务端信息

    public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {TopicName topicName = TopicName.get(topic);return getLookup(url).getBroker(topicName).thenCompose(lookupResult -> getConnection(lookupResult.getLogicalAddress(),lookupResult.getPhysicalAddress(), cnxPool.genRandomKeyToSelectCon()));}public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {//判断访问哪个版本的接口String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;String path = basePath + topicName.getLookupName();path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);//获取要访问的Broker地址return httpClient.get(path, LookupData.class).thenCompose(lookupData -> {URI uri = null;try {//解析服务端返回的数据,本质上就是返回的就是Topic所在Broker的节点IP+端口InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());//HTTP通过Lookup方式访问服务端绝对不会走代理return CompletableFuture.completedFuture(new LookupTopicResult(brokerAddress, brokerAddress,false /* HTTP lookups never use the proxy */));} catch (Exception e) {....}});}public class LookupTopicResult {//LookupTopicResult是查询Topic归属Broker的结果后包装的一层结果,可以看到这里其实就是Socket信息也就是IP+端口private final InetSocketAddress logicalAddress;private final InetSocketAddress physicalAddress;private final boolean isUseProxy;
}

客户端的流程走到这里基本就结束了,是否有些意犹未尽迫不及待的想知道服务端又是怎么处理的?那么就看看下一节

服务端实现原理

服务端的入口在TopicLookup类的lookupTopicAsync方法,服务端大致步骤是这样的:1. 获取Topic所归属的Bundle 2. 查询Bundle所归属的Broker 3. 返回该Broker的url

    public void lookupTopicAsync(@Suspended AsyncResponse asyncResponse,@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,@QueryParam("listenerName") String listenerName,@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {listenerName = listenerNameHeader;}//可以看得到这里是获取Lookup的,跟踪进去看看internalLookupTopicAsync(topicName, authoritative, listenerName).thenAccept(lookupData -> asyncResponse.resume(lookupData)).exceptionally(ex -> {....});}protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName topicName, boolean authoritative, String listenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()//获得目标Broker地址, 继续从这里进去.getBrokerServiceUrlAsync(topicName,LookupOptions.builder().advertisedListenerName(listenerName).authoritative(authoritative).loadTopicsInBundle(false).build());}public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {long startTime = System.nanoTime();// 获取这个Topic所归属的BundleCompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic).thenCompose(bundle -> {//根据获得的bundle信息查询归属的Brokerreturn findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {//如果findRedirectLookupResultAsync方式没查到则走这里进行查询return findBrokerServiceUrl(bundle, options); });});future.thenAccept(optResult -> {....}).exceptionally(ex -> {....});return future;}

先看看是怎么获取Topic所归属的Bundle的吧,就从getBundleAsync方法跟踪进去

    public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {return bundleFactory.getBundlesAsync(topic.getNamespaceObject())//直接看findBundle,命名意思已经很清晰了.thenApply(bundles -> bundles.findBundle(topic));}public NamespaceBundle findBundle(TopicName topicName) {checkArgument(nsname.equals(topicName.getNamespaceObject()));//同理,继续跟踪进去return factory.getTopicBundleAssignmentStrategy().findBundle(topicName, this);}public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {//计算Topic名称的哈希值long hashCode = Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();//根据哈希值来获取所归属的bundle,一致性哈希的设计。跟进去看看是怎么计算的NamespaceBundle bundle = namespaceBundles.getBundle(hashCode);if (topicName.getDomain().equals(TopicDomain.non_persistent)) {bundle.setHasNonPersistentTopic(true);}return bundle;}protected NamespaceBundle getBundle(long hash) {//通过数组的二分查找进行计算,数组的元素个数跟存储Bundle的bundles的集合大小是一样的,能获取对应的Bundle//思路其实就是一致性哈希的查找方式,计算出哈希值处于哈希环所处的位置并查找其下一个节点的信息int idx = Arrays.binarySearch(partitions, hash);int lowerIdx = idx < 0 ? -(idx + 2) : idx;return bundles.get(lowerIdx);}

知道Bundle之后,下一步就是根据这个Bundle来查询其所归属的Broker节点,也就是上面的NamespaceService类的findRedirectLookupResultAsync方法,这里一路跟下去就是查询缓存中获取映射信息的地方了,感兴趣的伙伴可以继续跟下去

    private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {if (isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {return CompletableFuture.completedFuture(Optional.empty());}return redirectManager.findRedirectLookupResultAsync();}

总结

以上就是Pulsar的Lookup机制的实现流程,在寻址的过程中,需要阅读的伙伴具备一致性哈希的知识,因为Pulsar的Topic归属就是引入了一致性哈希算法来实现的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/302515.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

arm-linux-gnueabihf-gcc默认目录

默认编译的头文件目录&#xff1a; /usr/local/arm/gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf/arm-linux-gnueabihf/lib 默认编译的库文件目录&#xff1a; /usr/local/arm/gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf/arm-linux-gnueabihf/include/ …

vscode 重命名很慢或失败 vscode renames are slow

网上问题&#xff0c; 插件问题&#xff08;我遇见的排除&#xff0c;不是&#xff09;被其他程序占用问题&#xff0c;&#xff08;我这边是这个&#xff09; 解决方案&#xff1a; 打开【资源管理器】&#xff0c;使用火绒 或其他软件&#xff0c;查看文件夹 or 文件 被哪个…

2024.4.9-day12-CSS 常用样式属性和字体图标

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 作业 作业 <!DOCTYPE html> <html lang"zh-CN"><he…

go之web框架gin

介绍 Gin 是一个用 Go (Golang) 编写的 Web 框架。 它具有类似 martini 的 API&#xff0c;性能要好得多&#xff0c;多亏了 httprouter&#xff0c;速度提高了 40 倍。 如果您需要性能和良好的生产力&#xff0c;您一定会喜欢 Gin。 安装 go get -u github.com/gin-gonic/g…

【C++】unordered 系列关联式容器

文章目录 1. unordered 系列关联式容器2. unordered_map2.1 unordered_map 的文档介绍2.2 unordered_map 的接口说明 3. unordered_set4. 在线 OJ 1. unordered 系列关联式容器 在 C 98 中&#xff0c;STL 提供了底层为红黑树结构的一系列关联式容器&#xff0c;在查询时效率可…

STM32中C编程引入C++程序

C具备类的创建思想很实用于实际场景多相似性的框架搭建&#xff1b;同种类型或相似类型的C的优势明显因此进行相互嵌套使用 需要在C中使用C类的话&#xff0c;你可以通过C的“extern "C"”语法来实现。这允许你在C代码中使用C的链接方式&#xff0c;而在C代码中使用…

实现RAG:使用LangChain实现图检索查询

你是不是有时会遇到这样的问题&#xff1a;你可能遇到的任何主题或问题&#xff0c;都有大量的文档&#xff0c;但是当尝试将某些内容应用于自己的用途时&#xff0c;突然发现很难找到所需的内容。 在这篇博文中&#xff0c;我们将看一下LangChain是如何实现RAG的&#xff0c;这…

kali基础渗透学习,永恒之蓝,木马实战

简介 kali的学习本质是在linux上对一些攻击软件的使用&#xff0c;只是学习的初期 先在终端切换到root用户&#xff0c;以便于有些工具对权限的要求 下载链接 镜像源kali 攻击流程 公网信息搜集 寻找漏洞&#xff0c;突破口&#xff0c;以进入内网 进入内网&#xff0c…

码蹄集部分题目(第五弹;OJ赛2024年第10期)

&#x1f40b;&#x1f40b;&#x1f40b;竹鼠通讯&#xff08;钻石&#xff1b;分治思想&#xff1b;模板题&#xff1a;就算几何平面点对问题&#xff09; 时间限制&#xff1a;3秒 占用内存&#xff1a;128M &#x1f41f;题目描述 在真空中&#xff0c;一块无限平坦光滑…

秋招算法刷题6

20240408 1.两数之和 &#xff08;时间复杂度是O&#xff08;n的平方&#xff09;&#xff09; public int[] twoSum(int[] nums, int target){int nnums.length; for(int i0;i<n;i){ for(int j1;j<n;j){ if(nums[i][j]target){ …

libVLC 提取视频帧使用OpenGL渲染

在上一节中&#xff0c;我们讲解了如何使用QWidget渲染每一帧视频数据。 由于我们不停的生成的是QImage对象&#xff0c;因此对 CPU 负荷较高。其实在绘制这块我们可以使用 OpenGL去绘制&#xff0c;利用 GPU 减轻 CPU 计算负荷&#xff0c;本节讲解使用OpenGL来绘制每一帧视频…

harmonyOS安装ohpm

下载 下载地址 HUAWEI DevEco Studio和SDK下载和升级 | 华为开发者联盟 初始化 注意&#xff1a;初始化ohpm前&#xff0c;需先完成node.js环境变量配置 1.解压文件&#xff0c;进入commandline-tools-windows-2.0.0.2\command-line-tools\ohpm\bin 2.执行&#xff1a; init.ba…

安卓开机启动流程

目录 一、整体框架二、流程代码分析2.1 Boot ROM2.2 Boot Loader2.3 Kernel层Kernel代码部分 2.4 Init进程Init进程代码部分 2.5 zygote进程zygote代码部分 2.6 SystemServer进程SystemServer代码部分 2.7 启动Launcher与SystemUI 三、SystemServices3.1 引导服务3.2 核心服务3…

C++进阶之路---何为智能指针?

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、为什么需要智能指针&#xff1f; 下面我们先分析一下下面这段程序有没有什么内存方面的问题&#xff1f;提示一下&am…

什么是HW,企业如何进行HW保障?

文章目录 一、什么是HW二、HW行动具体采取了哪些攻防演练措施三、攻击方一般的攻击流程和方法四、企业HW保障方案1.建意识2.摸家底3.固城池4.配神器5.增值守 一、什么是HW 网络安全形势近年出现新变化&#xff0c;网络安全态势变得越来越复杂&#xff0c;黑客攻击入侵、勒索病…

【腾讯云 TDSQL-C Serverless 产品体验】饮水机式使用云数据库

云计算的发展从IaaS&#xff0c;PaaS&#xff0c;SaaS&#xff0c;到最新的BaaS&#xff0c;FasS&#xff0c;在这个趋势中serverless(去服务器化&#xff09; 计算资源发展Physical -> Virtualisation -> Cloud Compute -> Container -> Serverless。 一、背景介绍…

【azure笔记 1】容器实例管理python sdk封装

容器实例管理python sdk封装 测试结果 说明 这是根据我的需求写的&#xff0c;所有有些参数是写死的&#xff0c;比如cpu核数和内存&#xff0c;你可以根据你的需要自行修改。前置条件&#xff1a; 当前环境已安装python3.8以上版本和azure cli并且已经登陆到你的账户 依赖安…

自己写的组件中使用v-model双向绑定

这里的时间选择表单是我写的一个组件&#xff0c;我想用v-model获取到实时的ref值。 代码&#xff1a; //父组件<TimePickerModal v-model:value"time" label-text"计划客面时间" /> const time ref(2024-04-09 15:20:00);//子组件<template>…

JRT判断数据是否存在优化

有一种业务情况类似下图&#xff0c;质控能做的项目是仪器关联的项目。这时候维护质控物时候开通项目时候要求加载仪器项目里面的项目&#xff08;没有开通的子业务数据的部分&#xff09;。对右边已经开通的部分要求加载仪器项目里面的项目&#xff08;有开通业务子数据的部分…

【C语言】扫雷小游戏

文章目录 前言一、游戏玩法二、创建文件test.c文件menu()——打印菜单game()——调用功能函数&#xff0c;游戏的实现main()主函数 game.c文件初始化棋盘打印棋盘随机布置雷的位置统计周围雷的个数展开周围一片没有雷的区域计算已排查位置的个数排查雷(包括检测输赢): game.h文…