从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】【基础篇完结】

从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】

1 读写协程分离[v0.7]

  1. 添加一个Reader和Writer之间通信的channel
  2. 添加一个Writer goroutine
  3. Reader由之前直接发送给客户端改为发送给通信channel
  4. 启动Reader和Writer一起工作

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}go c.MsgHandler.DoMsgHandler(&r)}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()c.ExitChan <- trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

测试

myDemo/ZinxV0.7/client.go

  • client0.go
package mainimport ("fmt""io""myTest/zinx/znet""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8092")if err != nil {fmt.Println("client start err ", err)return}for {//发送封装后的数据包dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))if err != nil {fmt.Println("client pack msg err ", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("client write err ", err)return}//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...//1 先读取流中的head部分,得到Id和dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("client read head err ", err)break}//将二进制的head拆包到msg中msgHead, err := dp.UnPack(binaryHead)if err != nil {fmt.Println("client unpack msgHead err ", err)break}if msgHead.GetMsgLen() > 0 {//2 有数据, 再根据dataLen进行二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error ", err)return}fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))}//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}
  • client1.go

在这里插入图片描述

myDemo/ZinxV0.7/server.go

package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}func main() {s := znet.NewServer("[Zinx v0.7]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)s.Serve()
}

结果:
在这里插入图片描述

  • 接受多个客户端也可以
    在这里插入图片描述
  • 当client0退出时,不会影响client1
    在这里插入图片描述

2 创建消息队列及多任务[v0.8]

  1. 创建一个消息队列,MsgHandler消息管理模块增加:TaskQueue、WorkerPoolSize
  2. 创还能多任务worker的工作池并且启动
  3. 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理

在这里插入图片描述

实现消息队列机制和工作池机制(集成到自定义框架)

  1. 创建一个消息队列:MsgHandler消息管理模块
  2. 创建多任务worker的工作池并启动
  3. 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理
  4. 将消息队列机制集成到Zinx框架中
  • 开启并调用消息队列及worker工作池
  • 将从客户端处理的消息,发送给当前Worker的工作池来处理

zinx/znet/server.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""net"
)type Server struct {Name       stringIPVersion  stringIP         stringPort       intMsgHandler *MsgHandle
}func NewServer(name string) *Server {s := &Server{Name:       name,IPVersion:  "tcp4",IP:         util.GlobalObject.Host,Port:       util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 = 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}dealConn := NewConnection(conn, cid, s.MsgHandler)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()c.ExitChan <- trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

zinx/znet/msgHandler.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""strconv"
)type MsgHandle struct {//msgId与对应的router对应Api map[uint32]ziface.IRouter//负责worker取任务的消息队列TaskQueue []chan ziface.IRequest//业务工作worker池的goroutine数量WorkerPoolSize uint32
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Api:            make(map[uint32]ziface.IRouter),TaskQueue:      make([]chan ziface.IRequest, util.GlobalObject.WorkerPoolSize),WorkerPoolSize: util.GlobalObject.WorkerPoolSize,}
}func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {//判断是否有对应的routerif _, ok := mh.Api[request.GetMsgID()]; !ok {fmt.Println("msgId ", request.GetMsgID(), "does not exist handler, need to add router")return}//call handlerrouter := mh.Api[request.GetMsgID()]router.PreHandle(request)router.Handler(request)router.PostHandler(request)
}func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {if _, ok := mh.Api[msgId]; ok {//如果已经存在了对应的router,则提示panic("repeat api, msgId = " + strconv.Itoa(int(msgId)))}mh.Api[msgId] = routerfmt.Println("msgId ", msgId, "Add router success ")
}//启动一个worker工作池(开启工作池的动作只能发生一次,一个zinx框架只能有一个worker工作池)
func (mh *MsgHandle) StartWorkerPool() {for i := 0; i < int(mh.WorkerPoolSize); i++ {//开辟任务队列mh.TaskQueue[i] = make(chan ziface.IRequest, util.GlobalObject.MaxWorkerTaskLen)//启动workergo mh.startOneWorker(i, mh.TaskQueue[i])}
}func (mh *MsgHandle) startOneWorker(workerId int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID=", workerId, " is started...")for {select {//从任务队列中取消息(如果有消息过来,出列的就是request,然后执行该request所绑定的业务)case request := <-taskQueue:mh.DoMsgHandler(request)}}
}//将消息交给taskQueue,由Worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//通过取余数的方式来达到负载均衡workID := request.GetConnection().GetConnectionID() % util.GlobalObject.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnectionID()," requestID=", request.GetMsgID()," workID=", workID)//将消息发送给对应worker的任务队列mh.TaskQueue[workID] <- request
}

zinx/ziface/imsgHandler.go

package zifacetype IMsgHandler interface {DoMsgHandler(request IRequest)AddRouter(msgId uint32, router IRouter)StartWorkerPool()SendMsgToTaskQueue(request IRequest)
}

测试

myDemo/ZinxV0.8/Server.go

同myDemo/ZinxV0.7/Server.go,修改一下NewServer时候所传的Zinx的名称即可

myDemo/ZinxV0.8/Client.go

同myDemo/ZinxV0.7/Client.go

myDemo/ZinxV0.8/zinx.json

{"Name": "Zinx Server Application","Version": "V0.8","Host": "0.0.0.0","TcpPort": 8092,"MaxConn": 30,"MaxPackageSize": 1024,"WorkerPoolSize": 10
}

在这里插入图片描述

在这里插入图片描述

3 连接管理器(connManager)[v0.9]

3.1 连接管理器(conn)的定义与实现

创建一个连接管理模块ConnManager

  • 添加连接
  • 删除连接
  • 根据连接ID查找对应的连接
  • 总连接个数
  • 清理全部的连接

3.2 将连接管理模块集成到Zinx框架中

  1. 给server添加一个ConnMgr属性
  2. 修改NewServer方法,加入ConnMgr初始化
  3. 判断当前连接数是否超出最大值MaxConn
  4. 当server停止的时候(调用server.Stop方法),应该加入ConnMgr.ClearConn()

3.3 提供创建连接/销毁连之前所需的Hook函数

给我们自定义框架Zinx提供创建连接之后/销毁连接之前所要处理的一些业务。提供给用户能够注册的Hook函数

  • 添加OnConnStart()
  • 添加OnConnStop()

zinx/ziface/iserver.go

package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)GetConnMgr() IConnManager//注册创OnConnStart钩子函数SetOnConnStart(func(conn IConnection))SetOnConnStop(func(conn IConnection))//调用OnConnStart钩子函数CallOnConnStart(conn IConnection)CallOnConnStop(conn IConnection)
}

zinx/ziface/iconnmanager.go

package zifacetype IConnManager interface {Add(conn IConnection)Remove(conn IConnection)Get(connID uint32) (IConnection, error)Len() intClearConn()
}

zinx/znet/connmanager.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""myTest/zinx/util""myTest/zinx/ziface""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnection //管理的连接集合connLock    sync.RWMutex                  //保护连接集合的读写锁
}func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection, util.GlobalObject.MaxConn),}
}
func (cm *ConnManager) Add(conn ziface.IConnection) {//添加写锁cm.connLock.Lock()defer cm.connLock.Unlock()cm.connections[conn.GetConnectionID()] = connfmt.Println("connectionID=", conn.GetConnectionID(), " add to ConnManager success, conn num=", cm.Len())
}func (cm *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源mapcm.connLock.Lock()defer cm.connLock.Unlock()delete(cm.connections, conn.GetConnectionID())fmt.Println("connectionID=", conn.GetConnectionID(), " remote from ConnManager success, conn num=", cm.Len())
}func (cm *ConnManager) Get(connID uint32) (ziface.IConnection, error) {cm.connLock.RLock()defer cm.connLock.RUnlock()if conn, ok := cm.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection NOT FOUND")}
}func (cm *ConnManager) Len() int {return len(cm.connections)
}func (cm *ConnManager) ClearConn() {cm.connLock.Lock()defer cm.connLock.Unlock()for connID, conn := range cm.connections {//停止连接conn.Stop()//删除连接delete(cm.connections, connID)}fmt.Println("Clear All connections success! conn num=", cm.Len())
}

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""myTest/zinx/ziface""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandleTcpServer  ziface.IServer
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),TcpServer:  server,}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan <- true//连接conn关闭时,需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

zinx/znet/server.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""net"
)type Server struct {Name       stringIPVersion  stringIP         stringPort       intMsgHandler *MsgHandleConnMgr    *ConnManager//创建连接之前的Hook函数OnConnStart func(conn ziface.IConnection)OnConnStop  func(conn ziface.IConnection)
}func NewServer(name string) *Server {s := &Server{Name:       name,IPVersion:  "tcp4",IP:         util.GlobalObject.Host,Port:       util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),ConnMgr:    NewConnManager(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 = 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//判断当前连接数是否超过最大连接数,如果超过则关闭新创建的连接if s.ConnMgr.Len() >= util.GlobalObject.MaxConn {//TODO 给客户端返回一个超出最大连接的错误包fmt.Println("-----------------》 Tcp Conn exceed, conn num=", util.GlobalObject.MaxConn)conn.Close()//关闭当前连接,等待下一次连接【如果当前连接数小于最大连接数】continue}dealConn := NewConnection(s, conn, cid, s.MsgHandler)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {//释放相关资源fmt.Println("[STOP] Zinx server name ", s.Name)s.ConnMgr.ClearConn()
}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}//注册创OnConnStart钩子函数
func (s *Server) SetOnConnStart(hookFunc func(conn ziface.IConnection)) {s.OnConnStart = hookFunc
}func (s *Server) SetOnConnStop(hookFunc func(conn ziface.IConnection)) {s.OnConnStop = hookFunc
}//调用OnConnStart钩子函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---------> call OnConnStart()")s.OnConnStart(conn)}
}func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("----------> call OnConnStop()")s.OnConnStop(conn)}
}

测试

myDemo/ZinxV0.9/Server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println("=====>Do Conn Begin...")if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {fmt.Println("err")}
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println("=====>Do Conn Lost...")fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")
}func main() {s := znet.NewServer("[Zinx v0.9]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}

测试代码中的myDemo/ZinxV0.9/Client.go和myDemo/ZinxV0.8/Client.go一样。

  • 为了方便测试超过最大连接数的报错信息,我们可以修改配置文件
    在这里插入图片描述
//将最大连接数设置为2,然后我们复制Client.go,可以多起几个Client来进行测试
{"Name": "Zinx Server Application","Version": "V0.9","Host": "0.0.0.0","TcpPort": 8092,"MaxConn": 2,"MaxPackageSize": 1024,"WorkerPoolSize": 10
}

测试最大连接数与连接管理:
在这里插入图片描述

测试钩子函数:
在这里插入图片描述

4 添加连接属性并测试【v0.10】

通过map[string]interface{}来存储连接的属性值,通过RWLock来保证读写connection属性值安全

  • 设置连接属性
  • 获取连接属性
  • 移除连接属性

zinx/ziface/iconnection.go

package zifaceimport "net"type IConnection interface {//启动连接Start()//停止连接Stop()//获取当前连接的Conn对象GetTCPConnection() *net.TCPConn//获取当前连接模块的idGetConnectionID() uint32//获取远程客户端的TCP状态 IP:PortRemoteAddr() net.Addr//发送数据SendMsg(msgId uint32, data []byte) errorSetProperty(key string, value interface{})GetProperty(key string) (interface{}, error)RemoveProperty(key string)
}//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""myTest/zinx/ziface""net""sync"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan     chan boolMsgHandler   *MsgHandleTcpServer    ziface.IServerproperty     map[string]interface{}propertyLock sync.RWMutex
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),TcpServer:  server,property: make(map[string]interface{}),}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan <- true//连接conn关闭时,需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}

测试

myDemo/ZinxV0.10/Server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println("=====>Do Conn Begin...")if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {fmt.Println("err")}//给conn设置属性conn.SetProperty("Name", "ziyi")conn.SetProperty("士兵突击", "https://www.bilibili.com/video/BV1Lk4y1N7tC/")
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println("=====>Do Conn Lost...")fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")//读取属性property, _ := conn.GetProperty("Name")fmt.Println("Get Property Name=", property)property, _ = conn.GetProperty("士兵突击")fmt.Println("Get Property 士兵突击=", property)
}func main() {s := znet.NewServer("[Zinx v0.10]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}
myDemo/ZinxV0.10/Client.go
package mainimport ("fmt""io""myTest/zinx/znet""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8092")if err != nil {fmt.Println("client start err ", err)return}for {//发送封装后的数据包dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))if err != nil {fmt.Println("client pack msg err ", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("client write err ", err)return}//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...//1 先读取流中的head部分,得到Id和dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("client read head err ", err)break}//将二进制的head拆包到msg中msgHead, err := dp.UnPack(binaryHead)if err != nil {fmt.Println("client unpack msgHead err ", err)break}if msgHead.GetMsgLen() > 0 {//2 有数据, 再根据dataLen进行二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error ", err)return}fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))}//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/74310.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Golang 接口自动化05】使用yml管理自动化用例

目录 YAML 基本语法 对象&#xff1a;键值对的集合(key:value) 数组&#xff1a;一组按顺序排列的值 字面量&#xff1a;单个的、不可再分的值&#xff08;数字、字符串、布尔值&#xff09; yml 格式的测试用例 定义yml文件 创建结构体 读取yml文件中的用例数据 调试…

【1.3】Java微服务:Spring Cloud版本说明

✅作者简介&#xff1a;大家好&#xff0c;我是 Meteors., 向往着更加简洁高效的代码写法与编程方式&#xff0c;持续分享Java技术内容。 &#x1f34e;个人主页&#xff1a;Meteors.的博客 &#x1f49e;当前专栏&#xff1a; 微服务 ✨特色专栏&#xff1a; 知识分享 &#x…

【后端面经】微服务构架 (1-6) | 隔离:如何确保心悦会员体验无忧?唱响隔离的鸣奏曲!

文章目录 一、前置知识1、什么是隔离?2、为什么要隔离?3、怎么进行隔离?A) 机房隔离B) 实例隔离C) 分组隔离D) 连接池隔离 与 线程池隔离E) 信号量隔离F) 第三方依赖隔离二、面试环节1、面试准备2、基本思路3、亮点方案A) 慢任务隔离B) 制作库与线上库分离三、章节总结 …

深度学习各层负责什么内容?

1、深度学习——神经网络简介 深度学习(Deep Learning)(也称为深度结构学习【Deep Structured Learning】、层次学习【Hierarchical Learning】或者是深度机器学习【Deep Machine Learning】)是一类算法集合&#xff0c;是机器学习的一个分支。 深度学习方法近年来&#xff0c…

元宇宙虚拟展厅的特点是什么呢?优势有哪些?

元宇宙是一个很广阔的虚拟世界&#xff0c;它可以创造出更为丰富、沉浸式的体验&#xff0c;这种全新的体验为展览和艺术领域带来了更多的可能性&#xff0c;元宇宙虚拟展厅以其多样化、互动性、沉浸式展示的特点&#xff0c;带领大家进入一个虚拟现实的全新世界。 元宇宙虚拟展…

MACOM EDI 需求分析

MACOM 是一家全球性半导体公司&#xff0c;专注于设计和制造高性能射频、微波和光电元件&#xff0c;其产品被广泛应用于通信、航空航天、国防、工业和医疗等领域。随着 MACOM 的不断发展&#xff0c;传统数据传输方式效率较低&#xff0c;无法满足 MACOM 的需求。为了提高企业…

使用自适应去噪在线顺序极限学习机预测飞机发动机剩余使用寿命(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

代理模式--静态代理和动态代理

1.代理模式 定义&#xff1a;代理模式就是代替对象具备真实对象的功能&#xff0c;并代替真实对象完成相应的操作并且在不改变真实对象源代码的情况下扩展其功能&#xff0c;在某些情况下&#xff0c;⼀个对象不适合或者不能直接引⽤另⼀个对象&#xff0c;⽽代理对象可以在客户…

【前端知识】React 基础巩固(三十六)——RTK中的异步操作

React 基础巩固(三十六)——RTK中的异步操作 一、RTK中使用异步操作 引入RTK中的createAsyncThunk&#xff0c;在extraReducers中监听执行状态 import { createSlice, createAsyncThunk } from "reduxjs/toolkit"; import axios from "axios";export cons…

Spring事务传播机制、实现方式、失效场景即原理

贴一篇源码分析的好文章&#xff1a;https://blog.csdn.net/qq_30905661/article/details/114400417 本质&#xff1a; 一个事务对应一个数据库连接。 通过 this 来调用某个带有 Transactional 注解的方法时&#xff0c;这个注解是失效的&#xff0c;可以看做这个方法&#x…

安防视频汇聚平台EasyCVR视频广场面包屑侧边栏支持拖拽操作

智能视频监控平台EasyCVR能在复杂的网络环境中&#xff0c;将海量设备实现集中统一接入与汇聚管理&#xff0c;实现视频的处理与分发、录像与存储、按需调阅、平台级联等。 TSINGSEE青犀视频汇聚平台EasyCVR可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协…

2023年中职组“网络安全”赛项吉安市竞赛任务书

2023年中职组“网络安全”赛项 吉安市竞赛任务书 一、竞赛时间 总计&#xff1a;360分钟 竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A模块 A-1 登录安全加固 180分钟 200分 A-2 本地安全策略配置 A-3 流量完整性保护 A-4 事件监控 A-5 服务加固…

MYSQL进阶-事务

什么是数据库事务&#xff1f; 事务是一个不可分割的数据库操作序列&#xff0c;也是数据库并发控制的基本单位&#xff0c;其执 行的结果必须使数据库从一种一致性状态变到另一种一致性状态。事务是逻辑上 的一组操作&#xff0c;要么都执行&#xff0c;要么都不执行。 事务最…

使用express搭建后端服务

目录 1 创建工程目录2 初始化3 安装express依赖4 启动服务5 访问服务总结 上一篇我们利用TDesign搭建了前端服务&#xff0c;现在的开发讲究一个前后端分离&#xff0c;后端的话需要单独搭建服务。后端服务的技术栈还挺多&#xff0c;有java、php、python、nodejs等。在众多的技…

2023-08-03 LeetCode每日一题(删除注释)

2023-08-03每日一题 一、题目编号 722. 删除注释二、题目链接 点击跳转到题目位置 三、题目描述 给一个 C 程序&#xff0c;删除程序中的注释。这个程序source是一个数组&#xff0c;其中source[i]表示第 i 行源码。 这表示每行源码由 ‘\n’ 分隔。 在 C 中有两种注释风…

【图论】强连通分量

一.定义 强连通分量&#xff08;Strongly Connected Components&#xff0c;简称SCC&#xff09;是图论中的一个概念&#xff0c;用于描述有向图中的一组顶点&#xff0c;其中任意两个顶点之间都存在一条有向路径。换句话说&#xff0c;对于图中的任意两个顶点u和v&#xff0c;…

数学学习——最优化问题引入、凸集、凸函数、凸优化、梯度、Jacobi矩阵、Hessian矩阵

文章目录 最优化问题引入凸集凸函数凸优化梯度Jacobi矩阵Hessian矩阵 最优化问题引入 例如&#xff1a;有一根绳子&#xff0c;长度一定的情况下&#xff0c;需要如何围成一个面积最大的图像&#xff1f;这就是一个最优化的问题。就是我们高中数学中最常见的最值问题。 最优化…

《Java-SE-第二十七章》之常见的锁策略

前言 在你立足处深挖下去,就会有泉水涌出!别管蒙昧者们叫嚷:“下边永远是地狱!” 博客主页&#xff1a;KC老衲爱尼姑的博客主页 博主的github&#xff0c;平常所写代码皆在于此 共勉&#xff1a;talk is cheap, show me the code 作者是爪哇岛的新手&#xff0c;水平很有限&…

Maven发布项目到Nexus私服

项目pom配置 在项目pom.xml中文件中的仓库配置&#xff0c;Nexus私服如何搭建在这里不介绍了可自行百度。 <distributionManagement><repository><id>releases</id><name>Nexus Release Repository</name><url>http://私服地址:34…