1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "context"
19 "sync"
20
21 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
23 "go.etcd.io/etcd/client/v3"
24 "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
25
26 "go.uber.org/zap"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/metadata"
29 "google.golang.org/grpc/status"
30 )
31
32 type watchProxy struct {
33 cw clientv3.Watcher
34 ctx context.Context
35
36 leader *leader
37
38 ranges *watchRanges
39
40
41 mu sync.Mutex
42
43
44 wg sync.WaitGroup
45
46
47 kv clientv3.KV
48 lg *zap.Logger
49 }
50
51 func NewWatchProxy(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
52 cctx, cancel := context.WithCancel(ctx)
53 wp := &watchProxy{
54 cw: c.Watcher,
55 ctx: cctx,
56 leader: newLeader(cctx, c.Watcher),
57
58 kv: c.KV,
59 lg: lg,
60 }
61 wp.ranges = newWatchRanges(wp)
62 ch := make(chan struct{})
63 go func() {
64 defer close(ch)
65 <-wp.leader.stopNotify()
66 wp.mu.Lock()
67 select {
68 case <-wp.ctx.Done():
69 case <-wp.leader.disconnectNotify():
70 cancel()
71 }
72 <-wp.ctx.Done()
73 wp.mu.Unlock()
74 wp.wg.Wait()
75 wp.ranges.stop()
76 }()
77 return wp, ch
78 }
79
80 func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
81 wp.mu.Lock()
82 select {
83 case <-wp.ctx.Done():
84 wp.mu.Unlock()
85 select {
86 case <-wp.leader.disconnectNotify():
87 return status.Error(codes.Canceled, "the client connection is closing")
88 default:
89 return wp.ctx.Err()
90 }
91 default:
92 wp.wg.Add(1)
93 }
94 wp.mu.Unlock()
95
96 ctx, cancel := context.WithCancel(stream.Context())
97 wps := &watchProxyStream{
98 ranges: wp.ranges,
99 watchers: make(map[int64]*watcher),
100 stream: stream,
101 watchCh: make(chan *pb.WatchResponse, 1024),
102 ctx: ctx,
103 cancel: cancel,
104 kv: wp.kv,
105 lg: wp.lg,
106 }
107
108 var lostLeaderC <-chan struct{}
109 if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
110 v := md[rpctypes.MetadataRequireLeaderKey]
111 if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
112 lostLeaderC = wp.leader.lostNotify()
113
114
115 select {
116 case <-lostLeaderC:
117 wp.wg.Done()
118 return rpctypes.ErrNoLeader
119 default:
120 }
121 }
122 }
123
124
125
126 stopc := make(chan struct{}, 3)
127 go func() {
128 defer func() { stopc <- struct{}{} }()
129 wps.recvLoop()
130 }()
131 go func() {
132 defer func() { stopc <- struct{}{} }()
133 wps.sendLoop()
134 }()
135
136 go func() {
137 defer func() { stopc <- struct{}{} }()
138 select {
139 case <-lostLeaderC:
140 case <-ctx.Done():
141 case <-wp.ctx.Done():
142 }
143 }()
144
145 <-stopc
146 cancel()
147
148
149
150 go func() {
151 <-stopc
152 <-stopc
153 wps.close()
154 wp.wg.Done()
155 }()
156
157 select {
158 case <-lostLeaderC:
159 return rpctypes.ErrNoLeader
160 case <-wp.leader.disconnectNotify():
161 return status.Error(codes.Canceled, "the client connection is closing")
162 default:
163 return wps.ctx.Err()
164 }
165 }
166
167
168 type watchProxyStream struct {
169 ranges *watchRanges
170
171
172 mu sync.Mutex
173
174 watchers map[int64]*watcher
175
176 nextWatcherID int64
177
178 stream pb.Watch_WatchServer
179
180
181 watchCh chan *pb.WatchResponse
182
183 ctx context.Context
184 cancel context.CancelFunc
185
186
187 kv clientv3.KV
188 lg *zap.Logger
189 }
190
191 func (wps *watchProxyStream) close() {
192 var wg sync.WaitGroup
193 wps.cancel()
194 wps.mu.Lock()
195 wg.Add(len(wps.watchers))
196 for _, wpsw := range wps.watchers {
197 go func(w *watcher) {
198 wps.ranges.delete(w)
199 wg.Done()
200 }(wpsw)
201 }
202 wps.watchers = nil
203 wps.mu.Unlock()
204
205 wg.Wait()
206
207 close(wps.watchCh)
208 }
209
210 func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
211 if len(key) == 0 {
212
213
214 key = []byte{0}
215 rangeEnd = []byte{0}
216 }
217 req := &pb.RangeRequest{
218 Serializable: true,
219 Key: key,
220 RangeEnd: rangeEnd,
221 CountOnly: true,
222 Limit: 1,
223 }
224 _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
225 return err
226 }
227
228 func (wps *watchProxyStream) recvLoop() error {
229 for {
230 req, err := wps.stream.Recv()
231 if err != nil {
232 return err
233 }
234 switch uv := req.RequestUnion.(type) {
235 case *pb.WatchRequest_CreateRequest:
236 cr := uv.CreateRequest
237
238 if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
239 wps.watchCh <- &pb.WatchResponse{
240 Header: &pb.ResponseHeader{},
241 WatchId: clientv3.InvalidWatchID,
242 Created: true,
243 Canceled: true,
244 CancelReason: err.Error(),
245 }
246 continue
247 }
248
249 wps.mu.Lock()
250 w := &watcher{
251 wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
252 id: wps.nextWatcherID,
253 wps: wps,
254
255 nextrev: cr.StartRevision,
256 progress: cr.ProgressNotify,
257 prevKV: cr.PrevKv,
258 filters: v3rpc.FiltersFromRequest(cr),
259 }
260 if !w.wr.valid() {
261 w.post(&pb.WatchResponse{WatchId: clientv3.InvalidWatchID, Created: true, Canceled: true})
262 wps.mu.Unlock()
263 continue
264 }
265 wps.nextWatcherID++
266 w.nextrev = cr.StartRevision
267 wps.watchers[w.id] = w
268 wps.ranges.add(w)
269 wps.mu.Unlock()
270 wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID))
271 case *pb.WatchRequest_CancelRequest:
272 wps.delete(uv.CancelRequest.WatchId)
273 wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
274 default:
275
276 wps.lg.Error("not supported request type by gRPC proxy", zap.Stringer("request", req))
277 }
278 }
279 }
280
281 func (wps *watchProxyStream) sendLoop() {
282 for {
283 select {
284 case wresp, ok := <-wps.watchCh:
285 if !ok {
286 return
287 }
288 if err := wps.stream.Send(wresp); err != nil {
289 return
290 }
291 case <-wps.ctx.Done():
292 return
293 }
294 }
295 }
296
297 func (wps *watchProxyStream) delete(id int64) {
298 wps.mu.Lock()
299 defer wps.mu.Unlock()
300
301 w, ok := wps.watchers[id]
302 if !ok {
303 return
304 }
305 wps.ranges.delete(w)
306 delete(wps.watchers, id)
307 resp := &pb.WatchResponse{
308 Header: &w.lastHeader,
309 WatchId: id,
310 Canceled: true,
311 }
312 wps.watchCh <- resp
313 }
314
View as plain text