package grpcproxy

import (
	"context"
	"io"
	"sync"
	"sync/atomic"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

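// leaseProxy forwards lease RPCs from clients to an etcd cluster, tracking
// leader liveness so keepalive streams can be torn down when the leader is
// lost.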
type leaseProxy struct {
	// leaseClient handles requests, such as LeaseGrant, that are forwarded
	// to the etcd server verbatim.
	leaseClient pb.LeaseClient

	// lessor is the client lease API used for revoke, TTL, and keepalive calls.
	lessor clientv3.Lease

	ctx context.Context

	leader *leader

	// mu protects adding outstanding leaseProxyStreams through wg.
	mu sync.RWMutex

	// wg waits until all outstanding leaseProxyStreams quit.
	wg sync.WaitGroup
}

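// NewLeaseProxy creates a pb.LeaseServer that proxies lease requests through
// the given client. The returned channel is closed once the proxy has shut
// down and all outstanding keepalive streams have finished.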
func NewLeaseProxy(ctx context.Context, c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) {
	cctx, cancel := context.WithCancel(ctx)
	lp := &leaseProxy{
		leaseClient: pb.NewLeaseClient(c.ActiveConnection()),
		lessor:      c.Lease,
		ctx:         cctx,
		leader:      newLeader(cctx, c.Watcher),
	}
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		<-lp.leader.stopNotify()
		lp.mu.Lock()
		select {
		case <-lp.ctx.Done():
		case <-lp.leader.disconnectNotify():
			cancel()
		}
		<-lp.ctx.Done()
		lp.mu.Unlock()
		lp.wg.Wait()
	}()
	return lp, ch
}

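// LeaseGrant forwards the grant request directly to the etcd server; a
// successful response implies the proxy can currently reach the leader.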
func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	rp, err := lp.leaseClient.LeaseGrant(ctx, cr, grpc.WaitForReady(true))
	if err != nil {
		return nil, err
	}
	lp.leader.gotLeader()
	return rp, nil
}

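// LeaseRevoke revokes the given lease through the client lease API.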
func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	r, err := lp.lessor.Revoke(ctx, clientv3.LeaseID(rr.ID))
	if err != nil {
		return nil, err
	}
	lp.leader.gotLeader()
	return (*pb.LeaseRevokeResponse)(r), nil
}

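// LeaseTimeToLive fetches the remaining TTL for a lease, optionally including
// its attached keys, and converts the client response back into the wire type.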
func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
	var (
		r   *clientv3.LeaseTimeToLiveResponse
		err error
	)
	if rr.Keys {
		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID), clientv3.WithAttachedKeys())
	} else {
		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID))
	}
	if err != nil {
		return nil, err
	}
	rp := &pb.LeaseTimeToLiveResponse{
		Header:     r.ResponseHeader,
		ID:         int64(r.ID),
		TTL:        r.TTL,
		GrantedTTL: r.GrantedTTL,
		Keys:       r.Keys,
	}
	return rp, nil
}

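// LeaseLeases lists all leases known to the cluster.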
func (lp *leaseProxy) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
	r, err := lp.lessor.Leases(ctx)
	if err != nil {
		return nil, err
	}
	leases := make([]*pb.LeaseStatus, len(r.Leases))
	for i := range r.Leases {
		leases[i] = &pb.LeaseStatus{ID: int64(r.Leases[i].ID)}
	}
	rp := &pb.LeaseLeasesResponse{
		Header: r.ResponseHeader,
		Leases: leases,
	}
	return rp, nil
}

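// LeaseKeepAlive bridges a client keepalive stream to the etcd cluster. It
// runs one receive loop and one send loop per stream, plus a goroutine that
// cancels the stream if the leader is lost or the proxy shuts down.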
func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
	lp.mu.Lock()
	select {
	case <-lp.ctx.Done():
		lp.mu.Unlock()
		return lp.ctx.Err()
	default:
		lp.wg.Add(1)
	}
	lp.mu.Unlock()

	ctx, cancel := context.WithCancel(stream.Context())
	lps := leaseProxyStream{
		stream:          stream,
		lessor:          lp.lessor,
		keepAliveLeases: make(map[int64]*atomicCounter),
		respc:           make(chan *pb.LeaseKeepAliveResponse),
		ctx:             ctx,
		cancel:          cancel,
	}

	errc := make(chan error, 2)

	var lostLeaderC <-chan struct{}
	if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
		v := md[rpctypes.MetadataRequireLeaderKey]
		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
			lostLeaderC = lp.leader.lostNotify()
			// if the leader is already known to be lost at creation time,
			// reject the stream before letting any events through
			select {
			case <-lostLeaderC:
				lp.wg.Done()
				return rpctypes.ErrNoLeader
			default:
			}
		}
	}
	stopc := make(chan struct{}, 3)
	go func() {
		defer func() { stopc <- struct{}{} }()
		if err := lps.recvLoop(); err != nil {
			errc <- err
		}
	}()

	go func() {
		defer func() { stopc <- struct{}{} }()
		if err := lps.sendLoop(); err != nil {
			errc <- err
		}
	}()

	// tears down the stream if the leader is lost or the proxy is shutting down
	go func() {
		defer func() { stopc <- struct{}{} }()
		select {
		case <-lostLeaderC:
		case <-ctx.Done():
		case <-lp.ctx.Done():
		}
	}()

	var err error
	select {
	case <-stopc:
		stopc <- struct{}{}
	case err = <-errc:
	}
	cancel()

	// recv/send may only shut down after this function exits;
	// this goroutine notifies the lease proxy that the stream is through
	go func() {
		<-stopc
		<-stopc
		<-stopc
		lps.close()
		close(errc)
		lp.wg.Done()
	}()

	select {
	case <-lostLeaderC:
		return rpctypes.ErrNoLeader
	case <-lp.leader.disconnectNotify():
		return status.Error(codes.Canceled, "the client connection is closing")
	default:
		if err != nil {
			return err
		}
		return ctx.Err()
	}
}

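// leaseProxyStream tracks the state of a single LeaseKeepAlive stream.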
type leaseProxyStream struct {
	stream pb.Lease_LeaseKeepAliveServer

	lessor clientv3.Lease

	// wg tracks the outstanding keepAliveLoop goroutines for this stream.
	wg sync.WaitGroup

	// mu protects keepAliveLeases.
	mu sync.RWMutex

	// keepAliveLeases counts, per lease, how many keepalive requests still
	// need responses.
	keepAliveLeases map[int64]*atomicCounter

	// respc receives lease keepalive responses bound for the client.
	respc chan *pb.LeaseKeepAliveResponse

	ctx    context.Context
	cancel context.CancelFunc
}

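// recvLoop reads keepalive requests from the client stream, starting a
// keepAliveLoop for each new lease ID and counting how many responses each
// lease still owes.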
func (lps *leaseProxyStream) recvLoop() error {
	for {
		rr, err := lps.stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		lps.mu.Lock()
		neededResps, ok := lps.keepAliveLeases[rr.ID]
		if !ok {
			neededResps = &atomicCounter{}
			lps.keepAliveLeases[rr.ID] = neededResps
			lps.wg.Add(1)
			go func() {
				defer lps.wg.Done()
				if err := lps.keepAliveLoop(rr.ID, neededResps); err != nil {
					lps.cancel()
				}
			}()
		}
		neededResps.add(1)
		lps.mu.Unlock()
	}
}

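// keepAliveLoop relays keepalive responses for a single lease back to the
// client. If the upstream keepalive channel closes, it falls back to a
// TimeToLive call so pending requests still receive a response.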
func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
	cctx, ccancel := context.WithCancel(lps.ctx)
	defer ccancel()
	respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
	if err != nil {
		return err
	}
	// ticker expires when the loop hasn't received a keepalive within the TTL
	var ticker <-chan time.Time
	for {
		select {
		case <-ticker:
			lps.mu.Lock()
			// if there are outstanding keepalive requests when the ticker
			// fires, keep the loop alive to process them
			if neededResps.get() > 0 {
				lps.mu.Unlock()
				ticker = nil
				continue
			}
			delete(lps.keepAliveLeases, leaseID)
			lps.mu.Unlock()
			return nil
		case rp, ok := <-respc:
			if !ok {
				lps.mu.Lock()
				delete(lps.keepAliveLeases, leaseID)
				lps.mu.Unlock()
				if neededResps.get() == 0 {
					return nil
				}
				ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID))
				if err != nil {
					return err
				}
				r := &pb.LeaseKeepAliveResponse{
					Header: ttlResp.ResponseHeader,
					ID:     int64(ttlResp.ID),
					TTL:    ttlResp.TTL,
				}
				for neededResps.get() > 0 {
					select {
					case lps.respc <- r:
						neededResps.add(-1)
					case <-lps.ctx.Done():
						return nil
					}
				}
				return nil
			}
			if neededResps.get() == 0 {
				continue
			}
			ticker = time.After(time.Duration(rp.TTL) * time.Second)
			r := &pb.LeaseKeepAliveResponse{
				Header: rp.ResponseHeader,
				ID:     int64(rp.ID),
				TTL:    rp.TTL,
			}
			lps.replyToClient(r, neededResps)
		}
	}
}

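// replyToClient pushes a keepalive response toward the send loop, retrying
// for up to 500ms before dropping the response.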
func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter) {
	timer := time.After(500 * time.Millisecond)
	for neededResps.get() > 0 {
		select {
		case lps.respc <- r:
			neededResps.add(-1)
		case <-timer:
			return
		case <-lps.ctx.Done():
			return
		}
	}
}

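// sendLoop forwards queued keepalive responses to the client stream until the
// response channel closes or the stream context is done.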
func (lps *leaseProxyStream) sendLoop() error {
	for {
		select {
		case lrp, ok := <-lps.respc:
			if !ok {
				return nil
			}
			if err := lps.stream.Send(lrp); err != nil {
				return err
			}
		case <-lps.ctx.Done():
			return lps.ctx.Err()
		}
	}
}

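// close cancels the stream context and releases the stream's resources.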
func (lps *leaseProxyStream) close() {
	lps.cancel()
	lps.wg.Wait()
	// only close respc after all keepAliveLoop goroutines have finished;
	// this ensures they never send on a closed channel
	close(lps.respc)
}

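// atomicCounter is a signed counter safe for concurrent use.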
type atomicCounter struct {
	counter int64
}

func (ac *atomicCounter) add(delta int64) {
	atomic.AddInt64(&ac.counter, delta)
}

func (ac *atomicCounter) get() int64 {
	return atomic.LoadInt64(&ac.counter)
}