1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package confchange
16
17 import (
18 "errors"
19 "fmt"
20 "strings"
21
22 "go.etcd.io/etcd/raft/v3/quorum"
23 pb "go.etcd.io/etcd/raft/v3/raftpb"
24 "go.etcd.io/etcd/raft/v3/tracker"
25 )
26
27
28
29
30
// Changer computes configuration changes on behalf of a ProgressTracker. Its
// methods work on copies of the tracker's configuration and progress map and
// validate the result before returning it, so a rejected change is reported as
// an error instead of a modified configuration.
type Changer struct {
	// Tracker supplies the current Config and Progress that every change
	// starts from, and the MaxInflight used when a new Progress is created.
	Tracker tracker.ProgressTracker

	// LastIndex is used as the initial Next index of a newly created Progress.
	LastIndex uint64
}
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
52 cfg, prs, err := c.checkAndCopy()
53 if err != nil {
54 return c.err(err)
55 }
56 if joint(cfg) {
57 err := errors.New("config is already joint")
58 return c.err(err)
59 }
60 if len(incoming(cfg.Voters)) == 0 {
61
62
63 err := errors.New("can't make a zero-voter config joint")
64 return c.err(err)
65 }
66
67 *outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
68
69 for id := range incoming(cfg.Voters) {
70 outgoing(cfg.Voters)[id] = struct{}{}
71 }
72
73 if err := c.apply(&cfg, prs, ccs...); err != nil {
74 return c.err(err)
75 }
76 cfg.AutoLeave = autoLeave
77 return checkAndReturn(cfg, prs)
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
95 cfg, prs, err := c.checkAndCopy()
96 if err != nil {
97 return c.err(err)
98 }
99 if !joint(cfg) {
100 err := errors.New("can't leave a non-joint config")
101 return c.err(err)
102 }
103 if len(outgoing(cfg.Voters)) == 0 {
104 err := fmt.Errorf("configuration is not joint: %v", cfg)
105 return c.err(err)
106 }
107 for id := range cfg.LearnersNext {
108 nilAwareAdd(&cfg.Learners, id)
109 prs[id].IsLearner = true
110 }
111 cfg.LearnersNext = nil
112
113 for id := range outgoing(cfg.Voters) {
114 _, isVoter := incoming(cfg.Voters)[id]
115 _, isLearner := cfg.Learners[id]
116
117 if !isVoter && !isLearner {
118 delete(prs, id)
119 }
120 }
121 *outgoingPtr(&cfg.Voters) = nil
122 cfg.AutoLeave = false
123
124 return checkAndReturn(cfg, prs)
125 }
126
127
128
129
130
131
132 func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
133 cfg, prs, err := c.checkAndCopy()
134 if err != nil {
135 return c.err(err)
136 }
137 if joint(cfg) {
138 err := errors.New("can't apply simple config change in joint config")
139 return c.err(err)
140 }
141 if err := c.apply(&cfg, prs, ccs...); err != nil {
142 return c.err(err)
143 }
144 if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
145 return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
146 }
147
148 return checkAndReturn(cfg, prs)
149 }
150
151
152
153
154 func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
155 for _, cc := range ccs {
156 if cc.NodeID == 0 {
157
158
159
160 continue
161 }
162 switch cc.Type {
163 case pb.ConfChangeAddNode:
164 c.makeVoter(cfg, prs, cc.NodeID)
165 case pb.ConfChangeAddLearnerNode:
166 c.makeLearner(cfg, prs, cc.NodeID)
167 case pb.ConfChangeRemoveNode:
168 c.remove(cfg, prs, cc.NodeID)
169 case pb.ConfChangeUpdateNode:
170 default:
171 return fmt.Errorf("unexpected conf type %d", cc.Type)
172 }
173 }
174 if len(incoming(cfg.Voters)) == 0 {
175 return errors.New("removed all voters")
176 }
177 return nil
178 }
179
180
181
182 func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
183 pr := prs[id]
184 if pr == nil {
185 c.initProgress(cfg, prs, id, false )
186 return
187 }
188
189 pr.IsLearner = false
190 nilAwareDelete(&cfg.Learners, id)
191 nilAwareDelete(&cfg.LearnersNext, id)
192 incoming(cfg.Voters)[id] = struct{}{}
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208 func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
209 pr := prs[id]
210 if pr == nil {
211 c.initProgress(cfg, prs, id, true )
212 return
213 }
214 if pr.IsLearner {
215 return
216 }
217
218 c.remove(cfg, prs, id)
219
220 prs[id] = pr
221
222
223
224
225
226 if _, onRight := outgoing(cfg.Voters)[id]; onRight {
227 nilAwareAdd(&cfg.LearnersNext, id)
228 } else {
229 pr.IsLearner = true
230 nilAwareAdd(&cfg.Learners, id)
231 }
232 }
233
234
235 func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
236 if _, ok := prs[id]; !ok {
237 return
238 }
239
240 delete(incoming(cfg.Voters), id)
241 nilAwareDelete(&cfg.Learners, id)
242 nilAwareDelete(&cfg.LearnersNext, id)
243
244
245 if _, onRight := outgoing(cfg.Voters)[id]; !onRight {
246 delete(prs, id)
247 }
248 }
249
250
251 func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id uint64, isLearner bool) {
252 if !isLearner {
253 incoming(cfg.Voters)[id] = struct{}{}
254 } else {
255 nilAwareAdd(&cfg.Learners, id)
256 }
257 prs[id] = &tracker.Progress{
258
259
260
261
262
263
264
265
266 Next: c.LastIndex,
267 Match: 0,
268 Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
269 IsLearner: isLearner,
270
271
272
273 RecentActive: true,
274 }
275 }
276
277
278
279
280 func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
281
282
283
284
285
286
287 for _, ids := range []map[uint64]struct{}{
288 cfg.Voters.IDs(),
289 cfg.Learners,
290 cfg.LearnersNext,
291 } {
292 for id := range ids {
293 if _, ok := prs[id]; !ok {
294 return fmt.Errorf("no progress for %d", id)
295 }
296 }
297 }
298
299
300
301 for id := range cfg.LearnersNext {
302 if _, ok := outgoing(cfg.Voters)[id]; !ok {
303 return fmt.Errorf("%d is in LearnersNext, but not Voters[1]", id)
304 }
305 if prs[id].IsLearner {
306 return fmt.Errorf("%d is in LearnersNext, but is already marked as learner", id)
307 }
308 }
309
310 for id := range cfg.Learners {
311 if _, ok := outgoing(cfg.Voters)[id]; ok {
312 return fmt.Errorf("%d is in Learners and Voters[1]", id)
313 }
314 if _, ok := incoming(cfg.Voters)[id]; ok {
315 return fmt.Errorf("%d is in Learners and Voters[0]", id)
316 }
317 if !prs[id].IsLearner {
318 return fmt.Errorf("%d is in Learners, but is not marked as learner", id)
319 }
320 }
321
322 if !joint(cfg) {
323
324 if outgoing(cfg.Voters) != nil {
325 return fmt.Errorf("cfg.Voters[1] must be nil when not joint")
326 }
327 if cfg.LearnersNext != nil {
328 return fmt.Errorf("cfg.LearnersNext must be nil when not joint")
329 }
330 if cfg.AutoLeave {
331 return fmt.Errorf("AutoLeave must be false when not joint")
332 }
333 }
334
335 return nil
336 }
337
338
339
340
341 func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
342 cfg := c.Tracker.Config.Clone()
343 prs := tracker.ProgressMap{}
344
345 for id, pr := range c.Tracker.Progress {
346
347 ppr := *pr
348 prs[id] = &ppr
349 }
350 return checkAndReturn(cfg, prs)
351 }
352
353
354
355 func checkAndReturn(cfg tracker.Config, prs tracker.ProgressMap) (tracker.Config, tracker.ProgressMap, error) {
356 if err := checkInvariants(cfg, prs); err != nil {
357 return tracker.Config{}, tracker.ProgressMap{}, err
358 }
359 return cfg, prs, nil
360 }
361
362
363 func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, error) {
364 return tracker.Config{}, nil, err
365 }
366
367
// nilAwareAdd inserts id into the set that m points to, allocating the set
// first when it is nil.
func nilAwareAdd(m *map[uint64]struct{}, id uint64) {
	set := *m
	if set == nil {
		set = make(map[uint64]struct{})
		*m = set
	}
	set[id] = struct{}{}
}
374
375
// nilAwareDelete removes id from the set that m points to. A nil set is left
// alone; a set that is empty after the removal is replaced by nil.
func nilAwareDelete(m *map[uint64]struct{}, id uint64) {
	set := *m
	if set == nil {
		return
	}
	if delete(set, id); len(set) == 0 {
		*m = nil
	}
}
385
386
387
// symdiff returns the size of the symmetric difference of l and r, i.e. the
// number of IDs that are present in exactly one of the two sets.
func symdiff(l, r map[uint64]struct{}) int {
	// onlyIn counts the members of a that are absent from b.
	onlyIn := func(a, b map[uint64]struct{}) int {
		count := 0
		for id := range a {
			if _, shared := b[id]; !shared {
				count++
			}
		}
		return count
	}
	return onlyIn(l, r) + onlyIn(r, l)
}
403
404 func joint(cfg tracker.Config) bool {
405 return len(outgoing(cfg.Voters)) > 0
406 }
407
408 func incoming(voters quorum.JointConfig) quorum.MajorityConfig { return voters[0] }
409 func outgoing(voters quorum.JointConfig) quorum.MajorityConfig { return voters[1] }
410 func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &voters[1] }
411
412
413
414 func Describe(ccs ...pb.ConfChangeSingle) string {
415 var buf strings.Builder
416 for _, cc := range ccs {
417 if buf.Len() > 0 {
418 buf.WriteByte(' ')
419 }
420 fmt.Fprintf(&buf, "%s(%d)", cc.Type, cc.NodeID)
421 }
422 return buf.String()
423 }
424
View as plain text