用gRPC框架实现简单的调用和双向流式传输。
gRPC是一个高性能、通用的开源RPC框架,主要由Google主要面向移动应用开发,并基于HTTP/2协议标准设计,基于ProtoBuf序列化协议开发,且支持众多开发语言。
gRPC主要有4中请求/响应模式,分别是:
1. 简单模式 (Simple RPC)
这种模式最为传统,即客户端发起一次请求,服务端响应一次。这和大家平常熟悉的RPC没有什么区别,所以不再做介绍。
2. 服务端数据流模式 (Server-side streaming RPC)
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端把该股票的实时数据源源不断返回客户端。
3. 客户端数据流模式 (Client-side streaming RPC)
与服务端数据流模式相反,这次是客户端源源不断向服务端发送数据流,在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。
4. 双向数据流模式(Bidirectional streaming RPC)
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,这就是可以实现实时交互。典型的例子是聊天机器人。
先看 hello.proto 文件,定义helloService接口:
1 2 3 4 5 6 7 8 9 10 11 12 |
syntax = "proto3"; package main; message String { string value = 1; } service HelloService { rpc Hello (String) returns (String); rpc Channel (stream String) returns (stream String); } |
这段代码有两个接口,Hello函数是简单模式、Channel函数是双向数据流模式。
关键字stream指定启用流特性,参数部分是接受客户端的流,返回值是返回给客户端的流。
用protoc-gen-go内置的gRPC插件自动生成GRPC代码:
1 |
$ protoc --go_out=plugins=grpc:. hello.proto |
我们看看需要关注的自动生成的代码内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// hello.pb.go, Line172, Client // HelloServiceClient is the client API for HelloService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type HelloServiceClient interface { Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error) } // hello.pb.go, Line225, Server // HelloServiceServer is the server API for HelloService service. type HelloServiceServer interface { Hello(context.Context, *String) (*String, error) Channel(HelloService_ChannelServer) error } |
这是客户端和服务端调用接口,gRPC通过context为每个方法提供了上下文支持。
客户端在调用方法的时候,可以通过可选的grpc.CallOption类型的参数提供额外的上下文信息。
Channel方法返回的HelloService_ChannelClient、HelloService_ChannelServer用于和服务端(客户端)进行双向通信。
这两个方法都是接口类型,其实现具体如下:
1 2 3 4 5 6 7 8 9 10 11 |
type HelloService_ChannelClient interface { Send(*String) error Recv() (*String, error) grpc.ClientStream } type HelloService_ChannelServer interface { Send(*String) error Recv() (*String, error) grpc.ServerStream } |
客户端和服务端流辅助接口均定义了Send、Recv方法,用于流数据的双向通信。
基于服务端的HelloServiceServer接口可以重新实现HelloService服务,服务端完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
package main import ( "context" "fmt" "google.golang.org/grpc" "io" "log" "net" ) type HelloServiceImpl struct{} func (p *HelloServiceImpl) Hello( ctx context.Context, args *String, ) (*String, error) { fmt.Println("HelloService.Hello() recv: ", args.GetValue()) reply := &String{Value: "hello: " + args.GetValue()} return reply, nil } func (p *HelloServiceImpl) Channel( stream HelloService_ChannelServer, ) error { for { args, err := stream.Recv() if err != nil { //如果遇到io.EOF表示客户端流被关闭。 if err == io.EOF { return nil } return err } fmt.Println("HelloService.Channel() recv: ", args.GetValue()) reply := &String{Value: "hello: " + args.GetValue()} err = stream.Send(reply) if err != nil { return err } } } func main() { grpcServer := grpc.NewServer() RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl)) listener, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen err: ", err) } grpcServer.Serve(listener) } |
服务端代码逻辑是这样:
1. 通过grpc.NewServer()构造一个gRPC服务对象
2. 通过gRPC插件生成的RegisterHelloServiceServer函数注册实现的HelloServiceImpl服务。
2. 通过 grpcServer.Serve(lis) 在一个监听端口上提供gRPC服务。
客户端代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
package main import ( "context" "fmt" "google.golang.org/grpc" "io" "log" "time" ) func main() { // 远端服务器地址是192.168.1.101~ Ubuntu AMD 服务器 // Dial函数负责和gRPC服务建立链接 conn, err := grpc.Dial("192.168.1.101:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() // 基于已建立的链接构造HelloServiceClient对象 client := NewHelloServiceClient(conn) // 调用简单模式接口 reply, err := client.Hello(context.Background(), &String{Value: "hello"}) if err != nil { log.Fatal(err) } fmt.Println(reply.GetValue()) // 创建CancelContext ctx, cancel := context.WithCancel(context.Background()) // 这里ctx的作用就是控制流关闭的 stream, err := client.Channel(ctx) if err != nil { log.Fatal(err) } // 创建Goroutine每1s发送数据,stream关闭后就不再发送。 go func(ctx context.Context) { send: for { select { case <-ctx.Done(): fmt.Println("Canceled! Stop Send.") break send default: if err := stream.Send(&String{Value: "hi"}); err != nil { log.Fatal(err) } } time.Sleep(time.Second) } }(ctx) // 主线程接收数据,接收3次数据后,主动断开连接。 count := 0 recv: for { select { case <-ctx.Done(): fmt.Println("Canceled! Stop Recv.") break recv default: reply, err := stream.Recv() if err != nil { if err == io.EOF { break recv } log.Fatal(err) } fmt.Println(reply.GetValue()) if count += 1; count == 3 { cancel() } } } } |
客户端代码逻辑如下:
grpc.Dial 负责和gRPC服务建立链接,然后NewHelloServiceClient函数基于已经建立的链接构造HelloServiceClient对象。
返回的client是一个HelloServiceClient接口对象,通过接口定义的方法就可以调用服务端对应的gRPC服务提供的方法。
gRPC和标准库RPC框架有一个区别,gRPC生成的接口并不支持异步调用,不过我们可以在多个Goroutine之间安全地共享gRPC底层的HTTP/2链接,因此可以通过在另一个Goroutine阻塞调用的方式模拟异步调用。
RPC是远程函数调用,因此每次调用的函数参数和返回值不能太大,否则将严重影响每次调用的响应时间。因此传统RPC方法调用对于上传下载较大数据量场景并不合适。
同时传统RPC模式也不适用于对时间不确定的订阅和发布模式。为此,gRPC框架针对服务器端和客户端分别提供了流特性。客户端或服务端的单项流是双向流的特殊情况。