zinx框架
Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏服务器、即时通讯服务器等。
下面实现zinx的多个功能包括:路由、全局配置、消息封装、读写分离、消息队列、链接管理等。
utils包下GlobalObj.go
package utilsimport ("datarace/zinx/ziface""encoding/json""io/ioutil"
)/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {TcpServer ziface.IServer //当前Zinx的全局Server对象Host string //当前服务器主机IPTcpPort int //当前服务器主机监听端口号Name string //当前服务器名称Version string //当前Zinx版本号MaxPacketSize uint32 //都需数据包的最大值MaxConn int //当前服务器主机允许的最大链接个数WorkerPoolSize uint32 //业务工作Worker池的数量MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量ConfFilePath stringMaxMsgChanLen int
}/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj// 读取用户的配置文件
func (g *GlobalObj) Reload() {data, err := ioutil.ReadFile("zinx.json")if err != nil {panic(err)}//将json数据解析到struct中//fmt.Printf("json :%s\n", data)err = json.Unmarshal(data, &GlobalObject)if err != nil {panic(err)}
}
func init() {//初始化GlobalObject变量,设置一些默认值GlobalObject = &GlobalObj{Name: "ZinxServerApp",Version: "V0.4",TcpPort: 7777,Host: "0.0.0.0",MaxConn: 12000,MaxPacketSize: 4096,ConfFilePath: "conf/zinx.json",WorkerPoolSize: 10,MaxWorkerTaskLen: 1024,}//从配置文件中加载一些用户配置的参数GlobalObject.Reload()
}
ziface包
package zifacetype IConnManager interface {Add(conn IConnection) //添加链接Remove(conn IConnection) //删除连接Get(connID uint32) (IConnection, error) //利用ConnID获取链接Len() int //获取当前连接ClearConn() //删除并停止所有链接
}
package zifaceimport "net"type IConnection interface {Start()Stop()GetConnID() uint32GetTCPConnection() *net.TCPConnRemoteAddr() net.AddrSendMsg(msgId uint32, data []byte) error//直接将Message数据发送给远程的TCP客户端(有缓冲)SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口//设置链接属性SetProperty(key string, value interface{})//获取链接属性GetProperty(key string) (interface{}, error)//移除链接属性RemoveProperty(key string)
}
type HandFunc func(*net.TCPConn, []byte, int) error
package zifacetype IDataPack interface {GetHeadLen() int32Pack(msg IMessage) ([]byte, error)Unpack([]byte) (IMessage, error)
}
package zifacetype IMessage interface {GetDataLen() uint32GetMsgId() uint32GetData() []byteSetMsgId(uint32)SetData([]byte)SetDataLen(uint32)
}
package zifacetype IMsgHandle interface {DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑StartWorkerPool() //启动worker工作池SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
}
package zifacetype IRequest interface {GetConnection() IConnectionGetData() []byteGetMsgID() uint32
}
package zifacetype IRouter interface {PreHandle(req IRequest)Handle(req IRequest)PostHandle(req IRequest)
}
package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)//得到链接管理GetConnMgr() IConnManager//设置该Server的连接创建时Hook函数SetOnConnStart(func(IConnection))//设置该Server的连接断开时的Hook函数SetOnConnStop(func(IConnection))//调用连接OnConnStart Hook函数CallOnConnStart(conn IConnection)//调用连接OnConnStop Hook函数CallOnConnStop(conn IConnection)
}
znet包
connection
package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""errors""fmt""io""net""sync"
)type Connection struct {//当前Conn属于哪个ServerTcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可//当前连接的socket TCP套接字Conn *net.TCPConn//当前连接的ID 也可以称作为SessionID,ID全局唯一ConnID uint32//当前连接的关闭状态isClosed bool//消息管理MsgId和对应处理方法的消息管理模块MsgHandler ziface.IMsgHandle//告知该链接已经退出/停止的channelExitBuffChan chan bool//无缓冲管道,用于读、写两个goroutine之间的消息通信msgChan chan []byte//有关冲管道,用于读、写两个goroutine之间的消息通信msgBuffChan chan []byte//链接属性property map[string]interface{}//保护链接属性修改的锁propertyLock sync.RWMutex
}// 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {//初始化Conn属性c := &Connection{TcpServer: server, //将隶属的server传递进来Conn: conn,ConnID: connID,isClosed: false,MsgHandler: msgHandler,ExitBuffChan: make(chan bool, 1),msgChan: make(chan []byte),msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化property: make(map[string]interface{}), //对链接属性map初始化}//将新创建的Conn添加到链接管理中c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中return c
}// 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}// 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}// 移除链接属性
func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}
func (c *Connection) startReader() {fmt.Println("[Reader Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")defer c.Stop()for {// 创建拆包解包的对象dp := NewDataPack()//读取客户端的Msg headheadData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error ", err)break}//拆包,得到msgid 和 datalen 放在msg中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error ", err)break}//根据 dataLen 读取 data,放在msg.Data中var data []byteif msg.GetDataLen() > 0 {data = make([]byte, msg.GetDataLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)continue}}msg.SetData(data)//得到当前客户端请求的Request数据req := Request{conn: c,msg: msg,}//从绑定好的消息和对应的处理方法中执行对应的Handle方法if utils.GlobalObject.WorkerPoolSize > 0 {//已经启动工作池机制,将消息交给Worker处理c.MsgHandler.SendMsgToTaskQueue(&req)} else {//从绑定好的消息和对应的处理方法中执行对应的Handle方法go c.MsgHandler.DoMsgHandler(&req)}}
}// 启动连接,让当前连接开始工作
func (c *Connection) Start() {//1 开启用户从客户端读取数据流程的Goroutinego c.startReader()//2 开启用于写回客户端数据流程的Goroutinego c.StartWriter()//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法c.TcpServer.CallOnConnStart(c)
}//func (c *Connection) Start() {
// go c.startReader()
// for {
// select {
// case <-c.ExitBuffChan:
// return
// }
// }
//}// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {fmt.Println("Conn Stop()...ConnID = ", c.ConnID)//如果当前链接已经关闭if c.isClosed == true {return}c.isClosed = true//==================//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用c.TcpServer.CallOnConnStop(c)//==================// 关闭socket链接c.Conn.Close()//关闭Writerc.ExitBuffChan <- true//将链接从连接管理器中删除c.TcpServer.GetConnMgr().Remove(c)//关闭该链接全部管道close(c.ExitBuffChan)close(c.msgBuffChan)
}/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")for {select {case data := <-c.msgChan://有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Data error:, ", err, " Conn Writer exit")return}//针对有缓冲channel需要些的数据处理case data, ok := <-c.msgBuffChan:if ok {//有数据要写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")return}} else {breakfmt.Println("msgBuffChan is Closed")}case <-c.ExitBuffChan:return}}
}// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取return nil
}// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}// 获取当前连接ID
func (c *Connection) GetConnID() uint32 {return c.ConnID
}// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}// // 直接将Message数据发送数据给远程的TCP客户端
//
// func (c *Connection) SendMsg(msgId uint32, data []byte) error {
// if c.isClosed == true {
// return errors.New("Connection closed when send msg")
// }
// //将data封包,并且发送
// dp := NewDataPack()
// msg, err := dp.Pack(NewMsgPackage(msgId, data))
// if err != nil {
// fmt.Println("Pack error msg id = ", msgId)
// return errors.New("Pack error msg ")
// }
// //写回客户端
// if _, err := c.Conn.Write(msg); err != nil {
// fmt.Println("Write msg id ", msgId, " error ")
// c.ExitBuffChan <- true
// return errors.New("conn Write error")
// }
// return nil
// }
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send buff msg")}//将data封包,并且发送dp := NewDataPack()msg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg ")}//写回客户端c.msgBuffChan <- msgreturn nil
}
ConnManager
package znetimport ("datarace/zinx/ziface""errors""fmt""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnectionconnLock sync.RWMutex
}/*
创建一个链接管理
*/
func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection),}
}// 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//将conn连接添加到ConnMananger中connMgr.connections[conn.GetConnID()] = connfmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}// 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//删除连接信息delete(connMgr.connections, conn.GetConnID())fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}// 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {//保护共享资源Map 加读锁connMgr.connLock.RLock()defer connMgr.connLock.RUnlock()if conn, ok := connMgr.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection not found")}
}// 获取当前连接
func (connMgr *ConnManager) Len() int {return len(connMgr.connections)
}// 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {//保护共享资源Map 加写锁connMgr.connLock.Lock()defer connMgr.connLock.Unlock()//停止并删除全部的连接信息for connID, conn := range connMgr.connections {//停止conn.Stop()//删除delete(connMgr.connections, connID)}fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
datapack
package znetimport ("bytes""datarace/zinx/utils""datarace/zinx/ziface""encoding/binary""errors"
)type DataPack struct{}func NewDataPack() *DataPack {return &DataPack{}
}
func (dp *DataPack) GetHeadLen() uint32 {//Id uint32(4字节) + DataLen uint32(4字节)return 8
}// 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {//创建一个存放bytes字节的缓冲dataBuff := bytes.NewBuffer([]byte{})//写dataLenif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {return nil, err}//写msgIDif err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {return nil, err}//写data数据if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {return nil, err}return dataBuff.Bytes(), nil
}
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {//创建一个从输入二进制数据的ioReaderdataBuff := bytes.NewReader(binaryData)//只解压head的信息,得到dataLen和msgIDmsg := &Message{}//读dataLenif err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {return nil, err}//读msgIDif err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {return nil, err}//判断dataLen的长度是否超出我们允许的最大包长度if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {return nil, errors.New("Too large msg data recieved")}//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据return msg, nil
}
message
package znettype Message struct {Id uint32DataLen uint32Data []byte
}func NewMsgPackage(id uint32, data []byte) *Message {return &Message{Id: id,DataLen: uint32(len(data)),Data: data,}
}// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {return msg.DataLen
}// 获取消息ID
func (msg *Message) GetMsgId() uint32 {return msg.Id
}// 获取消息内容
func (msg *Message) GetData() []byte {return msg.Data
}// 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {msg.DataLen = len
}// 设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {msg.Id = msgId
}// 设计消息内容
func (msg *Message) SetData(data []byte) {msg.Data = data
}
msgHandler
package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""strconv"
)type MsgHandle struct {Apis map[uint32]ziface.IRouterWorkerPoolSize uint32TaskQueue []chan ziface.IRequest
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Apis: make(map[uint32]ziface.IRouter),WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),}
}// 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {handler, ok := mh.Apis[request.GetMsgID()]if !ok {fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")return}//执行对应处理方法handler.PreHandle(request)handler.Handle(request)handler.PostHandle(request)
}// 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {//1 判断当前msg绑定的API处理方法是否已经存在if _, ok := mh.Apis[msgId]; ok {panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))}//2 添加msg与api的绑定关系mh.Apis[msgId] = routerfmt.Println("Add api msgId = ", msgId)
}// 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID = ", workerID, " is started.")//不断的等待队列中的消息for {select {//有消息则取出队列的Request,并执行绑定的业务方法case request := <-taskQueue:mh.DoMsgHandler(request)}}
}// 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {//遍历需要启动worker的数量,依此启动for i := 0; i < int(mh.WorkerPoolSize); i++ {//一个worker被启动//给当前worker对应的任务队列开辟空间mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来go mh.StartOneWorker(i, mh.TaskQueue[i])}
}// 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//根据ConnID来分配当前的连接应该由哪个worker负责处理//轮询的平均分配法则//得到需要处理此条连接的workerIDworkerID := request.GetConnection().GetConnID() % mh.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)//将请求消息发送给任务队列mh.TaskQueue[workerID] <- request
}
request
package znetimport ("datarace/zinx/ziface"
)type Request struct {conn ziface.IConnectionmsg ziface.IMessage
}func (r *Request) GetConnection() ziface.IConnection {return r.conn
}// 获取请求消息的数据
func (r *Request) GetData() []byte {return r.msg.GetData()
}// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {return r.msg.GetMsgId()
}
router
package znetimport "datarace/zinx/ziface"type BaseRouter struct{}func (br *BaseRouter) PreHandle(request ziface.IRequest) {}
func (br *BaseRouter) Handle(request ziface.IRequest) {}
func (br *BaseRouter) PostHandle(request ziface.IRequest) {}
server
package znetimport ("datarace/zinx/utils""datarace/zinx/ziface""fmt""net""time"
)// iServer 接口实现,定义一个Server服务类
type Server struct {//服务器的名称Name string//tcp4 or otherIPVersion string//服务绑定的IP地址IP string//服务绑定的端口Port int//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法msgHandler ziface.IMsgHandle//当前Server的链接管理器ConnMgr ziface.IConnManager//新增两个hook函数原型//该Server的连接创建时Hook函数OnConnStart func(conn ziface.IConnection)//该Server的连接断开时的Hook函数OnConnStop func(conn ziface.IConnection)
}// 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}// ============== 实现 ziface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version: %s, MaxConn: %d, MaxPacketSize: %d\n",utils.GlobalObject.Version,utils.GlobalObject.MaxConn,utils.GlobalObject.MaxPacketSize)//开启一个go去做服务端Linster业务go func() {//1 获取一个TCP的Addrs.msgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Println("resolve tcp addr err: ", err)return}//2 监听服务器地址listenner, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen", s.IPVersion, "err", err)return}//已经监听成功fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")var cid uint32cid = 0//3 启动server网络连接业务for {//3.1 阻塞等待客户端建立连接请求conn, err := listenner.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//=============//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {conn.Close()continue}//=============//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的dealConn := NewConntion(s, conn, cid, s.msgHandler)cid++//3.4 启动当前链接的处理业务go dealConn.Start()//go func() {// //不断的循环从客户端获取数据// for {// buf := make([]byte, 512)// cnt, err := conn.Read(buf)// if err != nil {// fmt.Println("recv buf err ", err)// continue// }// //回显// if _, err := conn.Write(buf[:cnt]); err != nil {// fmt.Println("write back buf err ", err)// continue// }// }//}()}}()
}
func (s *Server) Stop() {fmt.Println("[STOP] Zinx server , name ", s.Name)//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {s.Start()//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加//阻塞,否则主Go退出, listenner的go将会退出for {time.Sleep(10 * time.Second)}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.msgHandler.AddRouter(msgId, router)fmt.Println("Add Router SUCC!! msgID = ", msgId)
}/*
创建一个服务器句柄
*/
func NewServer() *Server {utils.GlobalObject.Reload()s := &Server{Name: utils.GlobalObject.Name,IPVersion: "tcp4",IP: utils.GlobalObject.Host,Port: utils.GlobalObject.TcpPort,msgHandler: NewMsgHandle(), //msgHandler 初始化ConnMgr: NewConnManager(), //创建ConnManage}return s
}// 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {s.OnConnStart = hookFunc
}// 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {s.OnConnStop = hookFunc
}// 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---> CallOnConnStart....")s.OnConnStart(conn)}
}// 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("---> CallOnConnStop....")s.OnConnStop(conn)}
}
客户端
package mainimport ("datarace/zinx/znet""fmt""io""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("Client Test ... start")//3秒之后发起测试请求,给服务端开启服务的机会time.Sleep(3 * time.Second)conn, err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client start err, exit!")return}for {//发封包message消息dp := znet.NewDataPack()msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.8 Client0 Test Message")))_, err := conn.Write(msg)if err != nil {fmt.Println("write error err ", err)return}//先读出流中的head部分headData := make([]byte, dp.GetHeadLen())_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止if err != nil {fmt.Println("read head error")break}//将headData字节流 拆包到msg中msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpack err:", err)return}if msgHead.GetDataLen() > 0 {//msg 是有data数据的,需要再次读取data数据msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetDataLen())//根据dataLen从io中读取字节流_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err:", err)return}fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))}time.Sleep(1 * time.Second)}
}
服务端
package mainimport ("datarace/zinx/ziface""datarace/zinx/znet""fmt"
)// ping test 自定义路由
type PingRouter struct {znet.BaseRouter
}// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call PingRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}
}type HelloZinxRouter struct {znet.BaseRouter
}// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {fmt.Println("Call HelloZinxRouter Handle")//先读取客户端的数据,再回写ping...ping...pingfmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))if err != nil {fmt.Println(err)}
}// 创建连接的时候执行
func DoConnectionBegin(conn ziface.IConnection) {fmt.Println("DoConnecionBegin is Called ... ")//=============设置两个链接属性,在连接创建之后===========fmt.Println("Set conn Name, Home done!")conn.SetProperty("Name", "Aceld")conn.SetProperty("Home", "https://www.jianshu.com/u/35261429b7f1")//===================================================err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))if err != nil {fmt.Println(err)}
}// 连接断开的时候执行
func DoConnectionLost(conn ziface.IConnection) {//============在连接销毁之前,查询conn的Name,Home属性=====if name, err := conn.GetProperty("Name"); err == nil {fmt.Println("Conn Property Name = ", name)}if home, err := conn.GetProperty("Home"); err == nil {fmt.Println("Conn Property Home = ", home)}//===================================================fmt.Println("DoConneciotnLost is Called ... ")
}
func main() {//创建一个server句柄s := znet.NewServer()//注册链接hook回调函数s.SetOnConnStart(DoConnectionBegin)s.SetOnConnStop(DoConnectionLost)//配置路由s.AddRouter(0, &PingRouter{})s.AddRouter(1, &HelloZinxRouter{})//开启服务s.Serve()
}