并发服务器框架——zinx

zinx框架

Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏服务器、即时通讯服务器等。

下面实现zinx的多个功能包括:路由、全局配置、消息封装、读写分离、消息队列、链接管理等。

在这里插入图片描述
utils包下GlobalObj.go

package utilsimport ("datarace/zinx/ziface""encoding/json""io/ioutil"
)/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {TcpServer        ziface.IServer //当前Zinx的全局Server对象Host             string         //当前服务器主机IPTcpPort          int            //当前服务器主机监听端口号Name             string         //当前服务器名称Version          string         //当前Zinx版本号MaxPacketSize    uint32         //都需数据包的最大值MaxConn          int            //当前服务器主机允许的最大链接个数WorkerPoolSize   uint32         //业务工作Worker池的数量MaxWorkerTaskLen uint32         //业务工作Worker对应负责的任务队列最大任务存储数量ConfFilePath     stringMaxMsgChanLen    int
}/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj// 读取用户的配置文件
func (g *GlobalObj) Reload() {data, err := ioutil.ReadFile("zinx.json")if err != nil {panic(err)}//将json数据解析到struct中//fmt.Printf("json :%s\n", data)err = json.Unmarshal(data, &GlobalObject)if err != nil {panic(err)}
}
func init() {//初始化GlobalObject变量,设置一些默认值GlobalObject = &GlobalObj{Name:             "ZinxServerApp",Version:          "V0.4",TcpPort:          7777,Host:             "0.0.0.0",MaxConn:          12000,MaxPacketSize:    4096,ConfFilePath:     "conf/zinx.json",WorkerPoolSize:   10,MaxWorkerTaskLen: 1024,}//从配置文件中加载一些用户配置的参数GlobalObject.Reload()
}

ziface包

package zifacetype IConnManager interface {Add(conn IConnection)                   //添加链接Remove(conn IConnection)                //删除连接Get(connID uint32) (IConnection, error) //利用ConnID获取链接Len() int                               //获取当前连接ClearConn()                             //删除并停止所有链接
}
package zifaceimport "net"type IConnection interface {Start()Stop()GetConnID() uint32GetTCPConnection() *net.TCPConnRemoteAddr() net.AddrSendMsg(msgId uint32, data []byte) error//直接将Message数据发送给远程的TCP客户端(有缓冲)SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口//设置链接属性SetProperty(key string, value interface{})//获取链接属性GetProperty(key string) (interface{}, error)//移除链接属性RemoveProperty(key string)
}
type HandFunc func(*net.TCPConn, []byte, int) error
package zifacetype IDataPack interface {GetHeadLen() int32Pack(msg IMessage) ([]byte, error)Unpack([]byte) (IMessage, error)
}
package zifacetype IMessage interface {GetDataLen() uint32GetMsgId() uint32GetData() []byteSetMsgId(uint32)SetData([]byte)SetDataLen(uint32)
}
package zifacetype IMsgHandle interface {DoMsgHandler(request IRequest)          //马上以非阻塞方式处理消息AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑StartWorkerPool()                       //启动worker工作池SendMsgToTaskQueue(request IRequest)    //将消息交给TaskQueue,由worker进行处理
}
package zifacetype IRequest interface {GetConnection() IConnectionGetData() []byteGetMsgID() uint32
}
package zifacetype IRouter interface {PreHandle(req IRequest)Handle(req IRequest)PostHandle(req IRequest)
}
package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)//得到链接管理GetConnMgr() IConnManager//设置该Server的连接创建时Hook函数SetOnConnStart(func(IConnection))//设置该Server的连接断开时的Hook函数SetOnConnStop(func(IConnection))//调用连接OnConnStart Hook函数CallOnConnStart(conn IConnection)//调用连接OnConnStop Hook函数CallOnConnStop(conn IConnection)
}

znet包
connection

package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""errors""fmt""io""net""sync"
)type Connection struct {//当前Conn属于哪个ServerTcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可//当前连接的socket TCP套接字Conn *net.TCPConn//当前连接的ID 也可以称作为SessionID,ID全局唯一ConnID uint32//当前连接的关闭状态isClosed bool//消息管理MsgId和对应处理方法的消息管理模块MsgHandler ziface.IMsgHandle//告知该链接已经退出/停止的channelExitBuffChan chan bool//无缓冲管道,用于读、写两个goroutine之间的消息通信msgChan chan []byte//有关冲管道,用于读、写两个goroutine之间的消息通信msgBuffChan chan []byte//链接属性property map[string]interface{}//保护链接属性修改的锁propertyLock sync.RWMutex
}// 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {//初始化Conn属性c := &Connection{TcpServer:    server, //将隶属的server传递进来Conn:         conn,ConnID:       connID,isClosed:     false,MsgHandler:   msgHandler,ExitBuffChan: make(chan bool, 1),msgChan:      make(chan []byte),msgBuffChan:  make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化property:     make(map[string]interface{}),                        //对链接属性map初始化}//将新创建的Conn添加到链接管理中c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中return c
}// 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}// 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}// 移除链接属性
func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}
func (c *Connection) startReader() {fmt.Println("[Reader Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")defer c.Stop()for {// 创建拆包解包的对象dp := NewDataPack()//读取客户端的Msg headheadData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error ", err)break}//拆包,得到msgid 和 datalen 放在msg中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error ", err)break}//根据 dataLen 读取 data,放在msg.Data中var data []byteif msg.GetDataLen() > 0 {data = make([]byte, msg.GetDataLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)continue}}msg.SetData(data)//得到当前客户端请求的Request数据req := Request{conn: c,msg:  msg,}//从绑定好的消息和对应的处理方法中执行对应的Handle方法if utils.GlobalObject.WorkerPoolSize > 0 {//已经启动工作池机制,将消息交给Worker处理c.MsgHandler.SendMsgToTaskQueue(&req)} else {//从绑定好的消息和对应的处理方法中执行对应的Handle方法go c.MsgHandler.DoMsgHandler(&req)}}
}// 启动连接,让当前连接开始工作
func (c *Connection) Start() {//1 开启用户从客户端读取数据流程的Goroutinego c.startReader()//2 开启用于写回客户端数据流程的Goroutinego c.StartWriter()//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法c.TcpServer.CallOnConnStart(c)
}//func (c *Connection) Start() {
//	go c.startReader()
//	for {
//		select {
//		case <-c.ExitBuffChan:
//			return
//		}
//	}
//}// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {fmt.Println("Conn Stop()...ConnID = ", c.ConnID)//如果当前链接已经关闭if c.isClosed == true {return}c.isClosed = true//==================//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用c.TcpServer.CallOnConnStop(c)//==================// 关闭socket链接c.Conn.Close()//关闭Writerc.ExitBuffChan <- true//将链接从连接管理器中删除c.TcpServer.GetConnMgr().Remove(c)//关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)
}/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")for {select {case data := <-c.msgChan://有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Data error:, ", err, " Conn Writer exit")return}//针对有缓冲channel需要些的数据处理case data, ok := <-c.msgBuffChan:if ok {//有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")return}} else {breakfmt.Println("msgBuffChan is Closed")}case <-c.ExitBuffChan:return}}
}// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取return nil
}// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}// 获取当前连接ID
func (c *Connection) GetConnID() uint32 {return c.ConnID
}// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}// // 直接将Message数据发送数据给远程的TCP客户端
//
//	func (c *Connection) SendMsg(msgId uint32, data []byte) error {
//		if c.isClosed == true {
//			return errors.New("Connection closed when send msg")
//		}
//		//将data封包,并且发送
//		dp := NewDataPack()
//		msg, err := dp.Pack(NewMsgPackage(msgId, data))
//		if err != nil {
//			fmt.Println("Pack error msg id = ", msgId)
//			return errors.New("Pack error msg ")
//		}
//		//写回客户端
//		if _, err := c.Conn.Write(msg); err != nil {
//			fmt.Println("Write msg id ", msgId, " error ")
//			c.ExitBuffChan <- true
//			return errors.New("conn Write error")
//		}
//		return nil
//	}
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send buff msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgBuffChan <- msgreturn nil
}

ConnManager

package znetimport ("datarace/zinx/ziface""errors""fmt""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnectionconnLock    sync.RWMutex
}/*
创建一个链接管理
*/
func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection),}
}// 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//将conn连接添加到ConnMananger中connMgr.connections[conn.GetConnID()] = connfmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}// 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//删除连接信息delete(connMgr.connections, conn.GetConnID())fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}// 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {//保护共享资源Map 加读锁connMgr.connLock.RLock()defer connMgr.connLock.RUnlock()if conn, ok := connMgr.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection not found")}
}// 获取当前连接
func (connMgr *ConnManager) Len() int {return len(connMgr.connections)
}// 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//停止并删除全部的连接信息for connID, conn := range connMgr.connections {//停止conn.Stop()//删除delete(connMgr.connections, connID)}fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}

datapack

package znetimport ("bytes""datarace/zinx/utils""datarace/zinx/ziface""encoding/binary""errors"
)type DataPack struct{}func NewDataPack() *DataPack {return &DataPack{}
}
func (dp *DataPack) GetHeadLen() uint32 {//Id uint32(4字节) +  DataLen uint32(4字节)return 8
}// 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {//创建一个存放bytes字节的缓冲dataBuff := bytes.NewBuffer([]byte{})//写dataLenif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {return nil, err}//写msgIDif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {return nil, err}//写data数据if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {return nil, err}return dataBuff.Bytes(), nil
}
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {//创建一个从输入二进制数据的ioReaderdataBuff := bytes.NewReader(binaryData)//只解压head的信息,得到dataLen和msgIDmsg := &Message{}//读dataLenif err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {return nil, err}//读msgIDif err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {return nil, err}//判断dataLen的长度是否超出我们允许的最大包长度if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {return nil, errors.New("Too large msg data recieved")}//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据return msg, nil
}

message

package znettype Message struct {Id      uint32DataLen uint32Data    []byte
}func NewMsgPackage(id uint32, data []byte) *Message {return &Message{Id:      id,DataLen: uint32(len(data)),Data:    data,}
}// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {return msg.DataLen
}// 获取消息ID
func (msg *Message) GetMsgId() uint32 {return msg.Id
}// 获取消息内容
func (msg *Message) GetData() []byte {return msg.Data
}// 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {msg.DataLen = len
}// 设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {msg.Id = msgId
}// 设计消息内容
func (msg *Message) SetData(data []byte) {msg.Data = data
}

msgHandler

package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""strconv"
)type MsgHandle struct {Apis           map[uint32]ziface.IRouterWorkerPoolSize uint32TaskQueue      []chan ziface.IRequest
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Apis:           make(map[uint32]ziface.IRouter),WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,TaskQueue:      make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),}
}// 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {handler, ok := mh.Apis[request.GetMsgID()]if !ok {fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")return}//执行对应处理方法handler.PreHandle(request)handler.Handle(request)handler.PostHandle(request)
}// 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {//1 判断当前msg绑定的API处理方法是否已经存在if _, ok := mh.Apis[msgId]; ok {panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))}//2 添加msg与api的绑定关系mh.Apis[msgId] = routerfmt.Println("Add api msgId = ", msgId)
}// 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID = ", workerID, " is started.")//不断的等待队列中的消息for {select {//有消息则取出队列的Request,并执行绑定的业务方法case request := <-taskQueue:mh.DoMsgHandler(request)}}
}// 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {//遍历需要启动worker的数量,依此启动for i := 0; i < int(mh.WorkerPoolSize); i++ {//一个worker被启动//给当前worker对应的任务队列开辟空间mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来go mh.StartOneWorker(i, mh.TaskQueue[i])}
}// 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//根据ConnID来分配当前的连接应该由哪个worker负责处理//轮询的平均分配法则//得到需要处理此条连接的workerIDworkerID := request.GetConnection().GetConnID() % mh.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)//将请求消息发送给任务队列mh.TaskQueue[workerID] <- request
}

request

package znetimport ("datarace/zinx/ziface"
)type Request struct {conn ziface.IConnectionmsg  ziface.IMessage
}func (r *Request) GetConnection() ziface.IConnection {return r.conn
}// 获取请求消息的数据
func (r *Request) GetData() []byte {return r.msg.GetData()
}// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {return r.msg.GetMsgId()
}

router

package znetimport "datarace/zinx/ziface"type BaseRouter struct{}func (br *BaseRouter) PreHandle(request ziface.IRequest)  {}
func (br *BaseRouter) Handle(request ziface.IRequest)     {}
func (br *BaseRouter) PostHandle(request ziface.IRequest) {}

server

package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""net""time"
)// iServer 接口实现,定义一个Server服务类
type Server struct {//服务器的名称Name string//tcp4 or otherIPVersion string//服务绑定的IP地址IP string//服务绑定的端口Port int//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法msgHandler ziface.IMsgHandle//当前Server的链接管理器ConnMgr ziface.IConnManager//新增两个hook函数原型//该Server的连接创建时Hook函数OnConnStart func(conn ziface.IConnection)//该Server的连接断开时的Hook函数OnConnStop func(conn ziface.IConnection)
}// 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}// ============== 实现 ziface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version: %s, MaxConn: %d,  MaxPacketSize: %d\n",utils.GlobalObject.Version,utils.GlobalObject.MaxConn,utils.GlobalObject.MaxPacketSize)//开启一个go去做服务端Linster业务go func() {//1 获取一个TCP的Addrs.msgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Println("resolve tcp addr err: ", err)return}//2 监听服务器地址listenner, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen", s.IPVersion, "err", err)return}//已经监听成功fmt.Println("start Zinx server  ", s.Name, " succ, now listenning...")var cid uint32cid = 0//3 启动server网络连接业务for {//3.1 阻塞等待客户端建立连接请求conn, err := listenner.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//=============//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {conn.Close()continue}//=============//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的dealConn := NewConntion(s, conn, cid, s.msgHandler)cid++//3.4 启动当前链接的处理业务go dealConn.Start()//go func() {//	//不断的循环从客户端获取数据//	for {//		buf := make([]byte, 512)//		cnt, err := conn.Read(buf)//		if err != nil {//			fmt.Println("recv buf err ", err)//			continue//		}//		//回显//		if _, err := conn.Write(buf[:cnt]); err != nil {//			fmt.Println("write back buf err ", err)//			continue//		}//	}//}()}}()
}
func (s *Server) Stop() {fmt.Println("[STOP] Zinx server , name ", s.Name)//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {s.Start()//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加//阻塞,否则主Go退出, listenner的go将会退出for {time.Sleep(10 * time.Second)}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.msgHandler.AddRouter(msgId, router)fmt.Println("Add Router SUCC!! msgID = ", msgId)
}/*
创建一个服务器句柄
*/
func NewServer() *Server {utils.GlobalObject.Reload()s := &Server{Name:       utils.GlobalObject.Name,IPVersion:  "tcp4",IP:         utils.GlobalObject.Host,Port:       utils.GlobalObject.TcpPort,msgHandler: NewMsgHandle(),   //msgHandler 初始化ConnMgr:    NewConnManager(), //创建ConnManage}return s
}// 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {s.OnConnStart = hookFunc
}// 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {s.OnConnStop = hookFunc
}// 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---> CallOnConnStart....")s.OnConnStart(conn)}
}// 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("---> CallOnConnStop....")s.OnConnStop(conn)}
}

客户端

package mainimport ("datarace/zinx/znet""fmt""io""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("Client Test ... start")//3秒之后发起测试请求,给服务端开启服务的机会time.Sleep(3 * time.Second)conn, err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client start err, exit!")return}for {//发封包message消息dp := znet.NewDataPack()msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.8 Client0 Test Message")))_, err := conn.Write(msg)if err != nil {fmt.Println("write error err ", err)return}//先读出流中的head部分headData := make([]byte, dp.GetHeadLen())_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止if err != nil {fmt.Println("read head error")break}//将headData字节流 拆包到msg中msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpack err:", err)return}if msgHead.GetDataLen() > 0 {//msg 是有data数据的,需要再次读取data数据msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetDataLen())//根据dataLen从io中读取字节流_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err:", err)return}fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))}time.Sleep(1 * time.Second)}
}

服务端

package mainimport ("datarace/zinx/ziface""datarace/zinx/znet""fmt"
)// ping test 自定义路由
type PingRouter struct {znet.BaseRouter
}// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call PingRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}
}type HelloZinxRouter struct {znet.BaseRouter
}// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {fmt.Println("Call HelloZinxRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))if err != nil {fmt.Println(err)}
}// 创建连接的时候执行
func DoConnectionBegin(conn ziface.IConnection) {fmt.Println("DoConnecionBegin is Called ... ")//=============设置两个链接属性,在连接创建之后===========fmt.Println("Set conn Name, Home done!")conn.SetProperty("Name", "Aceld")conn.SetProperty("Home", "https://www.jianshu.com/u/35261429b7f1")//===================================================err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))if err != nil {fmt.Println(err)}
}// 连接断开的时候执行
func DoConnectionLost(conn ziface.IConnection) {//============在连接销毁之前,查询conn的Name,Home属性=====if name, err := conn.GetProperty("Name"); err == nil {fmt.Println("Conn Property Name = ", name)}if home, err := conn.GetProperty("Home"); err == nil {fmt.Println("Conn Property Home = ", home)}//===================================================fmt.Println("DoConneciotnLost is Called ... ")
}
func main() {//创建一个server句柄s := znet.NewServer()//注册链接hook回调函数s.SetOnConnStart(DoConnectionBegin)s.SetOnConnStop(DoConnectionLost)//配置路由s.AddRouter(0, &PingRouter{})s.AddRouter(1, &HelloZinxRouter{})//开启服务s.Serve()
}

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/502524.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

科研绘图系列:R语言科研绘图之标记热图(heatmap)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载导入数据数据预处理画图系统信息参考介绍 科研绘图系列:R语言科研绘图之标记热图(heatmap) 加载R包 library(tidyverse) library(ggplot2) library(reshape)…

物体切割效果

1、物体切割效果是什么 在游戏开发中&#xff0c;物体切割效果就是物体看似被切割、分割或隐藏一部分的视觉效果。 这种效果常用与游戏和动画中&#xff0c;比如角色攻击时的切割效果&#xff0c;场景中的墙壁切割效果等等。 2、物体切割效果的基本原理 在片元着色器中判断片…

Prism模块化

1.先假设ModuleA是需要被模块化的&#xff0c;里面随便写了个用户控件 2.需要用这个模块就给添加一下它的引用 3.使用这个模块的时候就在App.xaml.cs中添加这个模块&#xff0c;通过重写方法ConfigureModuleCatalog实现 protected override void ConfigureModuleCatalog(IModu…

vue3+Echarts+ts实现甘特图

项目场景&#xff1a; vue3Echartsts实现甘特图;发布任务 代码实现 封装ganttEcharts.vue <template><!-- Echarts 甘特图 --><div ref"progressChart" class"w100 h100"></div> </template> <script lang"ts&qu…

【FlutterDart】 拖动边界线改变列宽并且有边界高亮和鼠标效果(12 /100)

【Flutter&Dart】 拖动改变 widget 的窗口尺寸大小GestureDetector&#xff5e;简单实现&#xff08;10 /100&#xff09; 【Flutter&Dart】 拖动边界线改变列宽类似 vscode 那种拖动改变编辑框窗口大小&#xff08;11 /100&#xff09; 上效果 对比一下vscode的效果&…

umd格式

umd格式是啥&#xff1f; umd格式是一种通用模块&#xff0c;他同时支持AMD、CJS、ESM模块和全局变量的方式 umd格式打包后的基本代码结构如下: (function (root, factory) {if (typeof define function && define.amd) {// AMDdefine([dependency], factory);} el…

《Rust权威指南》学习笔记(二)

枚举enum 1.枚举的定义和使用如下图所示&#xff1a; 定义时还可以给枚举的成员指定数据类型&#xff0c;例如&#xff1a;enum IpAddr{V4(u8, u8, u8, u8),V6(String),}。枚举的变体都位于标识符的命名空间下&#xff0c;使用::进行分隔。 2.一个特殊的枚举Option&#xff0…

CoppeliaSim和Python进行无人机联合仿真

首先建立起CoppeliaSim和Python的连接,其次在Python中生成轨迹,CoppeliaSim仿真环境中的无人机进行跟踪,并绘制出轨迹曲线,有每一步详细的教学。 最终运行效果: 一、 建立起CoppeliaSim和Python的远程连接 1. 拷贝API函数和库文件 拷贝库函数文件 sim.py、simConst.p…

「Java 数据结构全面解读」:从基础到进阶的实战指南

「Java 数据结构全面解读」&#xff1a;从基础到进阶的实战指南 数据结构是程序设计中的核心部分&#xff0c;用于组织和管理数据。Java 提供了丰富的集合框架和工具类&#xff0c;涵盖了常见的数据结构如数组、链表、栈、队列和树等。本文将系统性地介绍这些数据结构的概念、…

windows11安装minikube

主要是按照官网步骤安装&#xff0c;由于是英文&#xff0c;又不是常规安装包的形式&#xff0c;稍微难理解一点&#xff0c;特此记录。 下文仅是对部分步骤做了说明&#xff0c;需要以官网为主&#xff0c;本文为辅。 一、访问minikube官网 https://minikube.sigs.k8s.io/d…

LLM大模型RAG内容安全合规检查

1.了解内容安全合规涉及的范围 我们先回顾一下智能答疑机器人的问答流程。问答流程主要包括用户、智能答疑机器人、知识库、大语言模型这四个主体。 涉及内容安全的关键阶段主要有&#xff1a; 输入阶段&#xff1a;用户发起提问。 输出阶段&#xff1a;机器人返回回答。 知识…

OpenCV计算机视觉 05 图像边缘检测(Sobel算子、Scharr算子、Laplacian算子、Canny边缘检测)

图像边缘检测 边缘检测是图形图像处理、计算机视觉和机器视觉中的一个基本工具&#xff0c;通常用于特征提取和特征检测&#xff0c;旨在检测一张数字图像中有明显变化的边缘或者不连续的区域。 yuancv2.imread(yuan.png) cv2.imshow(yuan,yuan) cv2.waitKey(0) yuan_xcv2.Sob…

【C++】P2550 [AHOI2001] 彩票摇奖

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述输入格式&#xff1a;输出格式&#xff1a;输入输出样例&#xff1a; &#x1f4af;题解思路1. 问题解析 &#x1f4af;我的实现实现逻辑问题分析 &#x1f4af;老…

【调试记录】在CARLA中插入可以播放视频的组件

〇、问题描述 做实验验证的时候&#xff0c;需要在CARLA仿真环境中添加一个可以播放视频的功能&#xff0c;查了很多现有的实验&#xff0c;基本都是插入图像&#xff0c;而对于插入视频&#xff0c;实现的方法就很麻烦了。一开始考虑的是直接用射影变换进行叠加&#xff0c;计…

SQL—Group_Concat函数用法详解

SQL—Group_Concat函数用法详解 在LC遇见的一道很有趣的SQL题&#xff0c;有用到这个函数&#xff0c;就借这道题抛砖引玉&#xff0c;在此讲解一下group_concat函数的用法。&#x1f923; GROUP_CONCAT([DISTINCT] expression [ORDER BY expression] [SEPARATOR separator])…

深入解析 Linux 设备树中的引脚控制(pinctrl)二

在嵌入式开发中,设备树(Device Tree)是描述硬件设备和系统拓扑的重要结构。而在 Linux 内核中,引脚控制(pinctrl)是一个关键的硬件资源管理部分,负责管理和配置设备的引脚(GPIO、I2C、SPI 等接口)功能和状态。设备树通过描述这些引脚的特性,指导 Linux 内核如何正确地…

MySQL(六)MySQL 案例

1. MySQL 案例 1.1. 设计数据库 1、首先根据相关业务需求(主要参考输出输入条件)规划出表的基本结构   2、根据业务规则进行状态字段设计   3、预估相关表的数据量进行容量规划   4、确定主键   5、根据对相关处理语句的分析对数据结构进行相应的变更。   设计表的时…

后台管理系统动态面包屑Breadcrumb组件的实现

在后管理系统开发中&#xff0c;面包屑导航是一个非常常见的功能&#xff0c;通常是根据当前的 url 自动生成面包屑导航菜单&#xff0c;当跳转路由发生变化时&#xff0c;面包屑导航都会随之发生变化&#xff0c;即动态面包屑。 要完成动态面包屑我们需要制作一个动态数组&am…

4.1.2 栈和队列(一)

文章目录 栈的定义栈的基本运算栈的存储结构栈的应用表达式求值 栈和队列的逻辑结构与线性表相同&#xff0c;但是其运算受到限制&#xff0c;统称为运算受限的线性表。 栈&#xff0c; 先进后出 队列&#xff0c;先进先出 栈的定义 栈顶&#xff0c;唯一能操作端 栈底&#xf…