...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpcproxy
16
17 import (
18 "context"
19 "math"
20 "sync"
21
22 "go.etcd.io/etcd/client/v3"
23
24 "golang.org/x/time/rate"
25 )
26
const (
	// lostLeaderKey is the key recvLoop watches to detect leader loss.
	lostLeaderKey = "__lostleader"

	// retryPerSecond is used as both the rate (events per second) and the
	// burst size of the limiter that paces watch attempts in recvLoop.
	retryPerSecond = 10
)
31
32 type leader struct {
33 ctx context.Context
34 w clientv3.Watcher
35
36 mu sync.RWMutex
37 leaderc chan struct{}
38 disconnc chan struct{}
39 donec chan struct{}
40 }
41
42 func newLeader(ctx context.Context, w clientv3.Watcher) *leader {
43 l := &leader{
44 ctx: clientv3.WithRequireLeader(ctx),
45 w: w,
46 leaderc: make(chan struct{}),
47 disconnc: make(chan struct{}),
48 donec: make(chan struct{}),
49 }
50
51 close(l.leaderc)
52 go l.recvLoop()
53 return l
54 }
55
56 func (l *leader) recvLoop() {
57 defer close(l.donec)
58
59 limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond)
60 rev := int64(math.MaxInt64 - 2)
61 for limiter.Wait(l.ctx) == nil {
62 wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
63 cresp, ok := <-wch
64 if !ok {
65 l.loseLeader()
66 continue
67 }
68 if cresp.Err() != nil {
69 l.loseLeader()
70 if clientv3.IsConnCanceled(cresp.Err()) {
71 close(l.disconnc)
72 return
73 }
74 continue
75 }
76 l.gotLeader()
77 <-wch
78 l.loseLeader()
79 }
80 }
81
82 func (l *leader) loseLeader() {
83 l.mu.RLock()
84 defer l.mu.RUnlock()
85 select {
86 case <-l.leaderc:
87 default:
88 close(l.leaderc)
89 }
90 }
91
92
93 func (l *leader) gotLeader() {
94 l.mu.Lock()
95 defer l.mu.Unlock()
96 select {
97 case <-l.leaderc:
98 l.leaderc = make(chan struct{})
99 default:
100 }
101 }
102
103 func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
104
105 func (l *leader) stopNotify() <-chan struct{} { return l.donec }
106
107
108
109 func (l *leader) lostNotify() <-chan struct{} {
110 l.mu.RLock()
111 defer l.mu.RUnlock()
112 return l.leaderc
113 }
114
View as plain text