目录
- Day5-Part2:Zinx 的消息队列及多任务机制
- 创建消息队列
- 创建及启动 Worker 工作池
- 在 Server 启动的同时对连接池进行初始化
Day5-Part2:Zinx 的消息队列及多任务机制
接下来我们需要给 ZInx 添加消息队列以及多任务 Worker 机制。可以通过限制 worker 的数量来限定处理业务的 goroutine 数量,而不是无限制的开辟 goroutine。虽然 golang 在调度 goroutine 这方面已经做到极致了,但是数量过多的 goroutine 依然会带来不必要的环境切换成本,这应该是服务器节省掉的成本。
我们可以用消息队列来对 worker 进行缓冲。
创建消息队列
首先,处理消息队列的部分应该集成到MsgHandler
模块下,它属于消息模块的范畴:
type MsgHandle struct {Apis map[uint32]ziface.IRouter // map 存放每个 MsgId 对应的处理方法WorkerPoolSize uint32 // 业务工作 worker 池的数量TaskQueue []chan ziface.IRequest // worker 负责取任务的消息队列
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Apis: make(map[uint32]ziface.IRouter),WorkerPoolSize: settings.Conf.WorkerPoolSize,TaskQueue: make([]chan ziface.IRequest, settings.Conf.WorkerPoolSize),}
}
这里添加了两个成员:
WorkerPoolSize
:工作池的数量;TaskQueue
:TaskQueue 是一个 Request 请求的 channel 集合,用来缓冲请求 worker 处理的 Request 信息,worker 会从对应的队列当中取客户端的请求数据并进行具体的业务处理。
还有一点要注意:我们在此处新添加了settings.Conf.WorkerPoolSize
字段,意味着有新的参数需要设定,我们需要修改 conf.yaml 文件以及 settings 中的 Conf 结构,新增字段 WorkerPoolSize。如果后续出现了新的以 settings.Conf 为前缀的字段,同样需要按照上述逻辑修改 Conf 的成员以及 yaml 文件。
创建及启动 Worker 工作池
我们修改了 MsgHandle 的成员,新增了 WorkerPoolSize 和 TaskQueue,现在我们为其新添加一些方法,首先修改 IMsgHandle 这个接口的定义,即首先在接口中定义好要新增哪些方法:
type IMsgHandle interface {DoMsgHandler(request IRequest) // 立即以非阻塞的方式处理消息AddRouter(msgId uint32, router IRouter) // 为消息添加具体的处理逻辑StartWorkerPool() // 启动 worker 工作池SendMsgToTaskQueue(request IRequest) // 将消息交给 TaskQueue, 由 worker 进行处理
}
定义好接口的方法,现在我们需要为 MsgHandle 实现新增的两个方法,以使其实现接口:
// StartOneWorker 启动一个 Worker 的工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID = ", workerID, " is started.")for {select {case request := <-taskQueue:mh.DoMsgHandler(request)}}
}// StartWorkerPool 启动 worker 工作池
func (mh *MsgHandle) StartWorkerPool() {// 遍历需要启动的 worker, 依次启动for i := 0; i < int(mh.WorkerPoolSize); i++ {// 一个 worker 被启动时, 给当前的 worker 对应的任务队列开辟空间mh.TaskQueue[i] = make(chan ziface.IRequest, settings.Conf.MaxWorkerTaskLen)// 启动当前 worker, 阻塞地等待对应的任务队列是否有消息传来go mh.StartOneWorker(i, mh.TaskQueue[i])}
}// SendMsgToTaskQueue 将消息交给 TaskQueue, 由 worker 进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {// 根据 ConnID 来分配当前的连接应该由哪个 worker 负责处理// 使用轮询分配法则// 得到需要处理此条连接地 workerIDworkerID := request.GetConnection().GetConnID() % mh.WorkerPoolSizefmt.Println("Add ConnID = ", request.GetConnection().GetConnID(), " request msgID = ", request.GetMsgID(), "to workerID = ", workerID)// 将请求消息发送给任务队列mh.TaskQueue[workerID] <- request
}
StartWorkerPool()
方法会启动 Worker 工作池,它会遍历 TaskQueue 这个存放着 request 类型的 channel 的列表,需要注意的是,TaskQueue 在工厂函数中初始化时,是初始化了保存 channel 的 slice,而没有初始化 channel,因此 channel 使用默认的零值 nil 进行填充。在启动工作池的时候,应当对 channel 进行正确的初始化,即使用 make 方法指定其类型以及缓冲区的大小。指定了缓冲区大小的 channel 更像是一个队列,可以完成对 request 的缓冲。初始化每一个 TaskQueue 下辖的 channel 之后,启动 StartOneWorker 这个 goroutine,即让相应的 worker 开启工作。StartOneWorker()
方法就是一个 Worker 的工作业务,它使用一个 for loop 来等待 taskQueue 中 request 的到来,注意形参 taskQueue,它是一个 Request 类型的通道,传入的实参是 StartWorkerPool 方法中的TaskQueue[i]
。- 现在,我们的 workers 就绪了,还需要一个给 worker 传入数据的入口,在
SendMsgToTaskQueue
实现这项功能。SendMsgToTaskQueue()
是工作池的数据入口,采用轮询的分配机制,根据 Server 分配的 ConnID,与 WorkerPoolSize 取模来计算具体要把这个 request 分配到哪个 worker 当中。分配好 worker 之后,通过TaskQueue[i]
将 Request 发送给对应的 worker,worker 就会在其 goroutine 中处理这个请求。
在 Server 启动的同时对连接池进行初始化
现在我们将消息队列及多任务机制集成到 Zinx 中。在 Server 的 Start 方法下,服务端 Accept 来自客户端的连接请求之前,使用 StartWorkerPool
启动线程池:
func (s *Server) Start() {//...go func() {//0 启动worker工作池机制s.msgHandler.StartWorkerPool()//1 获取一个TCP的Addraddr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Println("resolve tcp addr err: ", err)return}//...//...}}()
}
其次,当我们在 Connection 的 Reader 中得到来自客户端的数据之后,将数据发送给 Worker 的工作池,进行业务处理:
// StartReader 开启处理 conn 读数据的 goroutine
func (c *Connection) StartReader() {fmt.Println("Reader Goroutine is running")defer fmt.Println(c.RemoteAddr().String(), " conn reader exit !")defer c.Stop()for {// 创建封包拆包的对象dp := NewDataPack()// 读取客户端的 msg headheadData := make([]byte, dp.GetHeadLen()) // 注意 GetHeadLen() 返回常量 8, 因为包的头部长度固定if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error", err)c.ExitBuffChan <- truereturn}// 拆包, 得到 msgid 和 datalen, 并放在 msg 中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error", err)c.ExitBuffChan <- truereturn}// 根据 dataLen 读取 data, 放在 msg.Data 中var data []byteif msg.GetDataLen() > 0 {data = make([]byte, msg.GetDataLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error", err)c.ExitBuffChan <- truereturn}}msg.SetData(data)// 得到当前客户端请求的 Request 数据req := Request{conn: c,msg: msg,}if settings.Conf.WorkerPoolSize > 0 {// 已经启动工作池机制, 将消息交给 Worker 处理c.Msghandler.SendMsgToTaskQueue(&req)} else {// 从绑定好的消息和对应的处理方法中执行 Handle 方法go c.Msghandler.DoMsgHandler(&req)}}
}
此处我们并没有强制的启用多任务机制,如果 WorkerPoolSize 小于等于零,我们只开启一个临时的 goroutine 去处理业务。