1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package clientv3
16
17 import (
18 "context"
19 "sync"
20 "time"
21
22 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
24
25 "go.uber.org/zap"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/metadata"
28 )
29
30 type (
31 LeaseRevokeResponse pb.LeaseRevokeResponse
32 LeaseID int64
33 )
34
35
36 type LeaseGrantResponse struct {
37 *pb.ResponseHeader
38 ID LeaseID
39 TTL int64
40 Error string
41 }
42
43
44 type LeaseKeepAliveResponse struct {
45 *pb.ResponseHeader
46 ID LeaseID
47 TTL int64
48 }
49
50
51 type LeaseTimeToLiveResponse struct {
52 *pb.ResponseHeader
53 ID LeaseID `json:"id"`
54
55
56 TTL int64 `json:"ttl"`
57
58
59 GrantedTTL int64 `json:"granted-ttl"`
60
61
62 Keys [][]byte `json:"keys"`
63 }
64
65
66 type LeaseStatus struct {
67 ID LeaseID `json:"id"`
68
69 }
70
71
72 type LeaseLeasesResponse struct {
73 *pb.ResponseHeader
74 Leases []LeaseStatus `json:"leases"`
75 }
76
77 const (
78
79
80 defaultTTL = 5 * time.Second
81
82 NoLease LeaseID = 0
83
84
85 retryConnWait = 500 * time.Millisecond
86 )
87
88
89
90
// LeaseResponseChSize is the buffer capacity of every channel returned by
// KeepAlive. When a channel's buffer is full, further responses for that
// channel are dropped rather than blocking the receive loop.
var LeaseResponseChSize = 16
92
93
94
95
// ErrKeepAliveHalted is the error returned by KeepAlive once the background
// receive loop has stopped. Reason holds the error the loop exited with and
// may be nil.
type ErrKeepAliveHalted struct {
	Reason error
}

// Error implements the error interface. The message is a fixed prefix,
// followed by ": " and the Reason text when a Reason is present.
func (e ErrKeepAliveHalted) Error() string {
	const prefix = "etcdclient: leases keep alive halted"
	if e.Reason == nil {
		return prefix
	}
	return prefix + ": " + e.Reason.Error()
}
107
108 type Lease interface {
109
110 Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
111
112
113 Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
114
115
116 TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
117
118
119 Leases(ctx context.Context) (*LeaseLeasesResponse, error)
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
137
138
139
140
141
142
143 KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
144
145
146
147 Close() error
148 }
149
150 type lessor struct {
151 mu sync.Mutex
152
153
154 donec chan struct{}
155 loopErr error
156
157 remote pb.LeaseClient
158
159 stream pb.Lease_LeaseKeepAliveClient
160 streamCancel context.CancelFunc
161
162 stopCtx context.Context
163 stopCancel context.CancelFunc
164
165 keepAlives map[LeaseID]*keepAlive
166
167
168
169 firstKeepAliveTimeout time.Duration
170
171
172 firstKeepAliveOnce sync.Once
173
174 callOpts []grpc.CallOption
175
176 lg *zap.Logger
177 }
178
179
180 type keepAlive struct {
181 chs []chan<- *LeaseKeepAliveResponse
182 ctxs []context.Context
183
184 deadline time.Time
185
186 nextKeepAlive time.Time
187
188 donec chan struct{}
189 }
190
191 func NewLease(c *Client) Lease {
192 return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
193 }
194
195 func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
196 l := &lessor{
197 donec: make(chan struct{}),
198 keepAlives: make(map[LeaseID]*keepAlive),
199 remote: remote,
200 firstKeepAliveTimeout: keepAliveTimeout,
201 lg: c.lg,
202 }
203 if l.firstKeepAliveTimeout == time.Second {
204 l.firstKeepAliveTimeout = defaultTTL
205 }
206 if c != nil {
207 l.callOpts = c.callOpts
208 }
209 reqLeaderCtx := WithRequireLeader(context.Background())
210 l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
211 return l
212 }
213
214 func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
215 r := &pb.LeaseGrantRequest{TTL: ttl}
216 resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
217 if err == nil {
218 gresp := &LeaseGrantResponse{
219 ResponseHeader: resp.GetHeader(),
220 ID: LeaseID(resp.ID),
221 TTL: resp.TTL,
222 Error: resp.Error,
223 }
224 return gresp, nil
225 }
226 return nil, toErr(ctx, err)
227 }
228
229 func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
230 r := &pb.LeaseRevokeRequest{ID: int64(id)}
231 resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
232 if err == nil {
233 return (*LeaseRevokeResponse)(resp), nil
234 }
235 return nil, toErr(ctx, err)
236 }
237
238 func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
239 r := toLeaseTimeToLiveRequest(id, opts...)
240 resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
241 if err != nil {
242 return nil, toErr(ctx, err)
243 }
244 gresp := &LeaseTimeToLiveResponse{
245 ResponseHeader: resp.GetHeader(),
246 ID: LeaseID(resp.ID),
247 TTL: resp.TTL,
248 GrantedTTL: resp.GrantedTTL,
249 Keys: resp.Keys,
250 }
251 return gresp, nil
252 }
253
254 func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
255 resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
256 if err == nil {
257 leases := make([]LeaseStatus, len(resp.Leases))
258 for i := range resp.Leases {
259 leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
260 }
261 return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
262 }
263 return nil, toErr(ctx, err)
264 }
265
266 func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
267 ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
268
269 l.mu.Lock()
270
271 select {
272 case <-l.donec:
273 err := l.loopErr
274 l.mu.Unlock()
275 close(ch)
276 return ch, ErrKeepAliveHalted{Reason: err}
277 default:
278 }
279 ka, ok := l.keepAlives[id]
280 if !ok {
281
282 ka = &keepAlive{
283 chs: []chan<- *LeaseKeepAliveResponse{ch},
284 ctxs: []context.Context{ctx},
285 deadline: time.Now().Add(l.firstKeepAliveTimeout),
286 nextKeepAlive: time.Now(),
287 donec: make(chan struct{}),
288 }
289 l.keepAlives[id] = ka
290 } else {
291
292 ka.ctxs = append(ka.ctxs, ctx)
293 ka.chs = append(ka.chs, ch)
294 }
295 l.mu.Unlock()
296
297 if ctx.Done() != nil {
298 go l.keepAliveCtxCloser(ctx, id, ka.donec)
299 }
300 l.firstKeepAliveOnce.Do(func() {
301 go l.recvKeepAliveLoop()
302 go l.deadlineLoop()
303 })
304
305 return ch, nil
306 }
307
308 func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
309 for {
310 resp, err := l.keepAliveOnce(ctx, id)
311 if err == nil {
312 if resp.TTL <= 0 {
313 err = rpctypes.ErrLeaseNotFound
314 }
315 return resp, err
316 }
317 if isHaltErr(ctx, err) {
318 return nil, toErr(ctx, err)
319 }
320 }
321 }
322
323 func (l *lessor) Close() error {
324 l.stopCancel()
325
326 l.firstKeepAliveOnce.Do(func() { close(l.donec) })
327 <-l.donec
328 return nil
329 }
330
331 func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
332 select {
333 case <-donec:
334 return
335 case <-l.donec:
336 return
337 case <-ctx.Done():
338 }
339
340 l.mu.Lock()
341 defer l.mu.Unlock()
342
343 ka, ok := l.keepAlives[id]
344 if !ok {
345 return
346 }
347
348
349 for i, c := range ka.ctxs {
350 if c == ctx {
351 close(ka.chs[i])
352 ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
353 ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
354 break
355 }
356 }
357
358 if len(ka.chs) == 0 {
359 delete(l.keepAlives, id)
360 }
361 }
362
363
364
365 func (l *lessor) closeRequireLeader() {
366 l.mu.Lock()
367 defer l.mu.Unlock()
368 for _, ka := range l.keepAlives {
369 reqIdxs := 0
370
371 for i, ctx := range ka.ctxs {
372 md, ok := metadata.FromOutgoingContext(ctx)
373 if !ok {
374 continue
375 }
376 ks := md[rpctypes.MetadataRequireLeaderKey]
377 if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
378 continue
379 }
380 close(ka.chs[i])
381 ka.chs[i] = nil
382 reqIdxs++
383 }
384 if reqIdxs == 0 {
385 continue
386 }
387
388 newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
389 newCtxs := make([]context.Context, len(newChs))
390 newIdx := 0
391 for i := range ka.chs {
392 if ka.chs[i] == nil {
393 continue
394 }
395 newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
396 newIdx++
397 }
398 ka.chs, ka.ctxs = newChs, newCtxs
399 }
400 }
401
402 func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKeepAliveResponse, ferr error) {
403 cctx, cancel := context.WithCancel(ctx)
404 defer cancel()
405
406 stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
407 if err != nil {
408 return nil, toErr(ctx, err)
409 }
410
411 defer func() {
412 if err := stream.CloseSend(); err != nil {
413 if ferr == nil {
414 ferr = toErr(ctx, err)
415 }
416 return
417 }
418 }()
419
420 err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
421 if err != nil {
422 return nil, toErr(ctx, err)
423 }
424
425 resp, rerr := stream.Recv()
426 if rerr != nil {
427 return nil, toErr(ctx, rerr)
428 }
429
430 karesp = &LeaseKeepAliveResponse{
431 ResponseHeader: resp.GetHeader(),
432 ID: LeaseID(resp.ID),
433 TTL: resp.TTL,
434 }
435 return karesp, nil
436 }
437
438 func (l *lessor) recvKeepAliveLoop() (gerr error) {
439 defer func() {
440 l.mu.Lock()
441 close(l.donec)
442 l.loopErr = gerr
443 for _, ka := range l.keepAlives {
444 ka.close()
445 }
446 l.keepAlives = make(map[LeaseID]*keepAlive)
447 l.mu.Unlock()
448 }()
449
450 for {
451 stream, err := l.resetRecv()
452 if err != nil {
453 if canceledByCaller(l.stopCtx, err) {
454 return err
455 }
456 } else {
457 for {
458 resp, err := stream.Recv()
459 if err != nil {
460 if canceledByCaller(l.stopCtx, err) {
461 return err
462 }
463
464 if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
465 l.closeRequireLeader()
466 }
467 break
468 }
469
470 l.recvKeepAlive(resp)
471 }
472 }
473
474 select {
475 case <-time.After(retryConnWait):
476 case <-l.stopCtx.Done():
477 return l.stopCtx.Err()
478 }
479 }
480 }
481
482
483 func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
484 sctx, cancel := context.WithCancel(l.stopCtx)
485 stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
486 if err != nil {
487 cancel()
488 return nil, err
489 }
490
491 l.mu.Lock()
492 defer l.mu.Unlock()
493 if l.stream != nil && l.streamCancel != nil {
494 l.streamCancel()
495 }
496
497 l.streamCancel = cancel
498 l.stream = stream
499
500 go l.sendKeepAliveLoop(stream)
501 return stream, nil
502 }
503
504
505 func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
506 karesp := &LeaseKeepAliveResponse{
507 ResponseHeader: resp.GetHeader(),
508 ID: LeaseID(resp.ID),
509 TTL: resp.TTL,
510 }
511
512 l.mu.Lock()
513 defer l.mu.Unlock()
514
515 ka, ok := l.keepAlives[karesp.ID]
516 if !ok {
517 return
518 }
519
520 if karesp.TTL <= 0 {
521
522 delete(l.keepAlives, karesp.ID)
523 ka.close()
524 return
525 }
526
527
528 nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
529 ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
530 for _, ch := range ka.chs {
531 select {
532 case ch <- karesp:
533 default:
534 if l.lg != nil {
535 l.lg.Warn("lease keepalive response queue is full; dropping response send",
536 zap.Int("queue-size", len(ch)),
537 zap.Int("queue-capacity", cap(ch)),
538 )
539 }
540 }
541
542 ka.nextKeepAlive = nextKeepAlive
543 }
544 }
545
546
547
548 func (l *lessor) deadlineLoop() {
549 for {
550 select {
551 case <-time.After(time.Second):
552 case <-l.donec:
553 return
554 }
555 now := time.Now()
556 l.mu.Lock()
557 for id, ka := range l.keepAlives {
558 if ka.deadline.Before(now) {
559
560 ka.close()
561 delete(l.keepAlives, id)
562 }
563 }
564 l.mu.Unlock()
565 }
566 }
567
568
569 func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
570 for {
571 var tosend []LeaseID
572
573 now := time.Now()
574 l.mu.Lock()
575 for id, ka := range l.keepAlives {
576 if ka.nextKeepAlive.Before(now) {
577 tosend = append(tosend, id)
578 }
579 }
580 l.mu.Unlock()
581
582 for _, id := range tosend {
583 r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
584 if err := stream.Send(r); err != nil {
585
586 return
587 }
588 }
589
590 select {
591 case <-time.After(retryConnWait):
592 case <-stream.Context().Done():
593 return
594 case <-l.donec:
595 return
596 case <-l.stopCtx.Done():
597 return
598 }
599 }
600 }
601
602 func (ka *keepAlive) close() {
603 close(ka.donec)
604 for _, ch := range ka.chs {
605 close(ch)
606 }
607 }
608
View as plain text