如何使用golang實(shí)現(xiàn)自定義RPC框架
RPC (Remote Procedure Call)是一種遠(yuǎn)程調(diào)用協(xié)議,通過網(wǎng)絡(luò)傳輸,使得程序能夠像本地調(diào)用一樣調(diào)用遠(yuǎn)程服務(wù)。在現(xiàn)代微服務(wù)架構(gòu)中,RPC協(xié)議被廣泛使用。golang通過標(biāo)準(zhǔn)庫的net/rpc包提供了一套RPC框架,但是這個(gè)框架無法滿足一些特定的業(yè)務(wù)需求,本文就來介紹如何使用golang自己實(shí)現(xiàn)一個(gè)RPC框架。
1. 基本概念
在實(shí)現(xiàn)自定義RPC框架之前,需要先了解以下幾個(gè)基本概念:
- Service:RPC調(diào)用的服務(wù),即提供RPC服務(wù)的函數(shù)集合。
- Method:Service中的方法,即具體的RPC調(diào)用方法。
- Codec:序列化和反序列化的方法,將調(diào)用的參數(shù)和返回值序列化成二進(jìn)制數(shù)據(jù),以便通過網(wǎng)絡(luò)傳輸。
- Transport:網(wǎng)絡(luò)傳輸協(xié)議,用于將序列化后的二進(jìn)制數(shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)竭h(yuǎn)程服務(wù)。
2. 實(shí)現(xiàn)步驟
接下來我們就來實(shí)現(xiàn)一個(gè)簡單的自定義RPC框架,步驟如下:
- 定義Service和Method
- 實(shí)現(xiàn)Codec
- 實(shí)現(xiàn)Transport
- 完成框架
2.1 定義Service和Method
我們以一個(gè)簡單的計(jì)算器服務(wù)為例,在服務(wù)端提供兩個(gè)方法Add和Multiply,客戶端可以通過RPC調(diào)用這兩個(gè)方法。
定義服務(wù):
`go
// 定義CalculatorService接口
type CalculatorService interface {
Add(int, int) int
Multiply(int, int) int
}
// 實(shí)現(xiàn)具體的CalculatorService
type CalculatorServiceImpl struct {}
func (c *CalculatorServiceImpl) Add(a, b int) int {
return a + b
}
func (c *CalculatorServiceImpl) Multiply(a, b int) int {
return a * b
}
定義Service和Method之后,接下來需要定義一個(gè)struct來存儲Service和其對應(yīng)的Method。同時(shí),定義一個(gè)Register方法,用于注冊新的Service和Method。`gotype Server struct { services map*service}type service struct { typ reflect.Type method map*methodType}type methodType struct { method reflect.Method ArgType reflect.Type ReplyType reflect.Type}func (s *Server) Register(receiver interface{}) error { service := new(service) service.typ = reflect.TypeOf(receiver).Elem() service.method = make(map*methodType) for i := 0; i < service.typ.NumMethod(); i++ { method := service.typ.Method(i) mType := method.Type if mType.NumIn() != 3 || mType.NumOut() != 1 { continue } argType := mType.In(1) replyType := mType.In(2) if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) { continue } service.method = &methodType{ method: method, ArgType: argType, ReplyType: replyType, } } s.services = service return nil}func isExportedOrBuiltinType(t reflect.Type) bool { pkgPath := t.PkgPath() return pkgPath == "" || pkgPath == "builtin"}
在Register方法中,循環(huán)遍歷service.typ中的所有方法,將滿足條件的方法添加到service.method中。最后將service添加到Server.services中。
2.2 實(shí)現(xiàn)Codec
Codec用于將調(diào)用的參數(shù)和返回值序列化成二進(jìn)制數(shù)據(jù),以便通過網(wǎng)絡(luò)傳輸。
在這里,我們使用golang的標(biāo)準(zhǔn)庫encoding/gob實(shí)現(xiàn)Codec。Gob是golang標(biāo)準(zhǔn)庫中的編解碼庫,支持任意類型的編解碼和傳輸,比JSON和XML更高效。在實(shí)現(xiàn)Codec之前,需要先定義一個(gè)request結(jié)構(gòu)體和response結(jié)構(gòu)體,用于存儲調(diào)用信息和返回信息。
`go
type request struct {
ServiceMethod string // 形如"Service.Method"
Seq uint64 // 請求序列號
Args byte // 客戶端傳遞的參數(shù)
}
type response struct {
Seq uint64 // 請求序列號
ServiceMethod string // 形如"Service.Method"
Error string // 存儲錯(cuò)誤信息
Reply byte // 存儲響應(yīng)參數(shù)
}
接下來實(shí)現(xiàn)Codec,具體實(shí)現(xiàn)代碼如下:`gotype Codec struct { conn io.ReadWriteCloser dec *gob.Decoder enc *gob.Encoder mutex sync.Mutex ids uint64 pending map*call}type call struct { req *request resp *response done chan *call}func (c *Codec) WriteRequest(method string, args interface{}) (uint64, error) { c.mutex.Lock() defer c.mutex.Unlock() id := c.ids c.ids++ req := &request{ ServiceMethod: method, Seq: id, } buf := bytes.NewBuffer(nil) enc := gob.NewEncoder(buf) if err := enc.Encode(args); err != nil { return 0, err } req.Args = buf.Bytes() call := &call{ req: req, resp: new(response), done: make(chan *call), } c.pending = call if err := c.enc.Encode(req); err != nil { delete(c.pending, id) return 0, err } return id, nil}func (c *Codec) ReadResponseHeader() (*rpc.Response, error) { c.mutex.Lock() defer c.mutex.Unlock() var resp response if err := c.dec.Decode(&resp); err != nil { return nil, err } call := c.pending delete(c.pending, resp.Seq) call.resp = &resp call.done <- call return &rpc.Response{ ServiceMethod: resp.ServiceMethod, Seq: resp.Seq, Error: errors.New(resp.Error), }, nil}func (c *Codec) ReadResponseBody(x interface{}) error { c.mutex.Lock() defer c.mutex.Unlock() call := <-c.pending.done if call.resp.Error != "" { return errors.New(call.resp.Error) } dec := gob.NewDecoder(bytes.NewBuffer(call.resp.Reply)) return dec.Decode(x)}
在上面的代碼中,我們使用了一個(gè)pending map來存儲請求的序列號和請求的返回值。在WriteRequest方法中,我們將請求信息編碼成二進(jìn)制數(shù)據(jù),然后將請求信息和該請求的channel存儲到pending中。在ReadResponseHeader和ReadResponseBody方法中,我們根據(jù)pending中的請求序列號獲取該請求對應(yīng)的call,然后將call.resp進(jìn)行解碼后返回。
2.3 實(shí)現(xiàn)Transport
Transport用于將序列化后的二進(jìn)制數(shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)竭h(yuǎn)程服務(wù)。
在golang中,可以使用net包來實(shí)現(xiàn)簡單的Socket編程。在這里,我們通過net.Dial建立連接后,將Codec中序列化后的數(shù)據(jù)通過Socket發(fā)送到遠(yuǎn)程服務(wù)端。
`go
type Transport struct {
conn io.ReadWriteCloser
}
func (t *Transport) Dial(network, address string) error {
conn, err := net.Dial(network, address)
if err != nil {
return err
}
t.conn = conn
return nil
}
func (t *Transport) Close() error {
return t.conn.Close()
}
func (t *Transport) Codec() rpc.ClientCodec {
return &Codec{
conn: t.conn,
dec: gob.NewDecoder(t.conn),
enc: gob.NewEncoder(t.conn),
pending: make(map*call),
}
}
2.4 完成框架最后,我們完成自定義RPC框架的實(shí)現(xiàn)。具體代碼如下:`gotype Server struct { services map*service}type service struct { typ reflect.Type method map*methodType}type methodType struct { method reflect.Method ArgType reflect.Type ReplyType reflect.Type}func (s *Server) Register(receiver interface{}) error { service := new(service) service.typ = reflect.TypeOf(receiver).Elem() service.method = make(map*methodType) for i := 0; i < service.typ.NumMethod(); i++ { method := service.typ.Method(i) mType := method.Type if mType.NumIn() != 3 || mType.NumOut() != 1 { continue } argType := mType.In(1) replyType := mType.In(2) if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) { continue } service.method = &methodType{ method: method, ArgType: argType, ReplyType: replyType, } } s.services = service return nil}func (s *Server) ServeCodec(codec rpc.ServerCodec) error { for { req, err := codec.ReadRequestHeader() if err != nil { if err != io.EOF && err != io.ErrUnexpectedEOF { log.Println("rpc server:", err) } return err } serviceMethod := req.ServiceMethod dot := strings.LastIndex(serviceMethod, ".") if dot < 0 { err := errors.New("rpc server: service/method request ill-formed: " + serviceMethod) log.Println(err.Error()) resp := &rpc.Response{ ServiceMethod: serviceMethod, Seq: req.Seq, Error: err.Error(), } codec.WriteResponse(resp, nil) continue } serviceName, methodName := serviceMethod, serviceMethod service, ok := s.services if !ok { err := errors.New("rpc server: can't find service " + serviceName) log.Println(err.Error()) resp := &rpc.Response{ ServiceMethod: serviceMethod, Seq: req.Seq, Error: err.Error(), } codec.WriteResponse(resp, nil) continue } mtype, ok := service.method if !ok { err := errors.New("rpc server: can't find method " + methodName) log.Println(err.Error()) resp := &rpc.Response{ ServiceMethod: serviceMethod, Seq: req.Seq, Error: err.Error(), } codec.WriteResponse(resp, nil) continue } argv := reflect.New(mtype.ArgType) replyv := reflect.New(mtype.ReplyType).Elem() if err = codec.ReadRequestBody(argv.Interface()); err != nil { log.Println("rpc server: ", err) resp := &rpc.Response{ ServiceMethod: serviceMethod, Seq: req.Seq, Error: err.Error(), } codec.WriteResponse(resp, nil) continue } // Call the service method. returnValues := mtype.method.Func.Call(reflect.Value{ reflect.ValueOf(service), reflect.ValueOf(argv.Interface()), replyv, }) // The return value for the method is an error. errInter := returnValues.Interface() if errInter != nil { err := errInter.(error) log.Println("rpc server: ", err) resp := &rpc.Response{ ServiceMethod: serviceMethod, Seq: req.Seq, Error: err.Error(), } codec.WriteResponse(resp, nil) continue } resp := &rpc.Response{ ServiceMethod: serviceMethod, Seq: req.Seq, Error: "", } if err = codec.WriteResponse(resp, replyv.Interface()); err != nil { log.Println("rpc server: ", err) } }}func (s *Server) ServeTransport(transport *Transport) error { codec := transport.Codec() defer transport.Close() return s.ServeCodec(codec)}func isExportedOrBuiltinType(t reflect.Type) bool { pkgPath := t.PkgPath() return pkgPath == "" || pkgPath == "builtin"}
在上面的代碼中,我們定義了一個(gè)Server結(jié)構(gòu)體,用于注冊Service和Method,同時(shí)實(shí)現(xiàn)ServeCodec和ServeTransport方法,用于在服務(wù)端處理RPC請求。
3. 測試
完成自定義RPC框架的實(shí)現(xiàn)之后,我們需要對其進(jìn)行測試。下面我們將分別在服務(wù)端和客戶端使用該RPC框架。
服務(wù)端代碼:
`go
func main() {
server := new(Server)
server.services = make(map*service)
_ = server.Register(&CalculatorServiceImpl{})
transport := new(Transport)
_ = transport.Dial("tcp", "localhost:8080")
defer transport.Close()
log.Fatal(server.ServeTransport(transport))
}
在服務(wù)端,我們首先通過Server.Register方法注冊了一個(gè)CalculatorServiceImpl服務(wù),然后使用Transport.Dial方法連接到特定的地址。客戶端代碼:`gofunc main() { transport := new(Transport) _ = transport.Dial("tcp", "localhost:8080") defer transport.Close() client := rpc.NewClientWithCodec(transport.Codec()) var res int err := client.Call("CalculatorService.Add", int{10, 20}, &res) if err != nil { log.Fatal(err) } log.Printf("Add(10, 20) = %d", res) var mul int err = client.Call("CalculatorService.Multiply", int{10, 20}, &mul) if err != nil { log.Fatal(err) } log.Printf("Multiply(10, 20) = %d", mul)}
在客戶端,我們首先通過Transport.Dial方法連接到服務(wù)端,然后通過rpc.NewClientWithCodec方法創(chuàng)建一個(gè)客戶端,并使用client.Call方法調(diào)用服務(wù)端的方法。
最后,我們啟動服務(wù)端和客戶端,可以看到客戶端成功調(diào)用了服務(wù)端提供的Add和Multiply方法。
4. 總結(jié)
本文介紹了如何使用golang實(shí)現(xiàn)自定義RPC框架,包括定義Service和Method,實(shí)現(xiàn)Codec和Transport,完成框架等步驟,并通過一個(gè)簡單的計(jì)算器服務(wù)對該RPC框架進(jìn)行了測試。該自定義RPC框架適用于一些特定的業(yè)務(wù)需求,可以滿足不同的RPC調(diào)用場景。
以上就是IT培訓(xùn)機(jī)構(gòu)千鋒教育提供的相關(guān)內(nèi)容,如果您有web前端培訓(xùn),鴻蒙開發(fā)培訓(xùn),python培訓(xùn),linux培訓(xùn),java培訓(xùn),UI設(shè)計(jì)培訓(xùn)等需求,歡迎隨時(shí)聯(lián)系千鋒教育。