【Zinx】Day5-Part2:Zinx 的消息队列及多任务机制

目录

  • Day5-Part2:Zinx 的消息队列及多任务机制
    • 创建消息队列
    • 创建及启动 Worker 工作池
    • 在 Server 启动的同时对连接池进行初始化

Day5-Part2:Zinx 的消息队列及多任务机制

接下来我们需要给 ZInx 添加消息队列以及多任务 Worker 机制。可以通过限制 worker 的数量来限定处理业务的 goroutine 数量,而不是无限制的开辟 goroutine。虽然 golang 在调度 goroutine 这方面已经做到极致了,但是数量过多的 goroutine 依然会带来不必要的环境切换成本,这应该是服务器节省掉的成本。

我们可以用消息队列来对 worker 进行缓冲。
请添加图片描述

创建消息队列

首先,处理消息队列的部分应该集成到MsgHandler模块下,它属于消息模块的范畴:

type MsgHandle struct {Apis           map[uint32]ziface.IRouter // map 存放每个 MsgId 对应的处理方法WorkerPoolSize uint32                    // 业务工作 worker 池的数量TaskQueue      []chan ziface.IRequest    // worker 负责取任务的消息队列
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Apis:           make(map[uint32]ziface.IRouter),WorkerPoolSize: settings.Conf.WorkerPoolSize,TaskQueue:      make([]chan ziface.IRequest, settings.Conf.WorkerPoolSize),}
}

这里添加了两个成员:

  • WorkerPoolSize:工作池的数量;
  • TaskQueue:TaskQueue 是一个 Request 请求的 channel 集合,用来缓冲请求 worker 处理的 Request 信息,worker 会从对应的队列当中取客户端的请求数据并进行具体的业务处理。

还有一点要注意:我们在此处新添加了settings.Conf.WorkerPoolSize字段,意味着有新的参数需要设定,我们需要修改 conf.yaml 文件以及 settings 中的 Conf 结构,新增字段 WorkerPoolSize。如果后续出现了新的以 settings.Conf 为前缀的字段,同样需要按照上述逻辑修改 Conf 的成员以及 yaml 文件。

创建及启动 Worker 工作池

我们修改了 MsgHandle 的成员,新增了 WorkerPoolSize 和 TaskQueue,现在我们为其新添加一些方法,首先修改 IMsgHandle 这个接口的定义,即首先在接口中定义好要新增哪些方法:

type IMsgHandle interface {DoMsgHandler(request IRequest)          // 立即以非阻塞的方式处理消息AddRouter(msgId uint32, router IRouter) // 为消息添加具体的处理逻辑StartWorkerPool()                       // 启动 worker 工作池SendMsgToTaskQueue(request IRequest)    // 将消息交给 TaskQueue, 由 worker 进行处理
}

定义好接口的方法,现在我们需要为 MsgHandle 实现新增的两个方法,以使其实现接口:

// StartOneWorker 启动一个 Worker 的工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID = ", workerID, " is started.")for {select {case request := <-taskQueue:mh.DoMsgHandler(request)}}
}// StartWorkerPool 启动 worker 工作池
func (mh *MsgHandle) StartWorkerPool() {// 遍历需要启动的 worker, 依次启动for i := 0; i < int(mh.WorkerPoolSize); i++ {// 一个 worker 被启动时, 给当前的 worker 对应的任务队列开辟空间mh.TaskQueue[i] = make(chan ziface.IRequest, settings.Conf.MaxWorkerTaskLen)// 启动当前 worker, 阻塞地等待对应的任务队列是否有消息传来go mh.StartOneWorker(i, mh.TaskQueue[i])}
}// SendMsgToTaskQueue 将消息交给 TaskQueue, 由 worker 进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {// 根据 ConnID 来分配当前的连接应该由哪个 worker 负责处理// 使用轮询分配法则// 得到需要处理此条连接地 workerIDworkerID := request.GetConnection().GetConnID() % mh.WorkerPoolSizefmt.Println("Add ConnID = ", request.GetConnection().GetConnID(), " request msgID = ", request.GetMsgID(), "to workerID = ", workerID)// 将请求消息发送给任务队列mh.TaskQueue[workerID] <- request
}
  • StartWorkerPool() 方法会启动 Worker 工作池,它会遍历 TaskQueue 这个存放着 request 类型的 channel 的列表,需要注意的是,TaskQueue 在工厂函数中初始化时,是初始化了保存 channel 的 slice,而没有初始化 channel,因此 channel 使用默认的零值 nil 进行填充。在启动工作池的时候,应当对 channel 进行正确的初始化,即使用 make 方法指定其类型以及缓冲区的大小。指定了缓冲区大小的 channel 更像是一个队列,可以完成对 request 的缓冲。初始化每一个 TaskQueue 下辖的 channel 之后,启动 StartOneWorker 这个 goroutine,即让相应的 worker 开启工作。
  • StartOneWorker() 方法就是一个 Worker 的工作业务,它使用一个 for loop 来等待 taskQueue 中 request 的到来,注意形参 taskQueue,它是一个 Request 类型的通道,传入的实参是 StartWorkerPool 方法中的 TaskQueue[i]
  • 现在,我们的 workers 就绪了,还需要一个给 worker 传入数据的入口,在 SendMsgToTaskQueue 实现这项功能。SendMsgToTaskQueue() 是工作池的数据入口,采用轮询的分配机制,根据 Server 分配的 ConnID,与 WorkerPoolSize 取模来计算具体要把这个 request 分配到哪个 worker 当中。分配好 worker 之后,通过 TaskQueue[i] 将 Request 发送给对应的 worker,worker 就会在其 goroutine 中处理这个请求。

在 Server 启动的同时对连接池进行初始化

现在我们将消息队列及多任务机制集成到 Zinx 中。在 Server 的 Start 方法下,服务端 Accept 来自客户端的连接请求之前,使用 StartWorkerPool 启动线程池:

func (s *Server) Start() {//...go func() {//0 启动worker工作池机制s.msgHandler.StartWorkerPool()//1 获取一个TCP的Addraddr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Println("resolve tcp addr err: ", err)return}//...//...}}()
}

其次,当我们在 Connection 的 Reader 中得到来自客户端的数据之后,将数据发送给 Worker 的工作池,进行业务处理:

// StartReader 开启处理 conn 读数据的 goroutine
func (c *Connection) StartReader() {fmt.Println("Reader Goroutine is running")defer fmt.Println(c.RemoteAddr().String(), " conn reader exit !")defer c.Stop()for {// 创建封包拆包的对象dp := NewDataPack()// 读取客户端的 msg headheadData := make([]byte, dp.GetHeadLen()) // 注意 GetHeadLen() 返回常量 8, 因为包的头部长度固定if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error", err)c.ExitBuffChan <- truereturn}// 拆包, 得到 msgid 和 datalen, 并放在 msg 中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error", err)c.ExitBuffChan <- truereturn}// 根据 dataLen 读取 data, 放在 msg.Data 中var data []byteif msg.GetDataLen() > 0 {data = make([]byte, msg.GetDataLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error", err)c.ExitBuffChan <- truereturn}}msg.SetData(data)// 得到当前客户端请求的 Request 数据req := Request{conn: c,msg:  msg,}if settings.Conf.WorkerPoolSize > 0 {// 已经启动工作池机制, 将消息交给 Worker 处理c.Msghandler.SendMsgToTaskQueue(&req)} else {// 从绑定好的消息和对应的处理方法中执行 Handle 方法go c.Msghandler.DoMsgHandler(&req)}}
}

此处我们并没有强制的启用多任务机制,如果 WorkerPoolSize 小于等于零,我们只开启一个临时的 goroutine 去处理业务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/31140.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目上传到Gitee过程

在gitee上新建一个仓库 点击“克隆/下载”获取仓库地址 电脑上要装好git 在电脑本地文件夹右键“Git Bash Here” 依次执行如下命令 git init git remote add origin https://gitee.com/qlexcel/stm32-simple.git git pull origin master git add . git commit -m ‘init’…

速算迷你世界脚本UI

--[[ --数学速算主界面 local UI"6996144362677448610" local v"6996144362677448610_" --自定义玩家数据界面 --显示界面分类 -- --称号积分幼儿园0学前班50小学生200初中生500高中生1000大学生2000研究生5000博士生10000教授50000 local A {["主屏幕…

坐落于杭州的电商代运营公司品融电商

坐落于杭州的电商代运营公司品融电商 在中国电商行业蓬勃发展的浪潮中&#xff0c;品融电商&#xff08;PINKROON&#xff09;作为一家扎根杭州的新锐品牌管理公司&#xff0c;凭借其独特的全域增长方法论和实战经验&#xff0c;迅速崛起为行业标杆。自2020年成立以来&#x…

mysql的Innodb最大支持的索引长度是多少,以及索引长度怎么计算

今天正好有空&#xff0c;来讲个之前粉丝经常问的一个知识&#xff0c;就是mysql的Innodb最大支持的索引长度是多少&#xff1f;以及索引长度怎么计算&#xff1f; 一、mysql的innodb引擎&#xff0c;创建索引最大支持的长度是多少字节&#xff1f; 不墨迹&#xff0c;直接说…

【网络安全工程】任务11:路由器配置与静态路由配置

目录 一、概念 二、路由器配置 三、配置静态路由CSDN 原创主页&#xff1a;不羁https://blog.csdn.net/2303_76492156?typeblog 一、概念 1、路由器的作用&#xff1a;通过路由表进行数据的转发。 2、交换机的作用&#xff1a;通过学习和识别 MAC 地址&#xff0c;依据 M…

Dagger 2 系列(五)——进阶之@Scope 和 @Singleton

前言&#xff1a; 在上一篇Dagger 2 系列&#xff08;四&#xff09;——Named 和 Qualifier注解介绍&#xff0c;了Named 和 Qualifier注解&#xff0c;这篇文章&#xff0c;我们将会了解另外俩个注解&#xff1a;Scope 和 Singleton。 在这篇文章中你会了解到&#xff1a; …

脑电波控制设备:基于典型相关分析(CCA)的脑机接口频率精准解码方法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、CCA的用途二、频率求解思路三、输入数据结构四、判断方法五、matlab实践1.数据集获取及处理2.matlab代码3.运行及结果 六、参考文献 前言 在脑机接口(BCI)领…

fiddler+雷电模拟器(安卓9)+https配置

一、fiddler配置 1、开启https代理 2、根证书安装&#xff1a;导出证书系统安装 二、模拟器设置 1、设置网络桥接模式 【点击安装】提示安装成功后保存即可 2、开启root&#xff08;开启adb远程调试&#xff09; 3、开启磁盘写入 4、设置WLAN代理 5、证书安装&#xff1a;物…

跨越时空的对话:图灵与GPT-4聊AI的前世今生

&#xff08;背景&#xff1a;虚拟咖啡厅&#xff0c;图灵身着1950年代西装&#xff0c;端着一杯热茶&#xff0c;GPT-4以全息投影形态坐在对面&#xff09; 图灵&#xff08;喝了口茶&#xff09;&#xff1a;“听说你能写诗&#xff1f;我当年在布莱切利园破解Enigma时&…

双击PPT文件界面灰色不可用,需要再次打开该PPT文件才能正常打开

双击PPT文件界面灰色不可用&#xff0c;需要再次打开该PPT文件才能正常打开 1. 软件环境⚙️2. 问题描述&#x1f50d;3. 解决方法&#x1f421;解决步骤 4. 结果预览&#x1f914; 1. 软件环境⚙️ Windows10 或 Windows11 专业版64位&#xff0c;安装MotionGo软件&#xff08…

【时间序列聚类】Feature-driven Time Series Clustering(特征驱动的时间序列聚类)

文章目录 1.文章介绍2.问题背景3.拟解决的问题4.主要贡献5.提出的方法5.1模型pipeline5.2特征抽取和选择5.3图渲染和社区检测5.4共现矩阵的构建5.5对共现矩阵进行聚类 6.实验6.1模型设置6.2实验结果6.3消融实验 7.结论8.个人观点9.Reference 1.文章介绍 论文出处&#xff1a;ED…

tomcat负载均衡配置

这里拿Nginx和之前做的Tomcat 多实例来实现tomcat负载均衡 1.准备多实例与nginx tomcat单机多实例部署-CSDN博客 2.配置nginx做负载均衡 upstream tomcat{ server 192.168.60.11:8081; server 192.168.60.11:8082; server 192.168.60.11:8083; } ser…

SQLAlchemy系列教程:如何执行原生SQL

Python中的数据库交互提供了高级API。但是&#xff0c;有时您可能需要执行原始SQL以提高效率或利用数据库特定的特性。本指南介绍在SQLAlchemy框架内执行原始SQL。 在SQLAlchemy中执行原生SQL SQLAlchemy虽然以其对象-关系映射&#xff08;ORM&#xff09;功能而闻名&#xff…

19.HarmonyOS Next CustomSlider组件基础教程(一)

温馨提示&#xff1a;本篇博客的详细代码已发布到 git : https://gitcode.com/nutpi/HarmonyosNext 可以下载运行哦&#xff01; 1. 组件介绍 Slider&#xff08;滑动选择器&#xff09;是HarmonyOS中常用的交互组件&#xff0c;用于在给定的数值范围内进行连续值的选择。本教…

管中窥豹数字预失真(DPD)

管中窥豹数字预失真&#xff08;DPD&#xff09; 数字预失真在通信领域发挥了巨大的作用&#xff0c;对提高功放效率、改善误码率起了不可忽略的作用&#xff0c;广泛运用与通信、雷达等各种领域。但是对于普通用户&#xff0c;它显得及其高深神秘。今天就用这个短文&#xff…

MCP极简入门:超快速上手运行简单的MCP服务和MCP客户端

MCP是什么&#xff1f; 首先我们快速过一下MCP的基本概念&#xff0c;接着我们会通过一个简单的天气服务的教程&#xff0c;来上手学会使用MCP服务和在主机运行服务。本文根据官方教程改编。 1. MCP的基本概念 MCP&#xff08;Model Context Protocol&#xff0c;模型上下文…

DeepSeek进阶应用(一):结合Mermaid绘图(流程图、时序图、类图、状态图、甘特图、饼图)

&#x1f31f;前言: 在软件开发、项目管理和系统设计等领域&#xff0c;图表是表达复杂信息的有效工具。随着AI助手如DeepSeek的普及&#xff0c;我们现在可以更轻松地创建各种专业图表。 名人说&#xff1a;博观而约取&#xff0c;厚积而薄发。——苏轼《稼说送张琥》 创作者&…

海康线扫相机平场矫正教程

0、平场矫正前的准备确认 1、白纸准备 确保视野中有一张平整且无折痕的白纸&#xff0c;使其完全铺满相机的整个视野。 2、行高设置 将行高参数设定为 2048。 3、灰度值控制 相机端图像的灰度值应维持在 120 - 180 这个区间内。同时&#xff0c;最亮像素点与最暗像素点的灰度…

数智读书笔记系列015 探索思维黑箱:《心智社会:从细胞到人工智能,人类思维的优雅解读》读书笔记

引言 《The Society of Mind》&#xff08;《心智社会》&#xff09;的作者马文・明斯基&#xff08;Marvin Minsky&#xff09;&#xff0c;是人工智能领域的先驱和奠基者之一 &#xff0c;1969 年获得图灵奖&#xff0c;被广泛认为是对人工智能领域影响最大的科学家之一。他…

游戏引擎学习第148天

回顾并规划今天的工作 没有使用引擎&#xff0c;也没有任何库支持&#xff0c;只有我们自己&#xff0c;编写游戏的所有代码&#xff0c;不仅仅是小小的部分&#xff0c;而是从头到尾。现在&#xff0c;我们正处于一个我一直想做的任务中&#xff0c;虽然一切都需要按部就班&a…