来自Go语言高级编程,4.3.2章节
通过RPC构造一个简单的KV数据库,然后实现一系列方法,具体看代码即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
package main import ( "errors" "fmt" "log" "math/rand" "net" "net/rpc" "sync" "time" ) type KVStoreService struct { m map[string]string //存储数据 filter map[string]func(key string) //Watch调用时过滤器函数列表 mu sync.RWMutex //读写锁做多线程安全 } func NewKVStoreService() *KVStoreService { return &KVStoreService{ m: make(map[string]string), filter: make(map[string]func(key string)), } } func (p *KVStoreService) Get( key string, value *string, ) error { p.mu.RLock() defer p.mu.RUnlock() if v, ok := p.m[key]; ok { *value = v return nil } return errors.New(key + " not found.") } func (p *KVStoreService) Set( kv [2]string, reply *struct{}, ) error { p.mu.Lock() defer p.mu.Unlock() key, value := kv[0], kv[1] oldValue := p.m[key] if oldValue != value { //当值产生变化后,依次调用每一个过滤器函数 for _, fn := range p.filter { fn(key) } } p.m[key] = value return nil } func (p *KVStoreService) Watch( timeoutSecond int, //超时时间 keyChanged *string, //监控key ) error { id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int()) ch := make(chan string, 10) p.mu.Lock() p.filter[id] = func(key string) { ch <- key } p.mu.Unlock() select { case <-time.After(time.Duration(timeoutSecond) * time.Second): return errors.New("timeout") case key := <-ch: *keyChanged = key return nil } return nil } func main() { // 将KVStroeService对象注册为一个RPC服务 // 将对象中所有满足RPC规则的对象方法注册为 RPC函数 // 所有注册方法会放在KVStoreService服务空间执行 _ = rpc.RegisterName("KVStoreService", NewKVStoreService()) listener, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal(err) } conn, err := listener.Accept() if err != nil { log.Fatal(err) } defer conn.Close() rpc.ServeConn(conn) } |
然后接着看客户端的实现方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
package main import ( "fmt" "log" "net/rpc" ) func main() { quit := make(chan struct{}) client, err := rpc.Dial("tcp", "192.168.1.101:1234") if err != nil { log.Fatal(err) } //启动独立的Goroutine监控key的变化,同步阻塞,直到key变化或者超时 go func() { var keyChanged string err := client.Call("KVStoreService.Watch", 30, &keyChanged) if err != nil { log.Fatal(err) } fmt.Println("watch: ", keyChanged) quit <- struct{}{} }() var key string err = client.Call( "KVStoreService.Get", "abc", &key) if err != nil { log.Println(err) } else { fmt.Println("get key: ", key) } err = client.Call( "KVStoreService.Set", [2]string{"abc", "abc-value"}, new(struct{})) if err != nil { log.Fatal(err) } err = client.Call("KVStoreService.Get", "abc", &key) if err != nil { log.Println(err) } else { fmt.Println("get key: ", key) } <-quit } |
以上大致就是两边的实现方案了。
比较好理解。
【Go】基于RPC实现Watch功能