...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package namespace
16
17 import (
18 "context"
19 "sync"
20
21 "go.etcd.io/etcd/client/v3"
22 )
23
24 type watcherPrefix struct {
25 clientv3.Watcher
26 pfx string
27
28 wg sync.WaitGroup
29 stopc chan struct{}
30 stopOnce sync.Once
31 }
32
33
34
35
36 func NewWatcher(w clientv3.Watcher, prefix string) clientv3.Watcher {
37 return &watcherPrefix{Watcher: w, pfx: prefix, stopc: make(chan struct{})}
38 }
39
40 func (w *watcherPrefix) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
41
42 op := clientv3.OpGet(key, opts...)
43 end := op.RangeBytes()
44 pfxBegin, pfxEnd := prefixInterval(w.pfx, []byte(key), end)
45 if pfxEnd != nil {
46 opts = append(opts, clientv3.WithRange(string(pfxEnd)))
47 }
48
49 wch := w.Watcher.Watch(ctx, string(pfxBegin), opts...)
50
51
52 pfxWch := make(chan clientv3.WatchResponse)
53 w.wg.Add(1)
54 go func() {
55 defer func() {
56 close(pfxWch)
57 w.wg.Done()
58 }()
59 for wr := range wch {
60 for i := range wr.Events {
61 wr.Events[i].Kv.Key = wr.Events[i].Kv.Key[len(w.pfx):]
62 if wr.Events[i].PrevKv != nil {
63 wr.Events[i].PrevKv.Key = wr.Events[i].Kv.Key
64 }
65 }
66 select {
67 case pfxWch <- wr:
68 case <-ctx.Done():
69 return
70 case <-w.stopc:
71 return
72 }
73 }
74 }()
75 return pfxWch
76 }
77
78 func (w *watcherPrefix) Close() error {
79 err := w.Watcher.Close()
80 w.stopOnce.Do(func() { close(w.stopc) })
81 w.wg.Wait()
82 return err
83 }
84
View as plain text