1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package concurrency
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21
22 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23 "go.etcd.io/etcd/api/v3/mvccpb"
24 v3 "go.etcd.io/etcd/client/v3"
25 )
26
// Sentinel errors returned by Election methods.
var (
	// ErrElectionNotLeader is returned by Proclaim when this Election holds
	// no leader session, or when the guarded update of the leader key does
	// not succeed.
	ErrElectionNotLeader = errors.New("election: not leader")
	// ErrElectionNoLeader is returned by Leader when no key exists under the
	// election prefix.
	ErrElectionNoLeader = errors.New("election: no leader")
)
31
// Election holds the state of one participant in a leader election that is
// carried out over keys sharing a common prefix.
type Election struct {
	// session supplies the client used for every request; its lease is
	// attached to the key written by Campaign.
	session *Session

	// keyPrefix is the prefix under which candidate keys are created and
	// queried.
	keyPrefix string

	// leaderKey is the key this participant wrote (or was resumed with).
	// It is reset to "" by Resign and by a Proclaim whose guard fails.
	leaderKey string
	// leaderRev is the create revision expected for leaderKey; Proclaim and
	// Resign compare against it before modifying the key.
	leaderRev int64
	// leaderSession is the session owning leaderKey. While nil, Proclaim
	// returns ErrElectionNotLeader and Resign is a no-op.
	leaderSession *Session
	// hdr is the response header recorded by the most recent successful
	// Campaign, Proclaim or Resign.
	hdr *pb.ResponseHeader
}
42
43
44 func NewElection(s *Session, pfx string) *Election {
45 return &Election{session: s, keyPrefix: pfx + "/"}
46 }
47
48
49 func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
50 return &Election{
51 keyPrefix: pfx,
52 session: s,
53 leaderKey: leaderKey,
54 leaderRev: leaderRev,
55 leaderSession: s,
56 }
57 }
58
59
60
61
62
63
64
65
66
67
68
69 func (e *Election) Campaign(ctx context.Context, val string) error {
70 s := e.session
71 client := e.session.Client()
72
73 k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
74 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
75 txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
76 txn = txn.Else(v3.OpGet(k))
77 resp, err := txn.Commit()
78 if err != nil {
79 return err
80 }
81 e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
82 if !resp.Succeeded {
83 kv := resp.Responses[0].GetResponseRange().Kvs[0]
84 e.leaderRev = kv.CreateRevision
85 if string(kv.Value) != val {
86 if err = e.Proclaim(ctx, val); err != nil {
87 e.Resign(ctx)
88 return err
89 }
90 }
91 }
92
93 _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
94 if err != nil {
95
96 select {
97 case <-ctx.Done():
98 e.Resign(client.Ctx())
99 default:
100 e.leaderSession = nil
101 }
102 return err
103 }
104 e.hdr = resp.Header
105
106 return nil
107 }
108
109
110 func (e *Election) Proclaim(ctx context.Context, val string) error {
111 if e.leaderSession == nil {
112 return ErrElectionNotLeader
113 }
114 client := e.session.Client()
115 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
116 txn := client.Txn(ctx).If(cmp)
117 txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
118 tresp, terr := txn.Commit()
119 if terr != nil {
120 return terr
121 }
122 if !tresp.Succeeded {
123 e.leaderKey = ""
124 return ErrElectionNotLeader
125 }
126
127 e.hdr = tresp.Header
128 return nil
129 }
130
131
132 func (e *Election) Resign(ctx context.Context) (err error) {
133 if e.leaderSession == nil {
134 return nil
135 }
136 client := e.session.Client()
137 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
138 resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
139 if err == nil {
140 e.hdr = resp.Header
141 }
142 e.leaderKey = ""
143 e.leaderSession = nil
144 return err
145 }
146
147
148 func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
149 client := e.session.Client()
150 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
151 if err != nil {
152 return nil, err
153 } else if len(resp.Kvs) == 0 {
154
155 return nil, ErrElectionNoLeader
156 }
157 return resp, nil
158 }
159
160
161
162
163
164
165
166
167 func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
168 retc := make(chan v3.GetResponse)
169 go e.observe(ctx, retc)
170 return retc
171 }
172
173 func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
174 client := e.session.Client()
175
176 defer close(ch)
177 for {
178 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
179 if err != nil {
180 return
181 }
182
183 var kv *mvccpb.KeyValue
184 var hdr *pb.ResponseHeader
185
186 if len(resp.Kvs) == 0 {
187 cctx, cancel := context.WithCancel(ctx)
188
189 opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
190 wch := client.Watch(cctx, e.keyPrefix, opts...)
191 for kv == nil {
192 wr, ok := <-wch
193 if !ok || wr.Err() != nil {
194 cancel()
195 return
196 }
197
198 for _, ev := range wr.Events {
199 if ev.Type == mvccpb.PUT {
200 hdr, kv = &wr.Header, ev.Kv
201
202
203 hdr.Revision = kv.ModRevision
204 break
205 }
206 }
207 }
208 cancel()
209 } else {
210 hdr, kv = resp.Header, resp.Kvs[0]
211 }
212
213 select {
214 case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
215 case <-ctx.Done():
216 return
217 }
218
219 cctx, cancel := context.WithCancel(ctx)
220 wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
221 keyDeleted := false
222 for !keyDeleted {
223 wr, ok := <-wch
224 if !ok {
225 cancel()
226 return
227 }
228 for _, ev := range wr.Events {
229 if ev.Type == mvccpb.DELETE {
230 keyDeleted = true
231 break
232 }
233 resp.Header = &wr.Header
234 resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
235 select {
236 case ch <- *resp:
237 case <-cctx.Done():
238 cancel()
239 return
240 }
241 }
242 }
243 cancel()
244 }
245 }
246
247
248 func (e *Election) Key() string { return e.leaderKey }
249
250
251 func (e *Election) Rev() int64 { return e.leaderRev }
252
253
254 func (e *Election) Header() *pb.ResponseHeader { return e.hdr }
255
View as plain text