发布订阅是一个常见的设计模式,开源社区中已经存在很多该模式的实现。
我们根据gRPC框架实现一个发布订阅模式。 —— Go语言高级编程
之前学习第一章的时候,我们自己实现了一个发布订阅模型,可以回忆一下:
这次我们使用的是docker项目中提供的pubsub实现,下面是基于pubsub包实现的本地发布订阅代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package main import ( "fmt" "github.com/docker/docker/pkg/pubsub" "strings" "time" ) func main() { p := pubsub.NewPublisher(100*time.Microsecond, 10) golang := p.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { if strings.HasPrefix(key, "golang:") { return true } } return false }) docker := p.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { if strings.HasPrefix(key, "docker:") { return true } } return false }) go p.Publish("wang") go p.Publish("golang: https://golang.org") go p.Publish("docker: https://www.docker.com") time.Sleep(time.Second * 2) go func() { fmt.Println("golang topic:", <-golang) }() go func() { fmt.Println("docker topic:", <-docker) }() time.Sleep(time.Second * 3) fmt.Println("end") } |
有了这段代码之后,我们再来实现一个基于gRPC的网络版发布订阅模式。
先写protobuf代码:
1 2 3 4 5 6 7 8 9 10 11 12 |
syntax = "proto3"; package main; message String { string value = 1; } service PubsubService { rpc Publish (String) returns (String); rpc Subscribe (String) returns (stream String); } |
这里Subscribe函数是一个单向流,它由服务端发送给客户端。
代码生成命令这里就不重复了。
然后开始写服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
package main import ( "context" "fmt" "log" "net" "strings" "time" //"AdvancedGo/pubsub/pubsub" //导入本地包 pubsub "github.com/docker/docker/pkg/pubsub" grpc "google.golang.org/grpc" ) type PubsubService struct { pub *pubsub.Publisher } func NewPubsubService() *PubsubService { return &PubsubService{ pub: pubsub.NewPublisher(100*time.Millisecond, 10), } } func (p *PubsubService) Publish( ctx context.Context, arg *String, ) (*String, error) { str := arg.GetValue() p.pub.Publish(str) return &String{Value: str}, nil } func (p *PubsubService) Subscribe( arg *String, stream PubsubService_SubscribeServer, ) error { ch := p.pub.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { if strings.HasPrefix(key, arg.GetValue()) { return true } } return false }) for v := range ch { if err := stream.Send(&String{Value: v.(string)}); err != nil { return err } } return nil } func main() { grpcServer := grpc.NewServer() RegisterPubsubServiceServer(grpcServer, NewPubsubService()) listener, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen err: ", err) } else { fmt.Println("Service Start.") } grpcServer.Serve(listener) } |
这里需要注意的…就是导入本地包那里,后面有空专门讲一下。
发布客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
package main import ( "context" "fmt" "log" grpc "google.golang.org/grpc" ) func main() { conn, err := grpc.Dial("192.168.1.101:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := NewPubsubServiceClient(conn) str, err := client.Publish( context.Background(), &String{Value: "golang: hello Go"}, ) if err != nil { log.Fatal(err) } else { fmt.Println("Publish: ", str.GetValue()) } str, err = client.Publish( context.Background(), &String{Value: "docker: hello Docker"}, ) if err != nil { log.Fatal(err) } else { fmt.Println("Publish: ", str.GetValue()) } } |
接受客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
package main import ( "context" "fmt" "io" "log" grpc "google.golang.org/grpc" ) func main() { conn, err := grpc.Dial("192.168.1.101:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := NewPubsubServiceClient(conn) stream, err := client.Subscribe( context.Background(), &String{Value: "golang:"}, ) if err != nil { log.Fatal(err) } for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(reply.GetValue()) } } |
大致就这样吧。
【Go】基于gRPC框架的发布订阅模式
写的挺好的!