...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "context"
19 "sync"
20 "time"
21
22 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23 clientv3 "go.etcd.io/etcd/client/v3"
24
25 "go.uber.org/zap"
26 )
27
28
29 type watchBroadcast struct {
30
31 cancel context.CancelFunc
32 donec chan struct{}
33
34
35 mu sync.RWMutex
36
37 nextrev int64
38
39 receivers map[*watcher]struct{}
40
41 responses int
42 lg *zap.Logger
43 }
44
45 func newWatchBroadcast(lg *zap.Logger, wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
46 cctx, cancel := context.WithCancel(wp.ctx)
47 wb := &watchBroadcast{
48 cancel: cancel,
49 nextrev: w.nextrev,
50 receivers: make(map[*watcher]struct{}),
51 donec: make(chan struct{}),
52 lg: lg,
53 }
54 wb.add(w)
55 go func() {
56 defer close(wb.donec)
57
58 opts := []clientv3.OpOption{
59 clientv3.WithRange(w.wr.end),
60 clientv3.WithProgressNotify(),
61 clientv3.WithRev(wb.nextrev),
62 clientv3.WithPrevKV(),
63 clientv3.WithCreatedNotify(),
64 }
65
66 cctx = withClientAuthToken(cctx, w.wps.stream.Context())
67
68 wch := wp.cw.Watch(cctx, w.wr.key, opts...)
69 wp.lg.Debug("watch", zap.String("key", w.wr.key))
70
71 for wr := range wch {
72 wb.bcast(wr)
73 update(wb)
74 }
75 }()
76 return wb
77 }
78
79 func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) {
80 wb.mu.Lock()
81 defer wb.mu.Unlock()
82
83 if wb.responses > 0 || wb.nextrev == 0 {
84 wb.nextrev = wr.Header.Revision + 1
85 }
86 wb.responses++
87 for r := range wb.receivers {
88 r.send(wr)
89 }
90 if len(wb.receivers) > 0 {
91 eventsCoalescing.Add(float64(len(wb.receivers) - 1))
92 }
93 }
94
95
96
97 func (wb *watchBroadcast) add(w *watcher) bool {
98 wb.mu.Lock()
99 defer wb.mu.Unlock()
100 if wb.nextrev > w.nextrev || (wb.nextrev == 0 && w.nextrev != 0) {
101
102
103 return false
104 }
105 if wb.responses == 0 {
106
107 wb.receivers[w] = struct{}{}
108 return true
109 }
110
111 ok := w.post(&pb.WatchResponse{
112 Header: &pb.ResponseHeader{
113
114
115 Revision: w.nextrev,
116
117 },
118 WatchId: w.id,
119 Created: true,
120 })
121 if !ok {
122 return false
123 }
124 wb.receivers[w] = struct{}{}
125 watchersCoalescing.Inc()
126
127 return true
128 }
129 func (wb *watchBroadcast) delete(w *watcher) {
130 wb.mu.Lock()
131 defer wb.mu.Unlock()
132 if _, ok := wb.receivers[w]; !ok {
133 panic("deleting missing watcher from broadcast")
134 }
135 delete(wb.receivers, w)
136 if len(wb.receivers) > 0 {
137
138 watchersCoalescing.Dec()
139 }
140 }
141
142 func (wb *watchBroadcast) size() int {
143 wb.mu.RLock()
144 defer wb.mu.RUnlock()
145 return len(wb.receivers)
146 }
147
148 func (wb *watchBroadcast) empty() bool { return wb.size() == 0 }
149
150 func (wb *watchBroadcast) stop() {
151 if !wb.empty() {
152
153 watchersCoalescing.Sub(float64(wb.size() - 1))
154 }
155
156 wb.cancel()
157
158 select {
159 case <-wb.donec:
160
161
162
163 case <-time.After(time.Second):
164 wb.lg.Error("failed to cancel etcd watcher")
165 }
166 }
167
View as plain text