1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "context"
19 "errors"
20
21 pb "go.etcd.io/etcd/raft/v3/raftpb"
22 )
23
// SnapshotStatus is the outcome of sending a snapshot to a peer, reported to
// the node through Node.ReportSnapshot.
type SnapshotStatus int

const (
	// SnapshotFinish reports a completed snapshot send; ReportSnapshot
	// forwards it as a non-rejected MsgSnapStatus.
	SnapshotFinish SnapshotStatus = iota + 1
	// SnapshotFailure reports a failed snapshot send; ReportSnapshot
	// forwards it as a rejected MsgSnapStatus.
	SnapshotFailure
)
30
31 var (
32 emptyState = pb.HardState{}
33
34
35 ErrStopped = errors.New("raft: stopped")
36 )
37
38
39
// SoftState holds the volatile part of a node's state. newReady only places
// it in a Ready when it differs from the previously reported SoftState.
type SoftState struct {
	// Lead is presumably the ID of the current leader (None when unknown) —
	// NOTE(review): confirm against raft.softState.
	Lead uint64
	// RaftState is the node's role in the raft state machine.
	RaftState StateType
}
44
45 func (a *SoftState) equal(b *SoftState) bool {
46 return a.Lead == b.Lead && a.RaftState == b.RaftState
47 }
48
49
50
51
// Ready is a batch of work that the raft state machine hands to the
// application through Node.Ready(). It is assembled by newReady from the
// current raft state. The node does not offer another Ready until the
// application acknowledges this one by calling Node.Advance().
type Ready struct {
	// SoftState is the node's volatile state. It is nil when it has not
	// changed since the previous Ready.
	*SoftState

	// HardState holds Term, Vote and Commit. It is the zero value (see
	// IsEmptyHardState) when none of them changed since the previous Ready.
	// NOTE(review): presumably must be persisted before Messages are sent —
	// confirm against the raft package documentation.
	pb.HardState

	// ReadStates are the raft instance's pending read states, set only when
	// there are any. NOTE(review): presumably the results of ReadIndex
	// requests — confirm.
	ReadStates []ReadState

	// Entries are the raft log's unstable entries, presumably those the
	// application must write to stable storage.
	Entries []pb.Entry

	// Snapshot is the raft log's pending unstable snapshot, if any;
	// IsEmptySnap reports true when there is none.
	Snapshot pb.Snapshot

	// CommittedEntries are the entries returned by raftLog.nextEnts(),
	// presumably committed entries to apply to the state machine. The index
	// of the last one is what appliedCursor reports.
	CommittedEntries []pb.Entry

	// Messages are the outbound messages queued on the raft instance.
	Messages []pb.Message

	// MustSync is the result of MustSync for this batch: true when Term or
	// Vote changed or Entries is non-empty, presumably meaning the HardState
	// and Entries must be written synchronously.
	MustSync bool
}
91
92 func isHardStateEqual(a, b pb.HardState) bool {
93 return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
94 }
95
96
97 func IsEmptyHardState(st pb.HardState) bool {
98 return isHardStateEqual(st, emptyState)
99 }
100
101
102 func IsEmptySnap(sp pb.Snapshot) bool {
103 return sp.Metadata.Index == 0
104 }
105
106 func (rd Ready) containsUpdates() bool {
107 return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
108 !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
109 len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
110 }
111
112
113
114
115 func (rd Ready) appliedCursor() uint64 {
116 if n := len(rd.CommittedEntries); n > 0 {
117 return rd.CommittedEntries[n-1].Index
118 }
119 if index := rd.Snapshot.Metadata.Index; index > 0 {
120 return index
121 }
122 return 0
123 }
124
125
// Node is the application-facing handle to a raft state machine running in
// its own goroutine (see StartNode and RestartNode).
type Node interface {
	// Tick delivers one clock tick to the raft state machine. The node
	// implementation never blocks here: if its tick buffer is full the tick
	// is dropped and a warning is logged.
	Tick()
	// Campaign asks the node to start an election by stepping a MsgHup.
	Campaign(ctx context.Context) error
	// Propose submits data as a single-entry MsgProp. It blocks until the
	// proposal has been stepped, ctx is done, or the node stops. Proposals
	// are only accepted while a leader is known, so without one the call
	// waits on ctx. A nil error means the proposal was stepped without error,
	// not that it was committed.
	Propose(ctx context.Context, data []byte) error
	// ProposeConfChange proposes a configuration change. The change is
	// marshaled with pb.MarshalConfChange into a MsgProp entry. Unlike
	// Propose, it returns once the proposal has been handed to the node and
	// does not wait for it to be stepped. NOTE(review): presumably the change
	// must be applied with ApplyConfChange once committed — confirm.
	ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
	// Step delivers a message received from a peer to the state machine.
	// Local-only message types (IsLocalMsg) are ignored and nil is returned.
	Step(ctx context.Context, msg pb.Message) error
	// Ready returns a channel that yields the next Ready batch when one is
	// available. After handling a Ready the application must call Advance;
	// the node offers no further Ready until it does.
	Ready() <-chan Ready
	// Advance acknowledges that the last Ready received from Ready() has been
	// processed, allowing the node to prepare the next one.
	Advance()
	// ApplyConfChange applies a configuration change to the local state
	// machine and returns the resulting ConfState, or an empty ConfState if
	// the node has stopped. If the change removes the local node and it is
	// not a voter in the resulting (incoming or outgoing) configuration, the
	// node stops accepting proposals.
	ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
	// TransferLeadership asks lead to hand leadership to transferee by
	// delivering a MsgTransferLeader from transferee to lead. It returns once
	// the message is queued, ctx is done, or the node stops.
	TransferLeadership(ctx context.Context, lead, transferee uint64)
	// ReadIndex requests a read state by stepping a MsgReadIndex that carries
	// rctx. NOTE(review): the result presumably surfaces in Ready.ReadStates —
	// confirm.
	ReadIndex(ctx context.Context, rctx []byte) error
	// Status returns the current status of the state machine, or a zero
	// Status if the node has stopped.
	Status() Status
	// ReportUnreachable reports that the peer with the given id could not be
	// reached, by delivering a MsgUnreachable.
	ReportUnreachable(id uint64)
	// ReportSnapshot reports the outcome of sending a snapshot to peer id.
	// SnapshotFailure is delivered as a rejected MsgSnapStatus, any other
	// status as a non-rejected one.
	ReportSnapshot(id uint64, status SnapshotStatus)
	// Stop shuts down the node's run loop and blocks until it has exited.
	// Calling it on an already stopped node returns immediately.
	Stop()
}
208
// Peer describes a cluster member passed to StartNode, which hands the list
// to RawNode.Bootstrap.
type Peer struct {
	// ID is the raft ID of the member.
	ID uint64
	// Context is opaque per-peer data. NOTE(review): presumably carried in
	// the bootstrap configuration change — confirm in RawNode.Bootstrap.
	Context []byte
}
213
214
215
216
217
218 func StartNode(c *Config, peers []Peer) Node {
219 if len(peers) == 0 {
220 panic("no peers given; use RestartNode instead")
221 }
222 rn, err := NewRawNode(c)
223 if err != nil {
224 panic(err)
225 }
226 rn.Bootstrap(peers)
227
228 n := newNode(rn)
229
230 go n.run()
231 return &n
232 }
233
234
235
236
237
238 func RestartNode(c *Config) Node {
239 rn, err := NewRawNode(c)
240 if err != nil {
241 panic(err)
242 }
243 n := newNode(rn)
244 go n.run()
245 return &n
246 }
247
// msgWithResult carries a proposal from stepWithWaitOption to the run loop.
type msgWithResult struct {
	// m is the proposal message; run overwrites its From with the local ID.
	m pb.Message
	// result is nil when the caller does not wait. Otherwise run sends the
	// error returned by stepping m on it exactly once and then closes it.
	result chan error
}
252
253
// node is the canonical Node implementation. Its public methods communicate
// with the run loop exclusively through the channels below.
type node struct {
	// propc carries proposals (MsgProp) from stepWithWaitOption.
	propc chan msgWithResult
	// recvc carries every other message to be stepped (Step, Report*,
	// TransferLeadership, ...).
	recvc chan pb.Message
	// confc carries configuration changes from ApplyConfChange.
	confc chan pb.ConfChangeV2
	// confstatec returns the resulting ConfState to ApplyConfChange.
	confstatec chan pb.ConfState
	// readyc offers Ready batches to the application.
	readyc chan Ready
	// advancec carries Advance acknowledgements.
	advancec chan struct{}
	// tickc carries ticks from Tick; buffered (see newNode).
	tickc chan struct{}
	// done is closed by run when it exits.
	done chan struct{}
	// stop carries the shutdown request from Stop.
	stop chan struct{}
	// status carries Status requests; each carries its own reply channel.
	status chan chan Status

	// rn is the RawNode driven by run.
	rn *RawNode
}
268
269 func newNode(rn *RawNode) node {
270 return node{
271 propc: make(chan msgWithResult),
272 recvc: make(chan pb.Message),
273 confc: make(chan pb.ConfChangeV2),
274 confstatec: make(chan pb.ConfState),
275 readyc: make(chan Ready),
276 advancec: make(chan struct{}),
277
278
279
280 tickc: make(chan struct{}, 128),
281 done: make(chan struct{}),
282 stop: make(chan struct{}),
283 status: make(chan chan Status),
284 rn: rn,
285 }
286 }
287
288 func (n *node) Stop() {
289 select {
290 case n.stop <- struct{}{}:
291
292 case <-n.done:
293
294 return
295 }
296
297 <-n.done
298 }
299
// run is the node's event loop; StartNode and RestartNode start it in its own
// goroutine. Each iteration it (1) prepares a Ready if none is outstanding,
// (2) reacts to leader changes, and (3) services exactly one request from the
// node's channels. It closes n.done and returns when a request arrives on
// n.stop.
func (n *node) run() {
	// Local copies of node channels. Setting one to nil disables its select
	// case, since sends to and receives from a nil channel block forever.
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	// rd is the Ready currently offered on readyc, or handed out and awaiting
	// Advance.
	var rd Ready

	r := n.rn.raft

	// lead is the leader last observed by this loop; used to detect changes.
	lead := None

	for {
		if advancec != nil {
			// A Ready was handed out and not yet acknowledged; offer nothing
			// new until Advance arrives.
			readyc = nil
		} else if n.rn.HasReady() {
			// Build the Ready without accepting it; acceptReady is called
			// only once the application actually receives it below.
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}

		// Log leader changes and accept proposals only while a leader is
		// known.
		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// Proposals are stamped with the local ID before stepping. If the
		// caller waits, the Step error is reported and the channel is closed.
		case pm := <-propc:
			m := pm.m
			m.From = r.id
			err := r.Step(m)
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		case m := <-n.recvc:
			// Drop response messages from peers not tracked in the progress
			// map; step everything else. The Step error is ignored here.
			if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m)
			}
		case cc := <-n.confc:
			_, okBefore := r.prs.Progress[r.id]
			cs := r.applyConfChange(cc)
			// If this change removed the local node, stop accepting
			// proposals. This only applies if the node was in the progress
			// map beforehand and is not a voter in either half of the
			// resulting configuration. Note that propc is only re-enabled by
			// the leader-change check above, i.e. when r.lead changes.
			if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
				var found bool
			outer:
				for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
					for _, id := range sl {
						if id == r.id {
							found = true
							break outer
						}
					}
				}
				if !found {
					propc = nil
				}
			}
			// Hand the resulting ConfState back to ApplyConfChange.
			select {
			case n.confstatec <- cs:
			case <-n.done:
			}
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			// The application now owns rd: mark it accepted and wait for
			// Advance before producing another.
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}
406
407
408
409 func (n *node) Tick() {
410 select {
411 case n.tickc <- struct{}{}:
412 case <-n.done:
413 default:
414 n.rn.raft.logger.Warningf("%x A tick missed to fire. Node blocks too long!", n.rn.raft.id)
415 }
416 }
417
418 func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
419
420 func (n *node) Propose(ctx context.Context, data []byte) error {
421 return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
422 }
423
424 func (n *node) Step(ctx context.Context, m pb.Message) error {
425
426 if IsLocalMsg(m.Type) {
427
428 return nil
429 }
430 return n.step(ctx, m)
431 }
432
433 func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
434 typ, data, err := pb.MarshalConfChange(c)
435 if err != nil {
436 return pb.Message{}, err
437 }
438 return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
439 }
440
441 func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
442 msg, err := confChangeToMsg(cc)
443 if err != nil {
444 return err
445 }
446 return n.Step(ctx, msg)
447 }
448
449 func (n *node) step(ctx context.Context, m pb.Message) error {
450 return n.stepWithWaitOption(ctx, m, false)
451 }
452
453 func (n *node) stepWait(ctx context.Context, m pb.Message) error {
454 return n.stepWithWaitOption(ctx, m, true)
455 }
456
457
458
459 func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
460 if m.Type != pb.MsgProp {
461 select {
462 case n.recvc <- m:
463 return nil
464 case <-ctx.Done():
465 return ctx.Err()
466 case <-n.done:
467 return ErrStopped
468 }
469 }
470 ch := n.propc
471 pm := msgWithResult{m: m}
472 if wait {
473 pm.result = make(chan error, 1)
474 }
475 select {
476 case ch <- pm:
477 if !wait {
478 return nil
479 }
480 case <-ctx.Done():
481 return ctx.Err()
482 case <-n.done:
483 return ErrStopped
484 }
485 select {
486 case err := <-pm.result:
487 if err != nil {
488 return err
489 }
490 case <-ctx.Done():
491 return ctx.Err()
492 case <-n.done:
493 return ErrStopped
494 }
495 return nil
496 }
497
498 func (n *node) Ready() <-chan Ready { return n.readyc }
499
500 func (n *node) Advance() {
501 select {
502 case n.advancec <- struct{}{}:
503 case <-n.done:
504 }
505 }
506
507 func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
508 var cs pb.ConfState
509 select {
510 case n.confc <- cc.AsV2():
511 case <-n.done:
512 }
513 select {
514 case cs = <-n.confstatec:
515 case <-n.done:
516 }
517 return &cs
518 }
519
520 func (n *node) Status() Status {
521 c := make(chan Status)
522 select {
523 case n.status <- c:
524 return <-c
525 case <-n.done:
526 return Status{}
527 }
528 }
529
530 func (n *node) ReportUnreachable(id uint64) {
531 select {
532 case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
533 case <-n.done:
534 }
535 }
536
537 func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
538 rej := status == SnapshotFailure
539
540 select {
541 case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
542 case <-n.done:
543 }
544 }
545
546 func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
547 select {
548
549 case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
550 case <-n.done:
551 case <-ctx.Done():
552 }
553 }
554
555 func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
556 return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
557 }
558
559 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
560 rd := Ready{
561 Entries: r.raftLog.unstableEntries(),
562 CommittedEntries: r.raftLog.nextEnts(),
563 Messages: r.msgs,
564 }
565 if softSt := r.softState(); !softSt.equal(prevSoftSt) {
566 rd.SoftState = softSt
567 }
568 if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
569 rd.HardState = hardSt
570 }
571 if r.raftLog.unstable.snapshot != nil {
572 rd.Snapshot = *r.raftLog.unstable.snapshot
573 }
574 if len(r.readStates) != 0 {
575 rd.ReadStates = r.readStates
576 }
577 rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
578 return rd
579 }
580
581
582
583 func MustSync(st, prevst pb.HardState, entsnum int) bool {
584
585
586
587
588
589 return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
590 }
591
View as plain text