七天实现一个go rpc框架
目录
- rpc协议
- 目的
- 关于RPC和框架
- 服务端与消息编码
- 确保接口的实现
- 消息的序列化与反序列化
- 通信过程
- 服务端的实现
- main 函数
- 支持并发与异步的客户端
- Call 的设计
- 实现客户端
- 服务注册(service register)
- 通过反射实现 service
- 集成到服务端
- 超时处理
- 创建连接超时
- Client.Call 超时
- 服务端处理超时
- 支持HTTP协议
- 服务端支持 HTTP 协议
- hijack(劫持)
- 使用场景
- 客户端支持 HTTP 协议
- 实现简单的 DEBUG 页面
- Demo
- 负载均衡(load balance)
- 负载均衡策略
- 服务发现
- 支持负载均衡的客户端
- 服务发现与注册中心(registry)
- Registry
- 个人总结
rpc协议
-
RPC(Remote Procedure Call,远程过程调用)是一种计算机通信协议,允许调用不同进程空间的程序。RPC 的客户端和服务器可以在一台机器上,也可以在不同的机器上。程序员使用时,就像调用本地程序一样,无需关注内部的实现细节。
-
不同的应用程序之间的通信方式有很多,比如浏览器和服务器之间广泛使用的基于 HTTP 协议的 Restful API。与 RPC 相比,Restful API 有相对统一的标准,因而更通用,兼容性更好,支持不同的语言。HTTP 协议是基于文本的,一般具备更好的可读性。但是缺点也很明显:
- Restful 接口需要额外的定义,无论是客户端还是服务端,都需要额外的代码来处理,而 RPC 调用则更接近于直接调用。
- 基于 HTTP 协议的 Restful 报文冗余,承载了过多的无效信息,而 RPC 通常使用自定义的协议格式,减少冗余报文。
- RPC 可以采用更高效的序列化协议,将文本转为二进制传输,获得更高的性能。
因为 RPC 的灵活性,所以更容易扩展和集成诸如注册中心、负载均衡等功能。
目的
-
需要确定采用的传输协议是什么?如果这个两个应用程序位于不同的机器,那么一般会选择 TCP 协议或者 HTTP 协议;那如果两个应用程序位于相同的机器,也可以选择 Unix Socket 协议。
-
还需要确定报文的编码格式,比如采用最常用的 JSON 或者 XML,那如果报文比较大,还可能会选择 protobuf 等其他的编码方式,甚至编码之后,再进行压缩。接收端获取报文则需要相反的过程,先解压再解码。
-
如果服务端的实例很多,客户端并不关心这些实例的地址和部署位置,只关心自己能否获取到期待的结果,那就引出了注册中心(registry)和负载均衡(load balance)的问题。(即客户端和服务端互相不感知对方的存在,服务端启动时将自己注册到注册中心,客户端调用时,从注册中心获取到所有可用的实例,选择一个来调用。)
关于RPC和框架
-
Go 语言广泛地应用于云计算和微服务,成熟的 RPC 框架和微服务框架汗牛充栋。grpc、rpcx、go-micro 等都是非常成熟的框架。一般而言,RPC 是微服务框架的一个子集,微服务框架可以自己实现 RPC 部分,当然,也可以选择不同的 RPC 框架作为通信基座。
-
上述成熟的框架代码量都比较庞大,而且通常和第三方库,例如 protobuf、etcd、zookeeper 等有比较深的耦合,难以直观地窥视框架的本质。
-
因此,从零实现 Go 语言官方的标准库 net/rpc,并在此基础上,新增协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性。有助于理解 RPC 框架在设计时需要考虑什么。
服务端与消息编码
-
使用 encoding/gob 实现消息的编解码(序列化与反序列化)。
-
gob(Go binary)是Goland包自带的一个数据结构序列化的编码/解码工具。编码使用Encoder,解码使用Decoder。一种典型的应用场景就是RPC(remote procedure calls)。
确保接口的实现
代码中有这样的字眼:var _ Codec = (*GobCodec)(nil)
,作者给出的解释如下:
类似的方法还有:
// 验证httpGetter是否实现了PeerGetter接口
var _ PeerGetter = &httpGetter{}
包括gin
框架的源码:
type IRouter interface{ ... }
...
...
type RouterGroup struct { ... }
...
var _ IRouter = &RouterGroup{}
消息的序列化与反序列化
一个典型的 RPC 调用如下(参考go语言rpc/grpc介绍):
err = client.Call("Arith.Multiply", args, &reply)
客户端发送的请求包括服务名 Arith
,方法名 Multiply
,参数 args
三个,服务端的响应包括错误 error
,返回值 reply
2 个。
抽象出数据结构 Header:
package codecimport "io"type Header struct {// ServiceMethod 是服务名和方法名ServiceMethod string// Seq 是请求的序号,也可以认为是某个请求的 ID,用来区分不同的请求。Seq uint64Error string
}// Codec 抽象出对消息体进行编解码的接口
type Codec interface {io.CloserReadHeader(*Header) errorReadBody(any) errorWrite(*Header, any) error
}type NewCodecFunc func(closer io.ReadWriteCloser) Codecconst (GobType = "application/gob"JsonType = "application/json"
)var NewCodecFuncMap map[string]NewCodecFuncfunc init() {NewCodecFuncMap = make(map[string]NewCodecFunc)NewCodecFuncMap[GobType] = NewGobCodec
}
再实现编解码接口:
package codecimport ("bufio""encoding/gob""io""log"
)type GobCodec struct {// conn 是由构建函数传入,通常是通过 TCP 或者 Unix 建立 socket 时得到的链接实例conn io.ReadWriteCloser// buf 是为了防止阻塞而创建的带缓冲的 Writer,一般这么做能提升性能。buf *bufio.Writerdec *gob.Decoderenc *gob.Encoder
}// 确保GobCodec实现了Codec
var _ Codec = (*GobCodec)(nil)// NewGobCodec 是GobCodec的构造函数
func NewGobCodec(conn io.ReadWriteCloser) Codec {buf := bufio.NewWriter(conn)return &GobCodec{conn: conn,buf: buf,// dec 从conn解码dec: gob.NewDecoder(conn),// enc 编码到bufenc: gob.NewEncoder(buf),}
}func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body interface{}) error {return c.dec.Decode(body)
}func (c *GobCodec) Write(h *Header, body interface{}) (err error) {defer func() {_ = c.buf.Flush()if err != nil {_ = c.Close()}}()if err := c.enc.Encode(h); err != nil {log.Println("rpc codec: gob error encoding header:", err)return err}if err := c.enc.Encode(body); err != nil {log.Println("rpc codec: gob error encoding body:", err)return err}return nil
}func (c *GobCodec) Close() error {return c.conn.Close()
}
通信过程
客户端与服务端的通信需要协商一些内容,为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来表示序列化方式,第2个字节表示压缩方式,第3-6字节表示 header 的长度,7-10 字节表示 body 的长度。
服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个,即报文可能是这样的。
| Option | Header1 | Body1 | Header2 | Body2 | ...
服务端的实现
package GenRpcimport ("github.com/Generlazy/GenGrpc/GenRpc/codec""log""net"
)const MagicNumber = 0x3bef5ctype Option struct {// MagicNumber标记这是一个GenRpc请求MagicNumber int// CodecType body编码方式CodecType string
}var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType: codec.GobType,
}type Server struct{}// NewServer returns a new Server.
func NewServer() *Server {return &Server{}
}// DefaultServer 是一个默认的 Server 实例,主要为了用户使用方便。
var DefaultServer = NewServer()// Accept 接受请求
func (server *Server) Accept(lis net.Listener) {for {conn, err := lis.Accept()if err != nil {log.Println("rpc server: accept error:", err)return}// 异步服务requestgo server.ServeConn(conn)}
}func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
启动服务:
lis, _ := net.Listen("tcp", ":9999")
geerpc.Accept(lis)
实现ServerConn(conn):
func (server *Server) ServeConn(conn io.ReadWriteCloser) {defer func() { _ = conn.Close() }()var opt Option// 将 magicNumber 和 Content-type 解码到opt中if err := json.NewDecoder(conn).Decode(&opt); err != nil {log.Println("rpc server: options error: ", err)return}// 判断 magicNumber是否正确if opt.MagicNumber != MagicNumber {log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)return}// 获取解码器gob/json的构造函数f := codec.NewCodecFuncMap[opt.CodecType]if f == nil {log.Printf("rpc server: invalid codec type %s", opt.CodecType)return}// 获取具体构造器gob/jsoncodecObj := f(conn)// 开启服务server.serveCodec(codecObj)
}func (server *Server) serveCodec(cc codec.Codec) {// 确保发送完整的响应sending := new(sync.Mutex)// wait until all request are handledwg := new(sync.WaitGroup)for {// 一直读取请求(上文将连接对象传入到了gob中)req, err := server.readRequest(cc)if err != nil {if req == nil {break // it's not possible to recover, so close the connection}req.h.Error = err.Error()// 返回错误响应server.sendResponse(cc, req.h, invalidRequest, sending)continue}wg.Add(1)// 异步处理正确响应go server.handleRequest(cc, req, sending, wg)}wg.Wait()_ = cc.Close()
}
serveCodec 的过程非常简单。主要包含三个阶段:
- 读取请求 readRequest
- 处理请求 handleRequest
- 回复请求 sendResponse
在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:
- handleRequest 使用了协程并发执行请求。
- 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
- 尽力而为,只有在 header 解析失败时,才终止循环。
// request 请求上下文
type request struct {// h 请求头h *codec.Header// argv 请求参数argv reflect.Value// respv 响应参数respv reflect.Value
}func (server *Server) readRequest(cc codec.Codec) (*request, error) {// 读取请求头h, err := server.readRequestHeader(cc)if err != nil {return nil, err}req := &request{h: h}req.argv = reflect.New(reflect.TypeOf(""))// 读取请求体if err = cc.ReadBody(req.argv.Interface()); err != nil {log.Println("rpc server: read argv err:", err)}return req, nil
}func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {var h codec.Header// 将头信息解码到h返回if err := cc.ReadHeader(&h); err != nil {if err != io.EOF && err != io.ErrUnexpectedEOF {log.Println("rpc server: read header error:", err)}return nil, err}return &h, nil
}func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()// 将h和body写入到conn中if err := cc.Write(h, body); err != nil {log.Println("rpc server: write response error:", err)}
}func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {defer wg.Done()// 输出请求参数log.Println(req.h, req.argv.Elem())req.respv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))server.sendResponse(cc, req.h, req.respv.Interface(), sending)
}
main 函数
package mainimport ("encoding/json""fmt""github.com/Generlazy/GenGrpc/GenRpc""github.com/Generlazy/GenGrpc/GenRpc/codec""log""net""time"
)func startServer(addr chan string) {// 监听tcp:8080l, err := net.Listen("tcp", ":8080")if err != nil {log.Fatal("network error:", err)}log.Println("start rpc server on", l.Addr())addr <- l.Addr().String()GenRpc.Accept(l)
}func main() {addr := make(chan string)// 开启服务go startServer(addr)// 客户端conn, _ := net.Dial("tcp", <-addr)defer func() { _ = conn.Close() }()time.Sleep(time.Second)// 发送Option 协商好的格式_ = json.NewEncoder(conn).Encode(GenRpc.DefaultOption)// 获取gob编码器cc := codec.NewGobCodec(conn)// send request & receive response// 一个conn连接,请求响应了10次for i := 0; i < 5; i++ {h := &codec.Header{// 调用Foo.SumServiceMethod: "Foo.Sum",// 序列号为 indexSeq: uint64(i),}_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))_ = cc.ReadHeader(h)var reply string_ = cc.ReadBody(&reply)log.Println("reply:", reply)}
}
支持并发与异步的客户端
Call 的设计
对 net/rpc 而言,一个函数需要能够被远程调用,需要满足如下五个条件:
- the method’s type is exported.
- the method is exported.
- the method has two arguments, both exported (or builtin) types.
- the method’s second argument is a pointer.
- the method has return type error.
func (t *T) MethodName(argType T1, replyType *T2) error
// Call represents an active RPC.
type Call struct {Seq uint64ServiceMethod string // format "<service>.<method>"Args interface{} // arguments to the functionReply interface{} // reply from the functionError error // if error occurs, it will be setDone chan *Call // Strobes when call is complete.
}func (call *Call) done() {call.Done <- call
}
为了支持异步调用,Call 结构体中添加了一个字段 Done,Done 的类型是 chan *Call,当调用结束时,会调用 call.done() 通知调用方。
实现客户端
// Client Rpc客户端
type Client struct {// cc 编解码器cc codec.Codec// opt 自定义协议选项opt *Option// sending 是一个互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。sending sync.Mutex// header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。header codec.Headermu sync.Mutex// seq 用于给发送的请求编号,每个请求拥有唯一编号。seq uint64// pending 存储未处理完的请求,键是编号,值是 Call 实例。pending map[uint64]*Call// closing 标记客户端是否关闭,通过调用 Close 设置closing bool// shutdown 置为 true 一般是有错误发生。shutdown bool
}// 验证Client是否实现了io.Closer 便于在编译阶段就报错
var _ io.Closer = (*Client)(nil)var ErrShutdown = errors.New("connection is shut down")// Close the connection
func (client *Client) Close() error {client.mu.Lock()defer client.mu.Unlock()if client.closing {return ErrShutdown}client.closing = truereturn client.cc.Close()
}// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {client.mu.Lock()defer client.mu.Unlock()return !client.shutdown && !client.closing
}// registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq。
func (client *Client) registerCall(call *Call) (uint64, error) {client.mu.Lock()defer client.mu.Unlock()if client.closing || client.shutdown {return 0, ErrShutdown}call.Seq = client.seqclient.pending[call.Seq] = callclient.seq++return call.Seq, nil
}// removeCall:根据 seq,从 client.pending 中移除对应的 call,并返回。
func (client *Client) removeCall(seq uint64) *Call {client.mu.Lock()defer client.mu.Unlock()call := client.pending[seq]delete(client.pending, seq)return call
}// terminateCalls:服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。// terminateCalls:服务端或客户端发生错误时调用,将 shutdown 设置为 true,且将错误信息通知所有 pending 状态的 call。
func (client *Client) terminateCalls(err error) {// 先锁发送client.sending.Lock()defer client.sending.Unlock()// 再锁clientclient.mu.Lock()defer client.mu.Unlock()client.shutdown = truefor _, call := range client.pending {call.Error = errcall.done()}
}
对一个客户端端来说,接收响应、发送请求是最重要的 2 个功能。
接收到的响应有三种情况:
- call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
- call 存在,但服务端处理出错,即 h.Error 不为空。
- call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
// receive 接收功能
func (client *Client) receive() {var err errorfor err == nil {var h codec.Header// 从conn解码header到hif err = client.cc.ReadHeader(&h); err != nil {break}// 根据header取出一个调用call := client.removeCall(h.Seq)// call的情况switch {case call == nil:// it usually means that Write partially failed// and call was already removed.err = client.cc.ReadBody(nil)case h.Error != "":call.Error = fmt.Errorf(h.Error)err = client.cc.ReadBody(nil)call.done()default:err = client.cc.ReadBody(call.Reply)if err != nil {call.Error = errors.New("reading body " + err.Error())}call.done()}}// error occurs, so terminateCalls pending callsclient.terminateCalls(err)
}
创建 Client 实例时,首先需要完成一开始的协议交换,即发送 Option 信息给服务端。协商好消息的编解码方式之后,再创建一个子协程调用 receive() 接收响应。
func NewClient(conn net.Conn, opt *Option) (*Client, error) {// 获取编解码器的初始化函数f := codec.NewCodecFuncMap[opt.CodecType]if f == nil {err := fmt.Errorf("invalid codec type %s", opt.CodecType)log.Println("rpc client: codec error:", err)return nil, err}// send options with server// 将option 按照规定json序列化 并传输给connif err := json.NewEncoder(conn).Encode(opt); err != nil {log.Println("rpc client: options error: ", err)_ = conn.Close()return nil, err}return newClientCodec(f(conn), opt), nil
}func newClientCodec(cc codec.Codec, opt *Option) *Client {client := &Client{seq: 1, // seq starts with 1, 0 means invalid callcc: cc,opt: opt,pending: make(map[uint64]*Call),}go client.receive()return client
}
实现 Dial 函数,便于用户传入服务端地址,创建 Client 实例。为了简化用户调用,通过 …*Option 将 Option 实现为可选参数(选项模式)。
func parseOptions(opts ...*Option) (*Option, error) {// 没有传入选项返回默认值if len(opts) == 0 || opts[0] == nil {return DefaultOption, nil}if len(opts) != 1 {return nil, errors.New("number of options is more than 1")}opt := opts[0]opt.MagicNumber = DefaultOption.MagicNumberif opt.CodecType == "" {opt.CodecType = DefaultOption.CodecType}return opt, nil
}// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (client *Client, err error) {opt, err := parseOptions(opts...)if err != nil {return nil, err}conn, err := net.Dial(network, address)if err != nil {return nil, err}// close the connection if client is nildefer func() {if client == nil {_ = conn.Close()}}()return NewClient(conn, opt)
}
实现发送请求的能力:
func (client *Client) send(call *Call) {// make sure that the client will send a complete requestclient.sending.Lock()defer client.sending.Unlock()// register this call.seq, err := client.registerCall(call)if err != nil {// 用于 receive 判断call的情况call.Error = errcall.done()return}// prepare request headerclient.header.ServiceMethod = call.ServiceMethodclient.header.Seq = seqclient.header.Error = ""// encode and send the requestif err := client.cc.Write(&client.header, call.Args); err != nil {call := client.removeCall(seq)// call may be nil, it usually means that Write partially failed,// client has received the response and handledif call != nil {call.Error = errcall.done()}}
}
Go 和 Call 是客户端暴露给用户的两个 RPC 服务调用接口,Go 是一个异步接口,返回 call 实例。
Call 是对 Go 的封装,阻塞 call.Done,等待响应返回,是一个同步接口。
// Go invokes the function asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {if done == nil {done = make(chan *Call, 10)} else if cap(done) == 0 {log.Panic("rpc client: done channel is unbuffered")}call := &Call{ServiceMethod: serviceMethod,Args: args,Reply: reply,Done: done,}client.send(call)return call
}// Call invokes the named function, waits for it to complete,
// and returns its error status.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}
服务注册(service register)
- 目前服务端接收到请求后
handleRequest
方法只是打印序列号:fmt.Sprintf("geerpc resp %d", req.h.Seq)
,并没有实现根据ServiceMethod去寻找对应服务的功能。 - RPC 框架的一个基础能力是:像调用本地程序一样调用远程服务。那如何将程序映射为服务呢?那么对 Go 来说,这个问题就变成了如何将结构体的方法映射为服务。
假设客户端发过来一个请求,包含 ServiceMethod 和 Argv:
{"ServiceMethod": "T.MethodName""Argv":"0101110101..." // 序列化之后的字节流
}
通过 “T.MethodName” 可以确定调用的是类型 T 的 MethodName,通过反射能够非常容易地获取某个结构体的所有方法,并且能够通过方法,获取到该方法所有的参数类型与返回值。
func main() {var wg sync.WaitGrouptyp := reflect.TypeOf(&wg)for i := 0; i < typ.NumMethod(); i++ {method := typ.Method(i)argv := make([]string, 0, method.Type.NumIn())returns := make([]string, 0, method.Type.NumOut())// j 从 1 开始,第 0 个入参是 wg 自己。for j := 1; j < method.Type.NumIn(); j++ {argv = append(argv, method.Type.In(j).Name())}for j := 0; j < method.Type.NumOut(); j++ {returns = append(returns, method.Type.Out(j).Name())}log.Printf("func (w *%s) %s(%s) %s",typ.Elem().Name(),method.Name,strings.Join(argv, ","),strings.Join(returns, ","))}
}func (w *WaitGroup) Add(int)
func (w *WaitGroup) Done()
func (w *WaitGroup) Wait()
通过反射实现 service
定义结构体 methodType:,实现了 2 个方法 newArgv 和 newReplyv,用于创建对应类型的实例。
type methodType struct {// method:方法本身method reflect.Method// ArgType:第一个参数的类型(请求参数)ArgType reflect.Type// ReplyType:第二个参数的类型(响应参数)ReplyType reflect.Type// numCalls:后续统计方法调用次数时会用到numCalls uint64
}// NumCalls 返回调用Method的次数
func (m *methodType) NumCalls() uint64 {return atomic.LoadUint64(&m.numCalls)
}func (m *methodType) newArgv() reflect.Value {var argv reflect.Value// arg may be a pointer type, or a value typeif m.ArgType.Kind() == reflect.Ptr {// 如果是指针,需要调用Elem()方法,相等于*ptr获取值argv = reflect.New(m.ArgType.Elem())} else {argv = reflect.New(m.ArgType).Elem()}return argv
}func (m *methodType) newReplyv() reflect.Value {// reply must be a pointer typereplyv := reflect.New(m.ReplyType.Elem())switch m.ReplyType.Elem().Kind() {case reflect.Map:replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))case reflect.Slice:replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))}return replyv
}
定义结构体 service:
type service struct {// name 即映射的结构体的名称 比如 T,比如 WaitGroupname string// typ 是结构体的类型typ reflect.Type// rcvr 即结构体的实例本身,保留 rcvr 是因为在调用时需要 rcvr 作为第 0 个参数rcvr reflect.Value// method 是 map 类型,存储映射的结构体的所有符合条件的方法。method map[string]*methodType
}func newService(rcvr interface{}) *service {s := new(service)s.rcvr = reflect.ValueOf(rcvr)s.name = reflect.Indirect(s.rcvr).Type().Name()s.typ = reflect.TypeOf(rcvr)if !ast.IsExported(s.name) {log.Fatalf("rpc server: %s is not a valid service name", s.name)}s.registerMethods()return s
}func (s *service) registerMethods() {s.method = make(map[string]*methodType)for i := 0; i < s.typ.NumMethod(); i++ {method := s.typ.Method(i)mType := method.Typeif mType.NumIn() != 3 || mType.NumOut() != 1 {continue}if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {continue}argType, replyType := mType.In(1), mType.In(2)if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {continue}s.method[method.Name] = &methodType{method: method,ArgType: argType,ReplyType: replyType,}log.Printf("rpc server: register %s.%s\n", s.name, method.Name)}
}func isExportedOrBuiltinType(t reflect.Type) bool {return ast.IsExported(t.Name()) || t.PkgPath() == ""
}
registerMethods 过滤出了符合条件的方法:
两个导出或内置类型的入参(反射时为 3 个,第 0 个是自身,类似于 python 的 self,java 中的 this)
返回值有且只有 1 个,类型为 error。
还需要实现 call 方法,即能够通过反射值调用方法。
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {atomic.AddUint64(&m.numCalls, 1)f := m.method.FuncreturnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})if errInter := returnValues[0].Interface(); errInter != nil {return errInter.(error)}return nil
}
集成到服务端
通过反射结构体已经映射为服务,但请求的处理过程还没有完成。从接收到请求到回复还差以下几个步骤:第一步,根据入参类型,将请求的 body 反序列化;第二步,调用 service.call,完成方法调用;第三步,将 reply 序列化为字节流,构造响应报文,返回。
需要为 Server 实现一个方法 Register:
// Server represents an RPC Server.
type Server struct {serviceMap sync.Map
}// Register publishes in the server the set of methods of the
func (server *Server) Register(rcvr interface{}) error {s := newService(rcvr)if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {return errors.New("rpc: service already defined: " + s.name)}return nil
}// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
配套实现 findService 方法,即通过 ServiceMethod 从 serviceMap 中找到对应的 service:
func (server *Server) findService(serviceMethod string) (svc *service, mtype *methodType, err error) {dot := strings.LastIndex(serviceMethod, ".")if dot < 0 {err = errors.New("rpc server: service/method request ill-formed: " + serviceMethod)return}serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]svci, ok := server.serviceMap.Load(serviceName)if !ok {err = errors.New("rpc server: can't find service " + serviceName)return}svc = svci.(*service)mtype = svc.method[methodName]if mtype == nil {err = errors.New("rpc server: can't find method " + methodName)}return
}
findService 的实现看似比较繁琐,但是逻辑还是非常清晰的。因为 ServiceMethod 的构成是 “Service.Method”,因此先将其分割成 2 部分,第一部分是 Service 的名称,第二部分即方法名。现在 serviceMap 中找到对应的 service 实例,再从 service 实例的 method 中,找到对应的 methodType。
补全 readRequest 方法:
// request 请求上下文
type request struct {// h 请求头h *codec.Header// argv 请求参数argv reflect.Value// respv 响应参数respv reflect.Valuemtype *methodTypesvc *service
}func (server *Server) readRequest(cc codec.Codec) (*request, error) {h, err := server.readRequestHeader(cc)if err != nil {return nil, err}req := &request{h: h}req.svc, req.mtype, err = server.findService(h.ServiceMethod)if err != nil {return req, err}req.argv = req.mtype.newArgv()req.respv = req.mtype.newReplyv()// make sure that argvi is a pointer, ReadBody need a pointer as parameterargvi := req.argv.Interface()if req.argv.Type().Kind() != reflect.Ptr {argvi = req.argv.Addr().Interface()}if err = cc.ReadBody(argvi); err != nil {log.Println("rpc server: read body err:", err)return req, err}return req, nil
}
readRequest 方法中最重要的部分,即通过 newArgv() 和 newReplyv() 两个方法创建出两个入参实例,然后通过 cc.ReadBody() 将请求报文反序列化为第一个入参 argv,在这里同样需要注意 argv 可能是值类型,也可能是指针类型,所以处理方式有点差异。
补全 handleRequest 方法:
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {defer wg.Done()err := req.svc.call(req.mtype, req.argv, req.respv)if err != nil {req.h.Error = err.Error()server.sendResponse(cc, req.h, struct {}{}, sending)return}server.sendResponse(cc, req.h, req.respv.Interface(), sending)
}
相对于 readRequest,handleRequest 的实现非常简单,通过 req.svc.call 完成方法调用,将 replyv 传递给 sendResponse 完成序列化即可。
超时处理
超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力。
纵观整个远程调用的过程,需要客户端处理超时的地方有:
- 与服务端建立连接,导致的超时
- 发送请求到服务端,写报文导致的超时
- 等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
- 从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:
- 读取客户端请求报文时,读报文导致的超时
- 发送响应报文时,写报文导致的超时
- 调用映射服务的方法时,处理报文导致的超时
创建连接超时
ConnectTimeout 默认值为 10s,HandleTimeout 默认值为 0,即不设限。
type Option struct {// MagicNumber标记这是一个GenRpc请求MagicNumber int// CodecType body编码方式CodecType stringConnectTimeout time.Duration // 0 means no limitHandleTimeout time.Duration
}var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType: codec.GobType,ConnectTimeout: time.Second * 10,
}
客户端连接超时,只需要为 Dial 添加一层超时处理的外壳即可:
- 将 net.Dial 替换为 net.DialTimeout,如果连接创建超时,将返回错误。
- 使用子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After() 信道先接收到消息,则说明 NewClient 执行超时,返回错误。
type clientResult struct {client *Clienterr error
}type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {opt, err := parseOptions(opts...)if err != nil {return nil, err}conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)if err != nil {return nil, err}// close the connection if client is nildefer func() {if err != nil {_ = conn.Close()}}()ch := make(chan clientResult)go func() {client, err := f(conn, opt)ch <- clientResult{client: client, err: err}}()if opt.ConnectTimeout == 0 {result := <-chreturn result.client, result.err}// 阻塞在这,直到某一个case有响应select {case <-time.After(opt.ConnectTimeout):return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)case result := <-ch:return result.client, result.err}
}// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {return dialTimeout(NewClient, network, address, opts...)
}
Client.Call 超时
Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活。
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))select {case <-ctx.Done():client.removeCall(call.Seq)return errors.New("rpc client: call failed: " + ctx.Err().Error())case call := <-call.Done:return call.Error}
}
可以这样使用:
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)
...
服务端处理超时
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {defer wg.Done()called := make(chan struct{})sent := make(chan struct{})go func() {err := req.svc.call(req.mtype, req.argv, req.respv)called <- struct{}{}if err != nil {req.h.Error = err.Error()server.sendResponse(cc, req.h, struct {}{}, sending)sent <- struct{}{}return}server.sendResponse(cc, req.h, req.respv.Interface(), sending)sent <- struct{}{}}()if timeout == 0 {<-called<-sentreturn}select {case <-time.After(timeout):req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)server.sendResponse(cc, req.h, struct {}{}, sending)case <-called:<-sent}
}
这里需要确保 sendResponse 仅调用一次,因此将整个过程拆分为 called 和 sent 两个阶段,在这段代码中只会发生如下两种情况:
- called 信道接收到消息,代表处理没有超时,继续执行 sendResponse。
- time.After() 先于 called 接收到消息,说明处理已经超时,called 和 sent 都将被阻塞。在 case <-time.After(timeout) 处调用 sendResponse。
支持HTTP协议
RPC 的消息格式与标准的 HTTP 协议并不兼容,在这种情况下,就需要一个协议的转换过程。HTTP 协议的 CONNECT 方法恰好提供了这个能力,CONNECT 一般用于代理服务。
假设浏览器与服务器之间的 HTTPS 通信都是加密的,浏览器通过代理服务器发起 HTTPS 请求时,由于请求的站点地址和端口号都是加密保存在 HTTPS 请求报文头中的,代理服务器如何知道往哪里发送请求呢?为了解决这个问题,浏览器通过 HTTP 明文形式向代理服务器发送一个 CONNECT 请求告诉代理服务器目标地址和端口,代理服务器接收到这个请求后,会在对应端口与目标站点建立一个 TCP 连接,连接建立成功后返回 HTTP 200 状态码告诉浏览器与该站点的加密通道已经完成。接下来代理服务器仅需透传浏览器和服务器之间的加密数据包即可,代理服务器无需解析 HTTPS 报文。
举一个简单例子:
- 浏览器向代理服务器发送 CONNECT 请求。
CONNECT geektutu.com:443 HTTP/1.0
- 代理服务器返回 HTTP 200 状态码表示连接已经建立。
HTTP/1.0 200 Connection Established
- 之后浏览器和服务器开始 HTTPS 握手并交换加密数据,代理服务器只负责传输彼此的数据包,并不能读取具体数据内容(代理服务器也可以选择安装可信根证书解密 HTTPS 报文)。
事实上,这个过程其实是通过代理服务器将 HTTP 协议转换为 HTTPS 协议的过程。对 RPC 服务端来,需要做的是将 HTTP 协议转换为 RPC 协议,对客户端来说,需要新增通过 HTTP CONNECT 请求创建连接的逻辑。
服务端支持 HTTP 协议
那通信过程应该是这样的:
- 客户端向 RPC 服务器发送 CONNECT 请求
CONNECT 10.0.0.1:9999/_genrpc_ HTTP/1.0
- RPC 服务器返回 HTTP 200 状态码表示连接建立。
HTTP/1.0 200 Connected to Gen RPC
- 客户端使用创建好的连接发送 RPC 报文,先发送 Option,再发送 N 个请求报文,服务端处理 RPC 请求并响应。
const (connected = "200 Connected to Gen RPC"defaultRPCPath = "/_genprc_"defaultDebugPath = "/debug/genrpc"
)// ServeHTTP implements a http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {if req.Method != "CONNECT" {w.Header().Set("Content-Type", "text/plain; charset=utf-8")w.WriteHeader(http.StatusMethodNotAllowed)_, _ = io.WriteString(w, "405 must CONNECT\n")return}// 将http请求劫持 获取连接conn, _, err := w.(http.Hijacker).Hijack()if err != nil {log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())return}_, _ = io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")server.ServeConn(conn)
}// HandleHTTP registers an HTTP handler for RPC messages on rpcPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
func (server *Server) HandleHTTP() {http.Handle(defaultRPCPath, server)
}// HandleHTTP is a convenient approach for default server to register HTTP handlers
func HandleHTTP() {DefaultServer.HandleHTTP()
}
hijack(劫持)
Hijack()可以将HTTP对应的TCP连接取出,连接在Hijack()之后,HTTP的相关操作就会受到影响,调用方需要负责去关闭连接:
type Hijacker interface {Hijack() (net.Conn, *bufio.ReadWriter, error)
}
func handle1(w http.ResponseWriter, r *http.Request) {hj, _ := w.(http.Hijacker)conn, buf, _ := hj.Hijack()defer conn.Close()buf.WriteString("hello world")buf.Flush()
}func handle2(w http.ResponseWriter, r *http.Request) {fmt.Fprintf(w, "hello world")
}
上面两个handle方法有什么区别呢?很简单,同样是http请求,返回的结果一个遵循http协议,一个不遵循。
➜ ~ curl -i http://localhost:9090/handle1
hello world% ➜ ~ curl -i http://localhost:9090/handle2
HTTP/1.1 200 OK
Date: Thu, 14 Jun 2018 07:51:31 GMT
Content-Length: 11
Content-Type: text/plain; charset=utf-8hello world%
分别是以上两者的返回,可以看到,hijack之后的返回,虽然body是相同的,但是完全没有遵循http协议。
http包的源码:
func (c *conn) serve(ctx context.Context) {...serverHandler{c.server}.ServeHTTP(w, w.req)w.cancelCtx()if c.hijacked() {return}w.finishRequest()...
}
这是net/http包中的方法,也是http路由的核心方法。调用ServeHTTP方法,如果被hijack(劫持)了就直接return了,而一般的http请求会经过后边的finishRequest方法,加入headers等并关闭连接。
使用场景
Hijack方法,一般在在创建连接阶段使用HTTP连接,后续自己完全处理connection。符合这样的使用场景的并不多,基于HTTP协议的rpc算一个,从HTTP升级到WebSocket也算一个。
-
go中自带的rpc可以直接复用http server处理请求的那一套流程去创建连接,连接创建完毕后再使用Hijack方法拿到连接。客户端通过向服务端发送method为connect的请求创建连接,创建成功后即可开始rpc调用。
-
websocket中的应用:websocket在创建连接的阶段与http使用相同的协议,而在后边的数据传输的过程中使用了他自己的协议,符合了Hijack的用途。通过serveWebSocket方法将HTTP协议升级到Websocket协议。
// ServeHTTP implements the http.Handler interface for a WebSocket func (s Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {s.serveWebSocket(w, req) }func (s Server) serveWebSocket(w http.ResponseWriter, req *http.Request) {rwc, buf, err := w.(http.Hijacker).Hijack()if err != nil {panic("Hijack failed: " + err.Error())}// The server should abort the WebSocket connection if it finds// the client did not send a handshake that matches with protocol// specification.defer rwc.Close()conn, err := newServerConn(rwc, buf, req, &s.Config, s.Handshake)if err != nil {return}if conn == nil {panic("unexpected nil conn")}s.Handler(conn) }
客户端支持 HTTP 协议
服务端已经能够接受 CONNECT 请求,并返回了 200 状态码 HTTP/1.0 200 Connected to Gee RPC,客户端要做的,发起 CONNECT 请求,检查返回状态码即可成功建立连接。
// NewHTTPClient new a Client instance via HTTP as transport protocol
func NewHTTPClient(conn net.Conn, opt *Option) (*Client, error) {_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", defaultRPCPath))// Require successful HTTP response// before switching to RPC protocol.resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})if err == nil && resp.Status == connected {return NewClient(conn, opt)}if err == nil {err = errors.New("unexpected HTTP response: " + resp.Status)}return nil, err
}// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
func DialHTTP(network, address string, opts ...*Option) (*Client, error) {return dialTimeout(NewHTTPClient, network, address, opts...)
}
通过 HTTP CONNECT 请求建立连接之后,后续的通信过程就交给 NewClient 了。
为了简化调用,提供了一个统一入口 XDial
// XDial calls different functions to connect to a RPC server
// according the first parameter rpcAddr.
// rpcAddr is a general format (protocol@addr) to represent a rpc server
// eg, http@10.0.0.1:7001, tcp@10.0.0.1:9999, unix@/tmp/geerpc.sock
func XDial(rpcAddr string, opts ...*Option) (*Client, error) {parts := strings.Split(rpcAddr, "@")if len(parts) != 2 {return nil, fmt.Errorf("rpc client err: wrong format '%s', expect protocol@addr", rpcAddr)}protocol, addr := parts[0], parts[1]switch protocol {case "http":return DialHTTP("tcp", addr, opts...)default:// tcp, unix or other transport protocolreturn Dial(protocol, addr, opts...)}
}
实现简单的 DEBUG 页面
支持 HTTP 协议的好处在于,RPC 服务仅仅使用了监听端口的 /_genrpc 路径,在其他路径上我们可以提供诸如日志、统计等更为丰富的功能。接下来我们在 /debug/genrpc 上展示服务的调用统计视图。
package GenRpcimport ("fmt""html/template""net/http"
)const debugText = `<html><body><title>GenRPC Services</title>{{range .}}<hr>Service {{.Name}}<hr><table><th align=center>Method</th><th align=center>Calls</th>{{range $name, $mtype := .Method}}<tr><td align=left font=fixed>{{$name}}({{$mtype.ArgType}}, {{$mtype.ReplyType}}) error</td><td align=center>{{$mtype.NumCalls}}</td></tr>{{end}}</table>{{end}}</body></html>`var debug = template.Must(template.New("RPC debug").Parse(debugText))type debugHTTP struct {*Server
}type debugService struct {Name stringMethod map[string]*methodType
}// Runs at /debug/geerpc
func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) {// Build a sorted version of the data.var services []debugServiceserver.serviceMap.Range(func(namei, svci interface{}) bool {svc := svci.(*service)services = append(services, debugService{Name: namei.(string),Method: svc.method,})return true})err := debug.Execute(w, services)if err != nil {_, _ = fmt.Fprintln(w, "rpc: error executing template:", err.Error())}
}
将 debugHTTP 实例绑定到地址 /debug/genrpc:
func (server *Server) HandleHTTP() {http.Handle(defaultRPCPath, server)http.Handle(defaultDebugPath, debugHTTP{server})log.Println("rpc server debug path:", defaultDebugPath)
}
Demo
定义服务:
type Foo inttype Args struct{ Num1, Num2 int }func (f Foo) Sum(args Args, reply *int) error {*reply = args.Num1 + args.Num2return nil
}func startServer(addrCh chan string) {var foo Fool, _ := net.Listen("tcp", ":9999")_ = geerpc.Register(&foo)geerpc.HandleHTTP()addrCh <- l.Addr().String()_ = http.Serve(l, nil)
}
客户端:
func call(addrCh chan string) {client, _ := geerpc.DialHTTP("tcp", <-addrCh)defer func() { _ = client.Close() }()time.Sleep(time.Second)// send request & receive responsevar wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()args := &Args{Num1: i, Num2: i * i}var reply intif err := client.Call(context.Background(), "Foo.Sum", args, &reply); err != nil {log.Fatal("call Foo.Sum error:", err)}log.Printf("%d + %d = %d", args.Num1, args.Num2, reply)}(i)}wg.Wait()
}func main() {log.SetFlags(0)ch := make(chan string)go call(ch)startServer(ch)
}
负载均衡(load balance)
负载均衡策略
假设有多个服务实例,每个实例提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。客户端可以选择任意一个实例进行调用,获取想要的结果。那如何选择呢?取决了负载均衡的策略。
- 随机选择策略 - 从服务列表中随机选择一个。
- 轮询算法(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1) mode n。
- 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近5分钟部署服务器的 CPU、内存消耗情况。
- 哈希/一致性哈希策略 - 依据请求的某些特征,计算一个 hash 值,根据 hash 值将请求发送到对应的机器。一致性 hash 还可以解决服务实例动态添加情况下,调度抖动的问题。一致性哈希的一个典型应用场景是分布式缓存服务。
…
服务发现
负载均衡的前提是有多个服务实例,首先实现一个最基础的服务发现模块 Discovery:
定义 2 个类型:
- SelectMode 代表不同的负载均衡策略,简单起见仅实现 Random 和 RoundRobin 两种策略。
- Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
- Refresh() 从注册中心更新服务列表
- Update(servers []string) 手动更新服务列表
- Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
- GetAll() 返回所有的服务实例
package xclientimport ("errors""math""math/rand""sync""time"
)type SelectMode intconst (RandomSelect SelectMode = iota // select randomlyRoundRobinSelect // select using Robbin algorithm
)type Discovery interface {Refresh() error // refresh from remote registryUpdate(servers []string) errorGet(mode SelectMode) (string, error)GetAll() ([]string, error)
}
紧接着,实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery:
// MultiServersDiscovery is a discovery for multi servers without a registry center
// user provides the server addresses explicitly instead
type MultiServersDiscovery struct {// r 是一个产生随机数的实例,初始化时使用时间戳设定随机数种子,避免每次产生相同的随机数序列。r *rand.Rand // generate random numbermu sync.RWMutex // protect followingservers []string// index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。index int // record the selected position for robin algorithm
}// NewMultiServerDiscovery creates a MultiServersDiscovery instance
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {d := &MultiServersDiscovery{servers: servers,r: rand.New(rand.NewSource(time.Now().UnixNano())),}d.index = d.r.Intn(math.MaxInt32 - 1)return d
}
实现 Discovery 接口:
var _ Discovery = (*MultiServersDiscovery)(nil)// Refresh doesn't make sense for MultiServersDiscovery, so ignore it
func (d *MultiServersDiscovery) Refresh() error {return nil
}// Update the servers of discovery dynamically if needed
func (d *MultiServersDiscovery) Update(servers []string) error {d.mu.Lock()defer d.mu.Unlock()d.servers = serversreturn nil
}// Get a server according to mode
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) {d.mu.Lock()defer d.mu.Unlock()n := len(d.servers)if n == 0 {return "", errors.New("rpc discovery: no available servers")}switch mode {case RandomSelect:return d.servers[d.r.Intn(n)], nilcase RoundRobinSelect:s := d.servers[d.index%n] // servers could be updated, so mode n to ensure safetyd.index = (d.index + 1) % nreturn s, nildefault:return "", errors.New("rpc discovery: not supported select mode")}
}// returns all servers in discovery
func (d *MultiServersDiscovery) GetAll() ([]string, error) {d.mu.RLock()defer d.mu.RUnlock()// return a copy of d.serversservers := make([]string, len(d.servers), len(d.servers))copy(servers, d.servers)return servers, nil
}
支持负载均衡的客户端
向用户暴露一个支持负载均衡的客户端 XClient。
type XClient struct {d Discoverymode SelectModeopt *GenRpc.Optionmu sync.Mutex // protect followingclients map[string]*GenRpc.Client
}var _ io.Closer = (*XClient)(nil)func NewXClient(d Discovery, mode SelectMode, opt *Option) *XClient {return &XClient{d: d, mode: mode, opt: opt, clients: make(map[string]*Client)}
}func (xc *XClient) Close() error {xc.mu.Lock()defer xc.mu.Unlock()for key, client := range xc.clients {// I have no idea how to deal with error, just ignore it._ = client.Close()delete(xc.clients, key)}return nil
}
XClient 的构造函数需要传入三个参数,服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。
实现客户端最基本的功能 Call:
func (xc *XClient) dial(rpcAddr string) (*Client, error) {xc.mu.Lock()defer xc.mu.Unlock()client, ok := xc.clients[rpcAddr]if ok && !client.IsAvailable() {_ = client.Close()delete(xc.clients, rpcAddr)client = nil}if client == nil {var err errorclient, err = XDial(rpcAddr, xc.opt)if err != nil {return nil, err}xc.clients[rpcAddr] = client}return client, nil
}func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {client, err := xc.dial(rpcAddr)if err != nil {return err}return client.Call(ctx, serviceMethod, args, reply)
}// Call invokes the named function, waits for it to complete,
// and returns its error status.
// xc will choose a proper server.
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {rpcAddr, err := xc.d.Get(xc.mode)if err != nil {return err}return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}
我们将复用 Client 的能力封装在方法 dial 中,dial 的处理逻辑如下:
- 检查 xc.clients 是否有缓存的 Client,如果有,检查是否是可用状态,如果是则返回缓存的 Client,如果不可用,则从缓存中删除。
- 如果步骤 1) 没有返回缓存的 Client,则说明需要创建新的 Client,缓存并返回。
另外,我们为 XClient 添加一个常用功能:Broadcast。
// Broadcast invokes the named function for every server registered in discovery
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface{}) error {servers, err := xc.d.GetAll()if err != nil {return err}var wg sync.WaitGroupvar mu sync.Mutex // protect e and replyDonevar e errorreplyDone := reply == nil // if reply is nil, don't need to set valuectx, cancel := context.WithCancel(ctx)for _, rpcAddr := range servers {wg.Add(1)go func(rpcAddr string) {defer wg.Done()var clonedReply interface{}if reply != nil {clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()}err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)mu.Lock()if err != nil && e == nil {e = errcancel() // if any call failed, cancel unfinished calls}if err == nil && !replyDone {reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())replyDone = true}mu.Unlock()}(rpcAddr)}wg.Wait()return e
}
服务发现与注册中心(registry)
如果没有注册中心,客户端需要硬编码服务端的地址,而且没有机制保证服务端是否处于可用状态。当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd(推荐)、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。
Registry
首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,任何注册的服务超过 5 min,即视为不可用状态。
// GeeRegistry is a simple register center, provide following functions.
// add a server and receive heartbeat to keep it alive.
// returns all alive servers and delete dead servers sync simultaneously.
type GeeRegistry struct {timeout time.Durationmu sync.Mutex // protect followingservers map[string]*ServerItem
}type ServerItem struct {Addr stringstart time.Time
}const (defaultPath = "/_geerpc_/registry"defaultTimeout = time.Minute * 5
)// New create a registry instance with timeout setting
func New(timeout time.Duration) *GeeRegistry {return &GeeRegistry{servers: make(map[string]*ServerItem),timeout: timeout,}
}var DefaultGeeRegister = New(defaultTimeout)
为 GeeRegistry 实现添加服务实例和返回服务列表的方法。
- putServer:添加服务实例,如果服务已经存在,则更新 start。
- aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {r.mu.Lock()defer r.mu.Unlock()s := r.servers[addr]if s == nil {r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}} else {s.start = time.Now() // if exists, update start time to keep alive}
}func (r *GeeRegistry) aliveServers() []string {r.mu.Lock()defer r.mu.Unlock()var alive []stringfor addr, s := range r.servers {if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {alive = append(alive, addr)} else {delete(r.servers, addr)}}sort.Strings(alive)return alive
}
为了实现上的简单,GenRegistry 采用 HTTP 协议提供服务,且所有的有用信息都承载在 HTTP Header 中。
Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers 承载。
Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载。
// Runs at /_geerpc_/registry
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {switch req.Method {case "GET":// keep it simple, server is in req.Headerw.Header().Set("X-Geerpc-Servers", strings.Join(r.aliveServers(), ","))case "POST":// keep it simple, server is in req.Headeraddr := req.Header.Get("X-Geerpc-Server")if addr == "" {w.WriteHeader(http.StatusInternalServerError)return}r.putServer(addr)default:w.WriteHeader(http.StatusMethodNotAllowed)}
}// HandleHTTP registers an HTTP handler for GeeRegistry messages on registryPath
func (r *GeeRegistry) HandleHTTP(registryPath string) {http.Handle(registryPath, r)log.Println("rpc registry path:", registryPath)
}func HandleHTTP() {DefaultGeeRegister.HandleHTTP(defaultPath)
}
另外,提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳,默认周期比注册中心设置的过期时间少 1 min。
// Heartbeat send a heartbeat message every once in a while
// it's a helper function for a server to register or send heartbeat
func Heartbeat(registry, addr string, duration time.Duration) {if duration == 0 {// make sure there is enough time to send heart beat// before it's removed from registryduration = defaultTimeout - time.Duration(1)*time.Minute}var err errorerr = sendHeartbeat(registry, addr)go func() {// 定时发送心跳报文t := time.NewTicker(duration)for err == nil {<-t.Cerr = sendHeartbeat(registry, addr)}}()
}func sendHeartbeat(registry, addr string) error {log.Println(addr, "send heart beat to registry", registry)httpClient := &http.Client{}req, _ := http.NewRequest("POST", registry, nil)req.Header.Set("X-Geerpc-Server", addr)if _, err := httpClient.Do(req); err != nil {log.Println("rpc server: heart beat err:", err)return err}return nil
}
在 xclient 中对应实现 Discovery:
package xclienttype GeeRegistryDiscovery struct {*MultiServersDiscoveryregistry stringtimeout time.DurationlastUpdate time.Time
}const defaultUpdateTimeout = time.Second * 10func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {if timeout == 0 {timeout = defaultUpdateTimeout}d := &GeeRegistryDiscovery{MultiServersDiscovery: NewMultiServerDiscovery(make([]string, 0)),registry: registerAddr,timeout: timeout,}return d
}
- GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
- registry 即注册中心的地址
- timeout 服务列表的过期时间
- lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。
实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现:
func (d *GeeRegistryDiscovery) Update(servers []string) error {d.mu.Lock()defer d.mu.Unlock()d.servers = serversd.lastUpdate = time.Now()return nil
}func (d *GeeRegistryDiscovery) Refresh() error {d.mu.Lock()defer d.mu.Unlock()if d.lastUpdate.Add(d.timeout).After(time.Now()) {return nil}log.Println("rpc registry: refresh servers from registry", d.registry)resp, err := http.Get(d.registry)if err != nil {log.Println("rpc registry refresh err:", err)return err}servers := strings.Split(resp.Header.Get("X-Geerpc-Servers"), ",")d.servers = make([]string, 0, len(servers))for _, server := range servers {if strings.TrimSpace(server) != "" {d.servers = append(d.servers, strings.TrimSpace(server))}}d.lastUpdate = time.Now()return nil
}
Get 和 GetAll 与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期。
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {if err := d.Refresh(); err != nil {return "", err}return d.MultiServersDiscovery.Get(mode)
}func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {if err := d.Refresh(); err != nil {return nil, err}return d.MultiServersDiscovery.GetAll()
}
个人总结
- 照着写(抄)完了,迷迷糊糊懵懵懂懂。
- 加深反射理解,在编译阶段判断是否实现了接口,hijack操作。
- 刚接触微服务,需要多复习几遍。
- 大佬地址:geerpc
相关文章:
七天实现一个go rpc框架
目录rpc协议目的关于RPC和框架服务端与消息编码确保接口的实现消息的序列化与反序列化通信过程服务端的实现main 函数支持并发与异步的客户端Call 的设计实现客户端服务注册(service register)通过反射实现 service集成到服务端超时处理创建连接超时Client.Call 超时服务端处理…...
EMQX Cloud Serverless 正式上线:三秒部署、按量计费的 MQTT Serverless 云服务
近日,全球领先的开源物联网数据基础设施软件供应商 EMQ 正式发布了 MQTT Serverless 云服务 —— EMQX Cloud Serverless 的 Beta 版本,开创性地采用弹性多租户技术,用户无需关心服务器基础设施和服务规格伸缩所需资源,仅用三秒即…...
快速排序 容易理解的版本
package huaweiod.排序算法;import java.util.Arrays;public class 快速排序 {public static void main(String[] args) {int[] arr {9,8,3,5,6,7,8,9};mysort(arr, 0, arr.length - 1); // myprint(arr," ");}private static void myprint(int[] arr, Strin…...
Linux体系结构
Linux体系结构一、引入概念二、内核三、管理1、内存管理2、进程管理3、进程调度控制进程对CPU的访问4、设备驱动程序和网络接口四、Linux Shell五、磁盘分区硬盘内的分区Linux下磁盘分区和目录的关系一、引入 操作系统的本质是什么? 是一种管理(协调)资源机制&…...
【汽车电子】什么是ADAS?
文章目录ADAS——先进驾驶辅助系统ADAS——商用车安全性能提升的利器总结ADAS——先进驾驶辅助系统 ADAS,全称Advanced Driver Assistance Systems ,“先进驾驶辅助系统”,adas是汽车上面的一种系统,中文名叫做高级驾驶辅助系统&…...
java: 错误: 不支持发行版本 5(快速解决办法)
目录 前言 一、出现报错 二、报错的原因 三、解决办法 四、解决成功 前言 在maven web项目上面要部署运行tomcat时候,会出现这个问题 一、出现报错 java: 错误: 不支持发行版本 5 二、报错的原因 (1)官方解释:这个错误…...
QT中pro文件常用qmake语法
变量 配置QT模块 QT core gui sql network QT - sql注释 # 开启注释 # DEFINES QT_DISABLE_DEPRECATED_BEFORE0x060000 # disables all the APIs deprecated before Qt 6.0.0添加源文件 SOURCES \main.cpp \widget.cppSOURCES *.cpp SOURCES 1.cpp 2.cpp 3.cpp添加…...
Android 一体机研发之修改系统设置————声音
Android 一体机研发之修改系统设置————屏幕亮度 Android 一体机研发之修改系统设置————声音 Android 一体机研发之修改系统设置————自动锁屏 修改系统设置系列篇章马上开张了! 本章将为大家细节讲解声音。 对于声音功能大家都不陌生,在多…...
挖掘长尾关键词的五大思路
本文重点介绍做SEO挖掘长尾词的五大思路:长尾词是搜索量不大,但是关键词的量非常庞大的词,那我们可以通过以下方法挖掘:1、目标型长尾搜索的关键词是直接包含了商业需求的,直接能跟我们的服务或产品对接的,…...
ccc-Brief Introduction of Deep Learning-李宏毅(6)
文章目录Three Steps for Deep LearningFully Connect Feedforward NetworkMatrix OperationOutput Layer as Multi-Class ClassifierExample ApplicationNeural NetworkGoodness of functionPick the best functionThree Steps for Deep Learning 与机器学习三步骤基本相同。 …...
【TVM 学习资料】用 Schedule 模板和 AutoTVM 优化算子
完整 TVM 中文文档,访问→TVM 中文站 作者:Lianmin Zheng,Chris Hoge 本教程将展示如何用 TVM 张量表达式(TE)语言编写 schedule 模板,并通过 AutoTVM 对模板进行搜索,从而找到最佳 schedule。…...
蓝牙Mesh学习笔记(一)
Mesh系统结构1 Mesh网络分层1.1 模型层(Model layer)1.2 基础模型层(Foundation Model layer)1.3 接入层(Access layer)1.4 上层传输层(Upper transport layer)1.5 下层传输层(Lower transport layer)1.6 网络层(Network layer)1.7 承载层(Bearer layer)1.8 BLE内核规范(BLE Co…...
【1234. 替换子串得到平衡字符串】
来源:力扣(LeetCode) 描述: 有一个只含有 Q, W, E, R 四种字符,且长度为 n 的字符串。 假如在该字符串中,这四个字符都恰好出现 n/4 次,那么它就是一个「平衡字符串」。 给你一个这样的字符…...
独自开:提供创业机会、享受平台分红、推出新颖赚钱副业
💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! 前言 独自开:一款聚焦软件定制开发,独立、自主、开放平台 独创分层标准化平台架构,满足系统不断生长的个性化需求多端一键部署前端业务交互与展…...
C++【二叉树进阶(二叉搜索树)】
文章目录前言1、二叉搜索树1-1、 二叉搜索树概念2、二叉搜索树操作2-1、树和节点的基本框架2-2、二叉搜索树的查找2-3、中序遍历2-4、二叉搜索树的插入2-5、二叉搜索树的删除3、二叉搜索树的模拟实现3-1、循环版本3-2、递归版本4、二叉搜索树的应用4-1、K模型4-2、KV模型4-3、K…...
【C++初阶】vector的使用
大家好我是沐曦希💕 文章目录一.vector介绍二、构造函数三、遍历1.[]2.迭代器3.范围for四、容量操作1.扩容机制五、增删查改六、迭代器失效问题一.vector介绍 vector是表示可变大小数组的序列容器。就像数组一样,vector也采用的连续存储空间来存储元素。…...
OPenPCDet windows流程及其问题
首先的首先极其不推荐将OPenPCDet运行在Windows上,过程非常复杂,适配也不是很好,不推荐在windows下载并训练,本人做这个主要是方便再笔记本电脑上对实验结果进行整理,处理一些简单的推理评估等任务。如有必要请继续阅读: 以下是正文: 常规的安装流程不再赘述,请参考官方…...
【自学Python】Python字符大小写判断
大纲 Python字符串是否是小写 Python字符串是否是小写教程 在开发过程中,有时候我们需要判断一个 字符串 是否是小写形式(即,所有的字符都是小写字母,不是英文字符的忽略不做判断),在 Python 中ÿ…...
设计模式之美总结(开源实战篇)
title: 设计模式之美总结(开源实战篇) date: 2023-01-10 17:13:05 tags: 设计模式 categories:设计模式 cover: https://cover.png feature: false 文章目录1. Java JDK 应用到的设计模式1.1 工厂模式在 Calendar 类中的应用1.2 建造者模式在 Calendar …...
两个月,测试转岗产品经理,我是怎么规划的?
本期同学依旧来自深圳 测试到产品转变,用了两个月 本周,为大家介绍M同学的佛系转岗经历 学员档 学员档案 原岗位:测试 转岗级别:中级产品经理 转岗特点: 1.未接触产品工作 2.对岗位地点要求严格 先看结果 …...
三数之和-力扣15-java排序+双指针
一、题目描述给你一个整数数组 nums ,判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k ,同时还满足 nums[i] nums[j] nums[k] 0 。请你返回所有和为 0 且不重复的三元组。注意:答案中不可以包含重复的三元组。…...
【编程基础之Python】3、创建Python虚拟环境
【编程基础之Python】3、创建Python虚拟环境创建Python虚拟环境为什么需要虚拟环境Windows上的Anaconda创建虚拟环境conda 命令conda env 命令创建虚拟环境切换虚拟环境验证虚拟环境Linux上的Anaconda创建虚拟环境创建虚拟环境切换虚拟环境验证虚拟环境总结创建Python虚拟环境 …...
kettle开发-Day36-循环驱动作业
前言:在日常数据处理时,我们通过变量传参来完成某个日期的数据转换。但可能因程序或者网络原因导致某个时间段的数据抽取失败。常见导致kettle作业失败的原因大概分为三大类,数据源异常、数据库异常、程序异常。因此面对这些异常时࿰…...
2023秋招 新凯来 算法工程师 面经分享
本专栏分享 计算机小伙伴秋招春招找工作的面试经验和面试的详情知识点 专栏首页:秋招算法类面经分享 主要分享计算机算法类在面试互联网公司时候一些真实的经验 一面 技术面 30分钟左右 1.主要是问项目和论文上的东西,问的不深,中间还介绍他们是做缺陷检测的,大概问了16分钟…...
Web3CN|Damus刷频背后,大众在期待什么样的去中心化社交?
刚过去的一周,许多人的朋友圈包括Twitter、Faceboo在内都在被一串公钥字母刷屏,其重要起因就是 Twitter 前首席执行官 Jack Dorsey 发推称,(2月1日)基于去中心化社交协议 Nostr 的社交产品 Damus 和 Amethyst 已分别在…...
Jenkins自动发布到WindowsServer,在WindowsServer执行的命令
echo off set apppoolname"6.usegitee" set websitename"6.usegitee" set webfolder"usegitee" echo 停止站点的应用程序池 C:\Windows\System32\inetsrv\appcmd.exe stop apppool %apppoolname% echo 停止站点 c:\Windows\System32\inetsrv\a…...
【Git学习】Git如何Clone带有Submodule的仓库?
文章目录一、问题描述二、解决问题三、参考链接四、解决问题4.1 下载主模块4.2 查看主模块的配置4.2 子模块的添加4.3 查看子模块的配置4.4 查看子模块的检出状态4.5 检出submodule4.6 再次查看.git/config4.7 重新打开Android Studio运行代码一、问题描述 在GitHub上下载了一…...
C语言进阶——通讯录模拟实现
🌇个人主页:_麦麦_ 📚今日名言:只有走在路上,才能摆脱局限,摆脱执着,让所有的选择,探寻,猜测,想象都生机勃勃。——余秋雨《文化苦旅》 目录 一、前言 二、正…...
【C#基础】C# 变量和常量的使用
序号系列文章1【C#基础】C# 程序通用结构总结2【C#基础】C# 程序基础语法解析3【C#基础】C# 数据类型总结文章目录前言一. 变量(variable)1,变量定义及初始化2,变量的类别3,接收输出变量二. 常量(constant&…...
nvm安装后出现‘node‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件
出现这个问题多半是path地址不对。 打开系统环境变量。看看path里面有没有?没有的话,加上就行! 我的报错原因就是因为path里没有自动加上nvm的相关路径。 注意项: 1,在安装nvm之前,提前要把本机以前安装…...
主题网站设计模板/百度模拟搜索点击软件
准备好滚滚的JOE – Java on Everything吗? Java on Everything掌握了如何在几乎所有内容上运行Java的关键。 没有操作系统? 没问题。 JOE无需操作系统即可工作。 放弃操作系统的好处是什么? Java on Everything是由约瑟夫库里格(…...
学网站建设 赚钱/女装标题优化关键词
我们来看看未来区块链技术会怎样影响我们的生活。20年后的某一天,M国总统大选正在如火如荼地进行,你把智能手表调到投票界面,看了下选举人:今年好像没什么有特色的竞选人啊。李查得?没意思,一个中规中矩的政…...
民政府公众信息网站建设/推广一次多少钱
所有题目均有四种语言实现。C++ 实现目录、Python实现目录、Java实现目录、JavaScript实现目录 题目 题目描述:去除文本多余空格,但不去除配对单引号之间的多余空格。给出关键词的起始和结束下标,去除多余空格后刷新关键词的起始和结束下标。输入描述: 输入为两行字符串: 第…...
响应式网站怎么写/网站推广要点
// socket已经连接成功 var socketOpen false // socket已经调用关闭function var socketClose false // socket发送的消息队列 var socketMsgQueue [] // 判断心跳变量 var heart // 心跳失败次数 var heartBeatFailCount 0 // 终止心跳 var heartBeatTimeOut null; //…...
兖州城乡建设局网站/免费网站提交入口
内省是 Java 语言对 Bean 类属性、事件的一种处理方法(也就是说给定一个javabean对象,我们就可以得到/调用它的所有的get/set方法)。例如类 A 中有属性 name, 那我们可以通过 getName,setName 来得到其值或者设置新的值。通过 getName/setName 来访问 name 属性,这就是默认的规…...
可以自己做网站/营销软文500字
参考博客: http://openwares.net/database/mybatis_generator_example.html 一、Example类的作用:一个用于筛选复杂条件的类 二、Example类中查询方法的介绍 1、Criterion Criterion是最基本,它是最底层的Where条件,用于字段级的筛选…...