nsqd的架构及源码分析

文章目录

一  nsq的整体代码结构

二  回顾nsq的整体架构图

三  nsqd进程的作用

四  nsqd启动流程的源码分析

五  本篇博客总结


在博客 nsq整体架构及各个部件作用详解_YZF_Kevin的博客-CSDN博客 中我们讲了nsq的整体框架,各个部件的大致作用。如果没看过的,建议大家去学习下,不然理解后续的内容会有难度

这篇博客开始我们来看下每个部件的详细功能,从源码入手分析其内部实现原理

一  nsq的整体代码结构

建议大家也下载nsq的代码,一边看博客一边看代码印象更深刻。nsq的官方git代码地址:GitHub - nsqio/nsq: A realtime distributed messaging platform

nsq代码结构如下,图中有注释,大家先有个整体印象,知道各个模块的代码在哪就行

二  回顾nsq的整体架构图

 图中最上面的四个节点就是nsqd进程,至少要有1个,可以多开。我们画了4个,分别是nsq1,nsq2,nsq3,nsq4

注意看nsqd的连接关系,每个nsqd节点和所有客户端都有连接(tcp+http),且每个nsqd节点和所有的nsqlookupd节点也有连接(tcp)

三  nsqd进程的作用

1. topic的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

2. channel的创建,清空,暂停,重新激活,删除,持久化(保存到文件,从文件加载),同步给nsqlookupd进程

3. message的监听,中转,持久化(保存到文件,从文件加载),主动推送消息给各个客户端,超时重发,消息计数

4. 配置修改,运行状态(协程、内存)统计

5. 抽检channel的延迟队列,飞行队列,消息超时的重新入队

6. 统计和上报工作,主要统计topic,channel,消息,各种队列,客户端连接,GC等信息,通过UDP协议上报给指定地址的进程

可以说,nsqd进程是整个nsq平台的核心,消息队列架构简单的话,只有一个nsqd进程就够了。

我画了一个图来概括一个nsqd进程的工作内容,如下

 

四  nsqd启动流程的源码分析

nsqd的代码主要在两块

1. 代码框架及main函数,目录在 nsq/apps/nsqd/*

2. 实现代码,目录在 nsq/nsqd/*

值得一提的是nsqd,nsqlookupd,nsqadmin这三个进程的框架都使用了go-svc包,这个包很简单,使用者只需实现它的三个函数即可

Init()           配置,初始化等操作

Start()        真正启动

Stop()        结束时的关闭操作

好了,我们看nsqd的入口,也就是main函数,代码在nsq/apps/nsqd/main.go,代码如下(已加注释)

type program struct {once 		sync.Oncensqd 		*nsqd.NSQD	// nsqd对象
}// nsqd的启动入口
func main() {prg := &program{}// Run内部会调用Init(),Start(),监听到这两个系统信号时会调用Stop()if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {logFatal("%s", err)}
}

main()函数内部只有一个对象program,也只有一处调用svc.Run(),这个函数内部会调用program.Init()program.Start()

其中program.Init()函数,主要是创建并检测nsqd的配置,然后根据配置创建出一个nsqd实例

重点在program.Start()函数,代码如下(已加注释)

// nsqd的启动,重点在调用的Main()函数
func (p *program) Start() error {// 加载元数据,并创建初始化出所有的topic对象,所有的channel对象err := p.nsqd.LoadMetadata()if err != nil {logFatal("failed to load metadata - %s", err)}// 再持久化元数据到文件(不要觉得奇怪,因为上面的LoadMetadata()函数可能会过滤掉一些无效的topic,channel,这里再重写算是刷新了元数据)err = p.nsqd.PersistMetadata()if err != nil {logFatal("failed to persist metadata - %s", err)}// 启动一个新协程,专门运行nsqd的Main()循环,注意这个Main()是永不退出的(除非出错)go func() {err := p.nsqd.Main()if err != nil {p.Stop()os.Exit(1)}}()return nil
}

对上面的代码解释下,program.Start()函数一共干了3件事

1. nsqd.LoadMetadata(), 这个函数根据配置加载旧nsqd元数据。这些元数据包含版本号,topic,channel,过滤掉不合法的topic和channel,合法的topic和channel都创建出对象,并且为每个topic建立处理循环

2. nsqd.PersistMetadata(), 把过滤后的topic和channel再保存到文件nsqd.dat,算是把旧数据过滤了一遍

3. 新启动一个协程,调用nsqd.Main(),这个Main()是nsqd的核心,启动了nsqd的全部服务。除非遇到错误,否则永不退出

接下来看nsqd.Main()的内部实现,代码在nsq/nsqd/nsqd.go,代码如下(已加注释)

// nsqd主协程(内部启动tcp循环,http循环,https循环, 扫描队列池,和nsqlookupd循环),永不退出,除非严重错误
func (n *NSQD) Main() error {exitCh := make(chan error)var once sync.Once// 退出函数(独立协程运行,一直监听,遇到错误exitFunc := func(err error) {once.Do(func() {if err != nil {n.logf(LOG_FATAL, "%s", err)}exitCh <- err})}// TCP服务,独立协程n.waitGroup.Wrap(func() {exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))})// HTTP服务,独立协程if n.httpListener != nil {httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)n.waitGroup.Wrap(func() {exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))})}// HTTPS服务,独立协程if n.httpsListener != nil {httpsServer := newHTTPServer(n, true, true)n.waitGroup.Wrap(func() {exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))})}// 独立协程,抽检扫描各个队列n.waitGroup.Wrap(n.queueScanLoop)// 独立协程,和nsqlookupd的循环(连接和重连,心跳维持,topic,channel变化通知等)n.waitGroup.Wrap(n.lookupLoop)if n.getOpts().StatsdAddress != "" {n.waitGroup.Wrap(n.statsdLoop)}err := <-exitChreturn err
}

对上面的代码解释下,nsqd.Main()主要干了6件事

1. 开一个新协程,启动tcp服务并一直监听,为客户端提供tcp服务。我们的客户端最常用,因为生产消息,中转消息,处理消息都是这里实现的

2. 开一个新协程,启动http服务并一直监听,为客户端提供htttp服务

3. 开一个新协程,启动https服务并一直监听,为客户端提供htttps服务

4. 开一个新协程,建立并维持扫描池,这些扫描协程会扫描所有channel的延迟队列,飞行队列,如果消息超时了就重新入队。很有意思的是,nsqd作者很大方地承认他抄袭了redis的抽检策略,内部实现也确实是类redis操作,这个我们后面再讲,todo

5. 开一个新协程,和nsqlookupd建立循环,主要是连接和重连,心跳维持,实时报告自己的topic和channel变化

6. 如果你配置了信息上报的地址,nsqd会再开一个新协程,做统计上报操作,统计topic,channel,消息,内存,GC等信息,通过UDP协议上报过去

五  本篇博客总结

1. 给大家看了nsq平台下代码整体结构,建议大家下载源码自己看下,加强印象

2. 讲了nsqd进程提供的功能实现

3. 跟踪了nsqd进程启动流程,最核心的nsqd.Main()建议大家仔细看,后面讲的nsqd内容也都是这几个协程里面干的活

下一篇博客我们开始详解分析nsqd内部各个协程的具体工作

todo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/83707.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【100天精通python】Day30:使用python操作数据库_数据库基础入门

专栏导读 专栏订阅地址&#xff1a;https://blog.csdn.net/qq_35831906/category_12375510.html 1 数据库基础知识介绍 1.1 什么是数据库&#xff1f; 数据库是一个结构化存储和组织数据的集合&#xff0c;它可以被有效地访问、管理和更新。数据库的目的是为了提供一种可靠的…

paddleseg数据集自定义比例划分为测试集test.txt,训练集train.txt,验证集val.txt

将语义分割的数据集标注好后如下所示&#xff1a; 整理好图片和标签文后需要按照比例划分为训练集&#xff0c;验证集&#xff0c;测试集。 具体划分代码见下&#xff1a; import glob import os.path import argparse import warnings import numpy as npdef parse_args():p…

“清凉计划”KCOFFEE来了,华为天气和肯德基携手提升你的冰凉咖位

八月的热浪席卷着城市&#xff0c;开启了高温酷暑的“闷烤”模式&#xff0c;此时“怕热星人”急需一杯冰爽的冷饮&#xff0c;为了拯救热热热亻七的你们&#xff0c;华为天气联手肯德基带着超强冷势力&#xff0c;发起“清凉计划”&#xff0c;送上一杯KCOFFEE现磨冰咖啡&…

Debian10:安装PHPVirtualBox

PHPVirtualBox 是一个用 PHP 编写&#xff0c;用于管理 VirtualBox 的 Web 前端&#xff08;由AJAX实现&#xff09;。 参考文章&#xff1a;VirtualBoxPHPVirtualBox部署_骡子先生的博客-CSDN博客php virualbox,浏览器远程控制VBox 虚拟机phpVirtualBox_weixin_39815879的博客…

编写一个指令(v-focus2end)使输入框文本在聚焦时焦点在文本最后一个位置

项目反馈输入框内容比较多时候&#xff0c;让鼠标光标在最后一个位置&#xff0c;心想什么奇葩需求&#xff0c;后面试了一下&#xff0c;是有点影响体验&#xff0c;于是就有了下面的效果&#xff0c;我目前的项目都是若依的架子&#xff0c;用的是vue2版本。vue3的朋友想要使…

JVM—编译器、类加载的过程、双亲委派机制这些你还记得吗?

背景介绍 这两天在对JVM的知识进行回顾&#xff0c;顺便来分享分享&#xff0c;接下来也会有系列文章&#xff0c;欢迎大家一起讨论。 过程 为什么叫JVM&#xff1f; Java Virtual Machine&#xff0c;java虚拟机。可以理解成一个以字节码为机器指令的CPU 有哪些特点呢&#…

【Flutter】【packages】simple_animations 简单的实现动画

package&#xff1a;simple_animations 导入包到项目中去 可以实现简单的动画&#xff0c; 快速实现&#xff0c;不需要自己过多的设置 有多种样式可以实现[ ] 功能&#xff1a; 简单的用例&#xff1a;具体需要详细可以去 pub 链接地址 1. PlayAnimationBuilder PlayAnima…

递归神经网络简介

一、说明 说起递归神经网络&#xff0c;递归神经网络&#xff08;RNN&#xff09;主要包括以下几种类型&#xff1a; 简单的RNN&#xff08;Simple RNN&#xff09;&#xff1a;最基本的RNN类型&#xff0c;每个时刻的输出都与前面时刻的状态有关。 循环神经网络&#xff08;R…

Blazor前后端框架Known-V1.2.10

V1.2.10 Known是基于C#和Blazor开发的前后端分离快速开发框架&#xff0c;开箱即用&#xff0c;跨平台&#xff0c;一处代码&#xff0c;多处运行。 Gitee&#xff1a; https://gitee.com/known/KnownGithub&#xff1a;https://github.com/known/Known 概述 基于C#和Blazo…

Kafka:springboot集成kafka收发消息

kafka环境搭建参考Kafka&#xff1a;安装和配置_moreCalm的博客-CSDN博客 1、springboot中引入kafka依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><…

Python-OpenCV中的图像处理-形态学转换

Python-OpenCV中的图像处理-形态学转换 形态学转换腐蚀膨胀开运算闭运算形态学梯度礼帽黑帽形态学操作之间的关系 形态学代码例程 形态学转换 形态学操作:腐蚀&#xff0c;膨胀&#xff0c;开运算&#xff0c;闭运算&#xff0c;形态学梯度&#xff0c;礼帽&#xff0c;黑帽等…

B树的插入与删除过程

B树的插入 原树&#xff1a; 插入key后&#xff0c;若导致原节点关键字数超过上限&#xff0c;则从中间位置&#xff08; ⌈ m 2 ⌉ \lceil\frac{m}{2}\rceil ⌈2m​⌉&#xff09;将关键字分成两部分&#xff0c;左部分包含的关键字放在原节点中&#xff0c;右部分包含的关键…

前端下载文化部几种方法(excel,zip,html,markdown、图片等等)和导出 zip 压缩包

文章目录 1、location.href2、location.href3、a标签4、请求后端的方式5、文件下载的方式6、Blob和Base647、下载附件方法(excel,zip,html,markdown)8、封装下载函数9、导出 zip 压缩包相关方法(流方式) 总结 1、location.href //get请求 window.location.href url;2、locati…

死锁的成因,和解决方案总结

何为死锁 死锁是多线程或并发程序中的一种情况&#xff0c;当多个线程因为竞争资源而相互等待&#xff0c;并且无法继续执行的情况。在死锁中&#xff0c;每个线程都在等待其他线程释放资源&#xff0c;从而导致所有线程都陷入无限等待状态&#xff0c;无法继续向前执行&#…

0805hw

1. #include <myhead.h> void Bub_sort(int *arr,int n)//冒泡排序 {for(int i1;i<n;i){int count0;for(int j0;j<n-i;j){if(arr[j]>arr[j1]){int temparr[j];arr[j]arr[j1];arr[j1]temp;count;}}if(count0){break;}}printf("冒泡排序后输出结果:\n"…

uni-app离线打包高德地图导入android studio不能正常显示

本人使用的uni-app SDK版本&#xff1a;Android-SDK3.8.7.81902_20230704 1.导入以上文件&#xff0c;依赖已经自动添加了 2.确保这个正常引入 3.修改AndroidMainifest.xml,添加自己的密钥

整理mongodb文档:删

个人博客 整理mongodb文档:删 求关注&#xff0c;哪儿不足&#xff0c;求大佬们指出&#xff0c;哪儿写的不够通俗易懂跟清晰&#xff0c;也求指出 文章概叙 本文主要是介绍了删除数据的几个方法&#xff0c;主要还是在介绍deleteMany、deleteOne以及remove&#xff0c;对于…

JAVA基础之放弃使用Random

随机是日常生活中经常遇到的非常有趣的东西&#xff0c;比如说抛硬币&#xff0c;他的不可预知性总是让我们特别着迷&#xff0c;在拿不定主意时&#xff0c;有些人就喜欢用抛硬币的方式来帮助我们做决定。体育领域也喜欢用喜欢用抛硬币的方式来猜先。随机数功能是Java非常非常…

14个前端开发者应该知道的实用网站

在本文中&#xff0c;我将分享一些非常有用的网站合集&#xff0c;这些网站可以在你的日常工作中极大地帮助你。这些网站已经成为我各种任务的首选资源&#xff0c;节省了我的时间&#xff0c;提高了工作效率 文档自动化 Documatic 是一款专为开发人员设计的非常高效的搜索引擎…

Pytorch深度学习-----现有网络模型的使用及修改(VGG16模型)

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…