type Client struct{cc codec.Codec//编码方式opt *Option//发出请求的第一个包,用来协商后续包的格式和编码方式sending sync.Mutex // 当一个请求正在发送时,不可以转头去执行别的请求header codec.Header // 请求头内容mu sync.Mutex // protect followingseq uint64//记录该客户端一次请求连接的序号,pending map[uint64]*Call//通过seq快速找到客户端的某个请求closing bool// user has called Closeshutdown bool// server has told us to stop}
2、 一个客户端,异步发送多个请求,使用call结构体代表客户端的每次请求
type Call struct{Seq uint64//当前请求的序号,唯一标识一个请求ServiceMethod string// format "<service>.<method>" 此次请求的服务和方法Args interface{}// arguments to the function 请求函数的参数Reply interface{}// reply from the function 服务端函数的响应数据Error error// if error occurs, it will be set //发生错误时的信息Done chan*Call // Strobes when call is complete.完成一次请求通过chan来通知}
// Call invokes the named function, waits for it to complete,// and returns its error status.func(client *Client)Call(serviceMethod string, args, reply interface{})error{call :=<-client.Go(serviceMethod, args, reply,make(chan*Call,1)).Done//阻塞等待此次请求的channel,直到服务端处理并响应才返回return call.Error
}
绑定数据到请求中
// Go invokes the function asynchronously.// It returns the Call structure representing the invocation.func(client *Client)Go(serviceMethod string, args, reply interface{}, done chan*Call)*Call {if done ==nil{done =make(chan*Call,10)}elseifcap(done)==0{log.Panic("rpc client: done channel is unbuffered")}call :=&Call{ServiceMethod: serviceMethod,//此次请求的服务和方法Args: args,//此次请求的参数Reply: reply,//此处是引用类型,暂时还没有数据,等服务端响应就有数据了Done: done,//绑定此次请求的响应channel,服务端响应后就往对应的channel发一条数据}client.send(call)return call
}
发送请求到服务端
func(client *Client)send(call *Call){// make sure that the client will send a complete requestclient.sending.Lock()defer client.sending.Unlock()// register this call.seq, err := client.registerCall(call)//注册这次call,把这次的请求ID注册到客户端中。。。if err !=nil{call.Error = errcall.done()return}// prepare request headerclient.header.ServiceMethod = call.ServiceMethodclient.header.Seq = seqclient.header.Error =""// encode and send the requestif err := client.cc.Write(&client.header, call.Args); err !=nil{//发送请求头和请求参数call := client.removeCall(seq)// call may be nil, it usually means that Write partially failed,// client has received the response and handledif call !=nil{call.Error = errcall.done()}}}
4、客户端接收请求
func(client *Client)receive(){var err errorfor err ==nil{var h codec.Headerif err = client.cc.ReadHeader(&h); err !=nil{接收请求是一个个来,当连接关闭时,此处会报错,退出整个客户端break}call := client.removeCall(h.Seq)//通过Seq唯一标识符删除一个请求switch{case call ==nil:// it usually means that Write partially failed// and call was already removed.err = client.cc.ReadBody(nil)case h.Error !="":call.Error = fmt.Errorf(h.Error)err = client.cc.ReadBody(nil)call.done()default:err = client.cc.ReadBody(call.Reply)if err !=nil{call.Error = errors.New("reading body "+ err.Error())}call.done()//向通道发送一条消息,客户端等待的这个call可以推出了}}// error occurs, so terminateCalls pending callsclient.terminateCalls(err)//关闭所有请求}