1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
17 import (
18 "context"
19 "encoding/json"
20 "fmt"
21 "io/ioutil"
22 "math"
23 "net/http"
24 "os"
25 "path"
26 "path/filepath"
27 "reflect"
28 "sync"
29 "testing"
30 "time"
31
32 "github.com/stretchr/testify/assert"
33 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
34 "go.etcd.io/etcd/api/v3/membershippb"
35 "go.etcd.io/etcd/client/pkg/v3/fileutil"
36 "go.etcd.io/etcd/client/pkg/v3/testutil"
37 "go.etcd.io/etcd/client/pkg/v3/types"
38 "go.etcd.io/etcd/pkg/v3/idutil"
39 "go.etcd.io/etcd/pkg/v3/pbutil"
40 "go.etcd.io/etcd/pkg/v3/wait"
41 "go.etcd.io/etcd/raft/v3"
42 "go.etcd.io/etcd/raft/v3/raftpb"
43 "go.etcd.io/etcd/server/v3/auth"
44 "go.etcd.io/etcd/server/v3/config"
45 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
46 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
47 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
48 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
49 "go.etcd.io/etcd/server/v3/etcdserver/cindex"
50 "go.etcd.io/etcd/server/v3/lease"
51 "go.etcd.io/etcd/server/v3/mock/mockstorage"
52 "go.etcd.io/etcd/server/v3/mock/mockstore"
53 "go.etcd.io/etcd/server/v3/mock/mockwait"
54 "go.etcd.io/etcd/server/v3/mvcc"
55 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
56 "go.uber.org/zap"
57 "go.uber.org/zap/zaptest"
58 )
59
60
61
62 func TestDoLocalAction(t *testing.T) {
63 tests := []struct {
64 req pb.Request
65
66 wresp Response
67 werr error
68 wactions []testutil.Action
69 }{
70 {
71 pb.Request{Method: "GET", ID: 1, Wait: true},
72 Response{Watcher: v2store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
73 },
74 {
75 pb.Request{Method: "GET", ID: 1},
76 Response{Event: &v2store.Event{}}, nil,
77 []testutil.Action{
78 {
79 Name: "Get",
80 Params: []interface{}{"", false, false},
81 },
82 },
83 },
84 {
85 pb.Request{Method: "HEAD", ID: 1},
86 Response{Event: &v2store.Event{}}, nil,
87 []testutil.Action{
88 {
89 Name: "Get",
90 Params: []interface{}{"", false, false},
91 },
92 },
93 },
94 {
95 pb.Request{Method: "BADMETHOD", ID: 1},
96 Response{}, ErrUnknownMethod, []testutil.Action{},
97 },
98 }
99 for i, tt := range tests {
100 st := mockstore.NewRecorder()
101 srv := &EtcdServer{
102 lgMu: new(sync.RWMutex),
103 lg: zap.NewExample(),
104 v2store: st,
105 reqIDGen: idutil.NewGenerator(0, time.Time{}),
106 }
107 resp, err := srv.Do(context.Background(), tt.req)
108
109 if err != tt.werr {
110 t.Fatalf("#%d: err = %+v, want %+v", i, err, tt.werr)
111 }
112 if !reflect.DeepEqual(resp, tt.wresp) {
113 t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
114 }
115 gaction := st.Action()
116 if !reflect.DeepEqual(gaction, tt.wactions) {
117 t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
118 }
119 }
120 }
121
122
123
124 func TestDoBadLocalAction(t *testing.T) {
125 storeErr := fmt.Errorf("bah")
126 tests := []struct {
127 req pb.Request
128
129 wactions []testutil.Action
130 }{
131 {
132 pb.Request{Method: "GET", ID: 1, Wait: true},
133 []testutil.Action{{Name: "Watch"}},
134 },
135 {
136 pb.Request{Method: "GET", ID: 1},
137 []testutil.Action{
138 {
139 Name: "Get",
140 Params: []interface{}{"", false, false},
141 },
142 },
143 },
144 {
145 pb.Request{Method: "HEAD", ID: 1},
146 []testutil.Action{
147 {
148 Name: "Get",
149 Params: []interface{}{"", false, false},
150 },
151 },
152 },
153 }
154 for i, tt := range tests {
155 st := mockstore.NewErrRecorder(storeErr)
156 srv := &EtcdServer{
157 lgMu: new(sync.RWMutex),
158 lg: zap.NewExample(),
159 v2store: st,
160 reqIDGen: idutil.NewGenerator(0, time.Time{}),
161 }
162 resp, err := srv.Do(context.Background(), tt.req)
163
164 if err != storeErr {
165 t.Fatalf("#%d: err = %+v, want %+v", i, err, storeErr)
166 }
167 if !reflect.DeepEqual(resp, Response{}) {
168 t.Errorf("#%d: resp = %+v, want %+v", i, resp, Response{})
169 }
170 gaction := st.Action()
171 if !reflect.DeepEqual(gaction, tt.wactions) {
172 t.Errorf("#%d: action = %+v, want %+v", i, gaction, tt.wactions)
173 }
174 }
175 }
176
177
178 func TestApplyRepeat(t *testing.T) {
179 n := newNodeConfChangeCommitterStream()
180 n.readyc <- raft.Ready{
181 SoftState: &raft.SoftState{RaftState: raft.StateLeader},
182 }
183 cl := newTestCluster(t, nil)
184 st := v2store.New()
185 cl.SetStore(v2store.New())
186 cl.AddMember(&membership.Member{ID: 1234}, true)
187 r := newRaftNode(raftNodeConfig{
188 lg: zap.NewExample(),
189 Node: n,
190 raftStorage: raft.NewMemoryStorage(),
191 storage: mockstorage.NewStorageRecorder(""),
192 transport: newNopTransporter(),
193 })
194 s := &EtcdServer{
195 lgMu: new(sync.RWMutex),
196 lg: zap.NewExample(),
197 r: *r,
198 v2store: st,
199 cluster: cl,
200 reqIDGen: idutil.NewGenerator(0, time.Time{}),
201 SyncTicker: &time.Ticker{},
202 consistIndex: cindex.NewFakeConsistentIndex(0),
203 }
204 s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
205 s.start()
206 req := &pb.Request{Method: "QGET", ID: uint64(1)}
207 ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
208 n.readyc <- raft.Ready{CommittedEntries: ents}
209
210 n.readyc <- raft.Ready{CommittedEntries: ents}
211
212
213 cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
214 ents = []raftpb.Entry{{
215 Index: 2,
216 Type: raftpb.EntryConfChange,
217 Data: pbutil.MustMarshal(cc),
218 }}
219 n.readyc <- raft.Ready{CommittedEntries: ents}
220
221 act, err := n.Wait(1)
222
223 stopc := make(chan error, 1)
224 go func() {
225 _, werr := n.Wait(1)
226 stopc <- werr
227 }()
228 s.Stop()
229
230
231
232 if err != nil {
233 t.Fatal(err)
234 }
235 if len(act) == 0 {
236 t.Fatalf("expected len(act)=0, got %d", len(act))
237 }
238
239 if err = <-stopc; err != nil {
240 t.Fatalf("error on stop (%v)", err)
241 }
242 }
243
244 func TestApplyRequest(t *testing.T) {
245 tests := []struct {
246 req pb.Request
247
248 wresp Response
249 wactions []testutil.Action
250 }{
251
252 {
253 pb.Request{Method: "POST", ID: 1},
254 Response{Event: &v2store.Event{}},
255 []testutil.Action{
256 {
257 Name: "Create",
258 Params: []interface{}{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
259 },
260 },
261 },
262
263 {
264 pb.Request{Method: "POST", ID: 1, Expiration: 1337},
265 Response{Event: &v2store.Event{}},
266 []testutil.Action{
267 {
268 Name: "Create",
269 Params: []interface{}{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}},
270 },
271 },
272 },
273
274 {
275 pb.Request{Method: "POST", ID: 1, Dir: true},
276 Response{Event: &v2store.Event{}},
277 []testutil.Action{
278 {
279 Name: "Create",
280 Params: []interface{}{"", true, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
281 },
282 },
283 },
284
285 {
286 pb.Request{Method: "PUT", ID: 1},
287 Response{Event: &v2store.Event{}},
288 []testutil.Action{
289 {
290 Name: "Set",
291 Params: []interface{}{"", false, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
292 },
293 },
294 },
295
296 {
297 pb.Request{Method: "PUT", ID: 1, Dir: true},
298 Response{Event: &v2store.Event{}},
299 []testutil.Action{
300 {
301 Name: "Set",
302 Params: []interface{}{"", true, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
303 },
304 },
305 },
306
307 {
308 pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
309 Response{Event: &v2store.Event{}},
310 []testutil.Action{
311 {
312 Name: "Update",
313 Params: []interface{}{"", "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
314 },
315 },
316 },
317
318 {
319 pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
320 Response{Event: &v2store.Event{}},
321 []testutil.Action{
322 {
323 Name: "Create",
324 Params: []interface{}{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
325 },
326 },
327 },
328
329 {
330 pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
331 Response{Event: &v2store.Event{}},
332 []testutil.Action{
333 {
334 Name: "CompareAndSwap",
335 Params: []interface{}{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
336 },
337 },
338 },
339
340 {
341 pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
342 Response{Event: &v2store.Event{}},
343 []testutil.Action{
344 {
345 Name: "Create",
346 Params: []interface{}{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
347 },
348 },
349 },
350
351 {
352 pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
353 Response{Event: &v2store.Event{}},
354 []testutil.Action{
355 {
356 Name: "CompareAndSwap",
357 Params: []interface{}{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
358 },
359 },
360 },
361
362 {
363 pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
364 Response{Event: &v2store.Event{}},
365 []testutil.Action{
366 {
367 Name: "CompareAndSwap",
368 Params: []interface{}{"", "bar", uint64(0), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
369 },
370 },
371 },
372
373 {
374 pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
375 Response{Event: &v2store.Event{}},
376 []testutil.Action{
377 {
378 Name: "CompareAndSwap",
379 Params: []interface{}{"", "bar", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
380 },
381 },
382 },
383
384 {
385 pb.Request{Method: "DELETE", ID: 1},
386 Response{Event: &v2store.Event{}},
387 []testutil.Action{
388 {
389 Name: "Delete",
390 Params: []interface{}{"", false, false},
391 },
392 },
393 },
394
395 {
396 pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
397 Response{Event: &v2store.Event{}},
398 []testutil.Action{
399 {
400 Name: "CompareAndDelete",
401 Params: []interface{}{"", "", uint64(1)},
402 },
403 },
404 },
405
406 {
407 pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
408 Response{Event: &v2store.Event{}},
409 []testutil.Action{
410 {
411 Name: "CompareAndDelete",
412 Params: []interface{}{"", "bar", uint64(0)},
413 },
414 },
415 },
416
417 {
418 pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
419 Response{Event: &v2store.Event{}},
420 []testutil.Action{
421 {
422 Name: "CompareAndDelete",
423 Params: []interface{}{"", "bar", uint64(5)},
424 },
425 },
426 },
427
428 {
429 pb.Request{Method: "QGET", ID: 1},
430 Response{Event: &v2store.Event{}},
431 []testutil.Action{
432 {
433 Name: "Get",
434 Params: []interface{}{"", false, false},
435 },
436 },
437 },
438
439 {
440 pb.Request{Method: "SYNC", ID: 1},
441 Response{},
442 []testutil.Action{
443 {
444 Name: "DeleteExpiredKeys",
445 Params: []interface{}{time.Unix(0, 0)},
446 },
447 },
448 },
449 {
450 pb.Request{Method: "SYNC", ID: 1, Time: 12345},
451 Response{},
452 []testutil.Action{
453 {
454 Name: "DeleteExpiredKeys",
455 Params: []interface{}{time.Unix(0, 12345)},
456 },
457 },
458 },
459
460 {
461 pb.Request{Method: "BADMETHOD", ID: 1},
462 Response{Err: ErrUnknownMethod},
463 []testutil.Action{},
464 },
465 }
466
467 for i, tt := range tests {
468 st := mockstore.NewRecorder()
469 srv := &EtcdServer{
470 lgMu: new(sync.RWMutex),
471 lg: zap.NewExample(),
472 v2store: st,
473 }
474 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
475 resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth)
476
477 if !reflect.DeepEqual(resp, tt.wresp) {
478 t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
479 }
480 gaction := st.Action()
481 if !reflect.DeepEqual(gaction, tt.wactions) {
482 t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
483 }
484 }
485 }
486
487 func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
488 cl := newTestCluster(t, []*membership.Member{{ID: 1}})
489 srv := &EtcdServer{
490 lgMu: new(sync.RWMutex),
491 lg: zap.NewExample(),
492 v2store: mockstore.NewRecorder(),
493 cluster: cl,
494 }
495 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
496
497 req := pb.Request{
498 Method: "PUT",
499 ID: 1,
500 Path: membership.MemberAttributesStorePath(1),
501 Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
502 }
503 srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
504 w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
505 if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
506 t.Errorf("attributes = %v, want %v", g, w)
507 }
508 }
509
510 func TestApplyConfChangeError(t *testing.T) {
511 cl := membership.NewCluster(zaptest.NewLogger(t))
512 cl.SetStore(v2store.New())
513 for i := 1; i <= 4; i++ {
514 cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
515 }
516 cl.RemoveMember(4, true)
517
518 attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
519 ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
520 if err != nil {
521 t.Fatal(err)
522 }
523
524 attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 4)}}
525 ctx4, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
526 if err != nil {
527 t.Fatal(err)
528 }
529
530 attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
531 ctx5, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
532 if err != nil {
533 t.Fatal(err)
534 }
535
536 tests := []struct {
537 cc raftpb.ConfChange
538 werr error
539 }{
540 {
541 raftpb.ConfChange{
542 Type: raftpb.ConfChangeAddNode,
543 NodeID: 4,
544 Context: ctx4,
545 },
546 membership.ErrIDRemoved,
547 },
548 {
549 raftpb.ConfChange{
550 Type: raftpb.ConfChangeUpdateNode,
551 NodeID: 4,
552 Context: ctx4,
553 },
554 membership.ErrIDRemoved,
555 },
556 {
557 raftpb.ConfChange{
558 Type: raftpb.ConfChangeAddNode,
559 NodeID: 1,
560 Context: ctx,
561 },
562 membership.ErrIDExists,
563 },
564 {
565 raftpb.ConfChange{
566 Type: raftpb.ConfChangeRemoveNode,
567 NodeID: 5,
568 Context: ctx5,
569 },
570 membership.ErrIDNotFound,
571 },
572 }
573 for i, tt := range tests {
574 n := newNodeRecorder()
575 srv := &EtcdServer{
576 lgMu: new(sync.RWMutex),
577 lg: zap.NewExample(),
578 r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
579 cluster: cl,
580 }
581 _, err := srv.applyConfChange(tt.cc, nil, true)
582 if err != tt.werr {
583 t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
584 }
585 cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None, Context: tt.cc.Context}
586 w := []testutil.Action{
587 {
588 Name: "ApplyConfChange",
589 Params: []interface{}{cc},
590 },
591 }
592 if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
593 t.Errorf("#%d: action = %+v, want %+v", i, g, w)
594 }
595 }
596 }
597
598 func TestApplyConfChangeShouldStop(t *testing.T) {
599 cl := membership.NewCluster(zaptest.NewLogger(t))
600 cl.SetStore(v2store.New())
601 for i := 1; i <= 3; i++ {
602 cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
603 }
604 r := newRaftNode(raftNodeConfig{
605 lg: zap.NewExample(),
606 Node: newNodeNop(),
607 transport: newNopTransporter(),
608 })
609 lg := zaptest.NewLogger(t)
610 srv := &EtcdServer{
611 lgMu: new(sync.RWMutex),
612 lg: lg,
613 id: 1,
614 r: *r,
615 cluster: cl,
616 beHooks: &backendHooks{lg: lg},
617 }
618 cc := raftpb.ConfChange{
619 Type: raftpb.ConfChangeRemoveNode,
620 NodeID: 2,
621 }
622
623 shouldStop, err := srv.applyConfChange(cc, &raftpb.ConfState{}, true)
624 if err != nil {
625 t.Fatalf("unexpected error %v", err)
626 }
627 if shouldStop {
628 t.Errorf("shouldStop = %t, want %t", shouldStop, false)
629 }
630
631
632 cc.NodeID = 1
633 shouldStop, err = srv.applyConfChange(cc, &raftpb.ConfState{}, true)
634 if err != nil {
635 t.Fatalf("unexpected error %v", err)
636 }
637 if !shouldStop {
638 t.Errorf("shouldStop = %t, want %t", shouldStop, true)
639 }
640 }
641
642
643
644 func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
645 lg := zaptest.NewLogger(t)
646
647 cl := membership.NewCluster(zaptest.NewLogger(t))
648 cl.SetStore(v2store.New())
649 cl.AddMember(&membership.Member{ID: types.ID(1)}, true)
650
651 be, _ := betesting.NewDefaultTmpBackend(t)
652 defer betesting.Close(t, be)
653 cindex.CreateMetaBucket(be.BatchTx())
654
655 ci := cindex.NewConsistentIndex(be)
656 srv := &EtcdServer{
657 lgMu: new(sync.RWMutex),
658 lg: lg,
659 id: 1,
660 r: *realisticRaftNode(lg),
661 cluster: cl,
662 w: wait.New(),
663 consistIndex: ci,
664 beHooks: &backendHooks{lg: lg, indexer: ci},
665 }
666 defer srv.r.Stop()
667
668
669 now := time.Now()
670 urls, err := types.NewURLs([]string{"http://whatever:123"})
671 if err != nil {
672 t.Fatal(err)
673 }
674 m := membership.NewMember("", urls, "", &now)
675 m.ID = types.ID(2)
676 b, err := json.Marshal(m)
677 if err != nil {
678 t.Fatal(err)
679 }
680 cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
681 ents := []raftpb.Entry{{
682 Index: 2,
683 Term: 4,
684 Type: raftpb.EntryConfChange,
685 Data: pbutil.MustMarshal(cc),
686 }}
687
688 _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
689 consistIndex := srv.consistIndex.ConsistentIndex()
690 assert.Equal(t, uint64(2), appliedi)
691
692 t.Run("verify-backend", func(t *testing.T) {
693 tx := be.BatchTx()
694 tx.Lock()
695 defer tx.Unlock()
696 srv.beHooks.OnPreCommitUnsafe(tx)
697 assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
698 })
699 rindex, _ := cindex.ReadConsistentIndex(be.ReadTx())
700 assert.Equal(t, consistIndex, rindex)
701 }
702
703 func realisticRaftNode(lg *zap.Logger) *raftNode {
704 storage := raft.NewMemoryStorage()
705 storage.SetHardState(raftpb.HardState{Commit: 0, Term: 0})
706 c := &raft.Config{
707 ID: 1,
708 ElectionTick: 10,
709 HeartbeatTick: 1,
710 Storage: storage,
711 MaxSizePerMsg: math.MaxUint64,
712 MaxInflightMsgs: 256,
713 }
714 n := raft.RestartNode(c)
715 r := newRaftNode(raftNodeConfig{
716 lg: lg,
717 Node: n,
718 transport: newNopTransporter(),
719 })
720 return r
721 }
722
723
724
725 func TestApplyMultiConfChangeShouldStop(t *testing.T) {
726 lg := zaptest.NewLogger(t)
727 cl := membership.NewCluster(lg)
728 cl.SetStore(v2store.New())
729 for i := 1; i <= 5; i++ {
730 cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
731 }
732 r := newRaftNode(raftNodeConfig{
733 lg: lg,
734 Node: newNodeNop(),
735 transport: newNopTransporter(),
736 })
737 ci := cindex.NewFakeConsistentIndex(0)
738 srv := &EtcdServer{
739 lgMu: new(sync.RWMutex),
740 lg: lg,
741 id: 2,
742 r: *r,
743 cluster: cl,
744 w: wait.New(),
745 consistIndex: ci,
746 beHooks: &backendHooks{lg: lg, indexer: ci},
747 }
748 ents := []raftpb.Entry{}
749 for i := 1; i <= 4; i++ {
750 ent := raftpb.Entry{
751 Term: 1,
752 Index: uint64(i),
753 Type: raftpb.EntryConfChange,
754 Data: pbutil.MustMarshal(
755 &raftpb.ConfChange{
756 Type: raftpb.ConfChangeRemoveNode,
757 NodeID: uint64(i)}),
758 }
759 ents = append(ents, ent)
760 }
761
762 _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
763 if !shouldStop {
764 t.Errorf("shouldStop = %t, want %t", shouldStop, true)
765 }
766 }
767
768 func TestDoProposal(t *testing.T) {
769 tests := []pb.Request{
770 {Method: "POST", ID: 1},
771 {Method: "PUT", ID: 1},
772 {Method: "DELETE", ID: 1},
773 {Method: "GET", ID: 1, Quorum: true},
774 }
775 for i, tt := range tests {
776 st := mockstore.NewRecorder()
777 r := newRaftNode(raftNodeConfig{
778 lg: zap.NewExample(),
779 Node: newNodeCommitter(),
780 storage: mockstorage.NewStorageRecorder(""),
781 raftStorage: raft.NewMemoryStorage(),
782 transport: newNopTransporter(),
783 })
784 srv := &EtcdServer{
785 lgMu: new(sync.RWMutex),
786 lg: zap.NewExample(),
787 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
788 r: *r,
789 v2store: st,
790 reqIDGen: idutil.NewGenerator(0, time.Time{}),
791 SyncTicker: &time.Ticker{},
792 consistIndex: cindex.NewFakeConsistentIndex(0),
793 }
794 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
795 srv.start()
796 resp, err := srv.Do(context.Background(), tt)
797 srv.Stop()
798
799 action := st.Action()
800 if len(action) != 1 {
801 t.Errorf("#%d: len(action) = %d, want 1", i, len(action))
802 }
803 if err != nil {
804 t.Fatalf("#%d: err = %v, want nil", i, err)
805 }
806
807 wresp := Response{Event: &v2store.Event{}, Index: resp.Index}
808 if !reflect.DeepEqual(resp, wresp) {
809 t.Errorf("#%d: resp = %v, want %v", i, resp, wresp)
810 }
811 }
812 }
813
814 func TestDoProposalCancelled(t *testing.T) {
815 wt := mockwait.NewRecorder()
816 srv := &EtcdServer{
817 lgMu: new(sync.RWMutex),
818 lg: zap.NewExample(),
819 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
820 r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
821 w: wt,
822 reqIDGen: idutil.NewGenerator(0, time.Time{}),
823 }
824 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
825
826 ctx, cancel := context.WithCancel(context.Background())
827 cancel()
828 _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
829
830 if err != ErrCanceled {
831 t.Fatalf("err = %v, want %v", err, ErrCanceled)
832 }
833 w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
834 if !reflect.DeepEqual(wt.Action(), w) {
835 t.Errorf("wt.action = %+v, want %+v", wt.Action(), w)
836 }
837 }
838
839 func TestDoProposalTimeout(t *testing.T) {
840 srv := &EtcdServer{
841 lgMu: new(sync.RWMutex),
842 lg: zap.NewExample(),
843 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
844 r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
845 w: mockwait.NewNop(),
846 reqIDGen: idutil.NewGenerator(0, time.Time{}),
847 }
848 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
849
850 ctx, cancel := context.WithTimeout(context.Background(), 0)
851 _, err := srv.Do(ctx, pb.Request{Method: "PUT"})
852 cancel()
853 if err != ErrTimeout {
854 t.Fatalf("err = %v, want %v", err, ErrTimeout)
855 }
856 }
857
858 func TestDoProposalStopped(t *testing.T) {
859 srv := &EtcdServer{
860 lgMu: new(sync.RWMutex),
861 lg: zap.NewExample(),
862 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
863 r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
864 w: mockwait.NewNop(),
865 reqIDGen: idutil.NewGenerator(0, time.Time{}),
866 }
867 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
868
869 srv.stopping = make(chan struct{})
870 close(srv.stopping)
871 _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
872 if err != ErrStopped {
873 t.Errorf("err = %v, want %v", err, ErrStopped)
874 }
875 }
876
877
878 func TestSync(t *testing.T) {
879 n := newNodeRecorder()
880 ctx, cancel := context.WithCancel(context.Background())
881 srv := &EtcdServer{
882 lgMu: new(sync.RWMutex),
883 lg: zap.NewExample(),
884 r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
885 reqIDGen: idutil.NewGenerator(0, time.Time{}),
886 ctx: ctx,
887 cancel: cancel,
888 }
889 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
890
891
892 done := make(chan struct{}, 1)
893 go func() {
894 srv.sync(10 * time.Second)
895 done <- struct{}{}
896 }()
897
898 select {
899 case <-done:
900 case <-time.After(time.Second):
901 t.Fatal("sync should be non-blocking but did not return after 1s!")
902 }
903
904 action, _ := n.Wait(1)
905 if len(action) != 1 {
906 t.Fatalf("len(action) = %d, want 1", len(action))
907 }
908 if action[0].Name != "Propose" {
909 t.Fatalf("action = %s, want Propose", action[0].Name)
910 }
911 data := action[0].Params[0].([]byte)
912 var r pb.Request
913 if err := r.Unmarshal(data); err != nil {
914 t.Fatalf("unmarshal request error: %v", err)
915 }
916 if r.Method != "SYNC" {
917 t.Errorf("method = %s, want SYNC", r.Method)
918 }
919 }
920
921
922
923 func TestSyncTimeout(t *testing.T) {
924 n := newProposalBlockerRecorder()
925 ctx, cancel := context.WithCancel(context.Background())
926 srv := &EtcdServer{
927 lgMu: new(sync.RWMutex),
928 lg: zap.NewExample(),
929 r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
930 reqIDGen: idutil.NewGenerator(0, time.Time{}),
931 ctx: ctx,
932 cancel: cancel,
933 }
934 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
935
936
937 done := make(chan struct{}, 1)
938 go func() {
939 srv.sync(0)
940 done <- struct{}{}
941 }()
942
943 select {
944 case <-done:
945 case <-time.After(time.Second):
946 t.Fatal("sync should be non-blocking but did not return after 1s!")
947 }
948
949 w := []testutil.Action{{Name: "Propose blocked"}}
950 if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
951 t.Errorf("action = %v, want %v", g, w)
952 }
953 }
954
955
956
957
958 func TestSyncTrigger(t *testing.T) {
959 n := newReadyNode()
960 st := make(chan time.Time, 1)
961 tk := &time.Ticker{C: st}
962 r := newRaftNode(raftNodeConfig{
963 lg: zap.NewExample(),
964 Node: n,
965 raftStorage: raft.NewMemoryStorage(),
966 transport: newNopTransporter(),
967 storage: mockstorage.NewStorageRecorder(""),
968 })
969
970 srv := &EtcdServer{
971 lgMu: new(sync.RWMutex),
972 lg: zap.NewExample(),
973 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
974 r: *r,
975 v2store: mockstore.NewNop(),
976 SyncTicker: tk,
977 reqIDGen: idutil.NewGenerator(0, time.Time{}),
978 }
979
980
981 go func() {
982 srv.start()
983 n.readyc <- raft.Ready{
984 SoftState: &raft.SoftState{
985 RaftState: raft.StateLeader,
986 },
987 }
988
989 st <- time.Time{}
990 }()
991
992 action, _ := n.Wait(1)
993 go srv.Stop()
994
995 if len(action) != 1 {
996 t.Fatalf("len(action) = %d, want 1", len(action))
997 }
998 if action[0].Name != "Propose" {
999 t.Fatalf("action = %s, want Propose", action[0].Name)
1000 }
1001 data := action[0].Params[0].([]byte)
1002 var req pb.Request
1003 if err := req.Unmarshal(data); err != nil {
1004 t.Fatalf("error unmarshalling data: %v", err)
1005 }
1006 if req.Method != "SYNC" {
1007 t.Fatalf("unexpected proposed request: %#v", req.Method)
1008 }
1009
1010
1011 <-n.Chan()
1012 }
1013
1014
1015 func TestSnapshot(t *testing.T) {
1016 be, _ := betesting.NewDefaultTmpBackend(t)
1017 defer betesting.Close(t, be)
1018
1019 s := raft.NewMemoryStorage()
1020 s.Append([]raftpb.Entry{{Index: 1}})
1021 st := mockstore.NewRecorderStream()
1022 p := mockstorage.NewStorageRecorderStream("")
1023 r := newRaftNode(raftNodeConfig{
1024 lg: zap.NewExample(),
1025 Node: newNodeNop(),
1026 raftStorage: s,
1027 storage: p,
1028 })
1029 srv := &EtcdServer{
1030 lgMu: new(sync.RWMutex),
1031 lg: zap.NewExample(),
1032 r: *r,
1033 v2store: st,
1034 consistIndex: cindex.NewConsistentIndex(be),
1035 }
1036 srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
1037
1038 defer func() {
1039 assert.NoError(t, srv.kv.Close())
1040 }()
1041
1042 srv.be = be
1043
1044 ch := make(chan struct{}, 2)
1045
1046 go func() {
1047 gaction, _ := p.Wait(2)
1048 defer func() { ch <- struct{}{} }()
1049
1050 if len(gaction) != 2 {
1051 t.Errorf("len(action) = %d, want 2", len(gaction))
1052 return
1053 }
1054 if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
1055 t.Errorf("action = %s, want SaveSnap", gaction[0])
1056 }
1057
1058 if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
1059 t.Errorf("action = %s, want Release", gaction[1])
1060 }
1061 }()
1062
1063 go func() {
1064 gaction, _ := st.Wait(2)
1065 defer func() { ch <- struct{}{} }()
1066
1067 if len(gaction) != 2 {
1068 t.Errorf("len(action) = %d, want 2", len(gaction))
1069 }
1070 if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
1071 t.Errorf("action = %s, want Clone", gaction[0])
1072 }
1073 if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
1074 t.Errorf("action = %s, want SaveNoCopy", gaction[1])
1075 }
1076 }()
1077
1078 srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
1079 <-ch
1080 <-ch
1081 }
1082
1083
1084
1085 func TestSnapshotOrdering(t *testing.T) {
1086 lg := zaptest.NewLogger(t)
1087 n := newNopReadyNode()
1088 st := v2store.New()
1089 cl := membership.NewCluster(lg)
1090 cl.SetStore(st)
1091
1092 testdir, err := ioutil.TempDir(t.TempDir(), "testsnapdir")
1093 if err != nil {
1094 t.Fatalf("couldn't open tempdir (%v)", err)
1095 }
1096 defer os.RemoveAll(testdir)
1097
1098 snapdir := filepath.Join(testdir, "member", "snap")
1099 if err := os.MkdirAll(snapdir, 0755); err != nil {
1100 t.Fatalf("couldn't make snap dir (%v)", err)
1101 }
1102
1103 rs := raft.NewMemoryStorage()
1104 p := mockstorage.NewStorageRecorderStream(testdir)
1105 tr, snapDoneC := newSnapTransporter(snapdir)
1106 r := newRaftNode(raftNodeConfig{
1107 lg: lg,
1108 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
1109 Node: n,
1110 transport: tr,
1111 storage: p,
1112 raftStorage: rs,
1113 })
1114 be, _ := betesting.NewDefaultTmpBackend(t)
1115 ci := cindex.NewConsistentIndex(be)
1116 s := &EtcdServer{
1117 lgMu: new(sync.RWMutex),
1118 lg: lg,
1119 Cfg: config.ServerConfig{Logger: lg, DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1120 r: *r,
1121 v2store: st,
1122 snapshotter: snap.New(lg, snapdir),
1123 cluster: cl,
1124 SyncTicker: &time.Ticker{},
1125 consistIndex: ci,
1126 beHooks: &backendHooks{lg: lg, indexer: ci},
1127 }
1128 s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
1129
1130 s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
1131 s.be = be
1132
1133 s.start()
1134 defer s.Stop()
1135
1136 n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
1137 go func() {
1138
1139 snapMsg := <-snapDoneC
1140
1141
1142 snapMsg.Snapshot.Metadata.Index = 1
1143 n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
1144 }()
1145
1146 ac := <-p.Chan()
1147 if ac.Name != "Save" {
1148 t.Fatalf("expected Save, got %+v", ac)
1149 }
1150
1151 if ac := <-p.Chan(); ac.Name != "SaveSnap" {
1152 t.Fatalf("expected SaveSnap, got %+v", ac)
1153 }
1154
1155 if ac := <-p.Chan(); ac.Name != "Save" {
1156 t.Fatalf("expected Save, got %+v", ac)
1157 }
1158
1159
1160 snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
1161 if !fileutil.Exist(snapPath) {
1162 t.Fatalf("expected file %q, got missing", snapPath)
1163 }
1164
1165
1166 if ac := <-p.Chan(); ac.Name != "Sync" {
1167 t.Fatalf("expected Sync, got %+v", ac)
1168 }
1169
1170 if ac := <-p.Chan(); ac.Name != "Release" {
1171 t.Fatalf("expected Release, got %+v", ac)
1172 }
1173 }
1174
1175
1176 func TestTriggerSnap(t *testing.T) {
1177 be, tmpPath := betesting.NewDefaultTmpBackend(t)
1178 defer func() {
1179 os.RemoveAll(tmpPath)
1180 }()
1181
1182 snapc := 10
1183 st := mockstore.NewRecorder()
1184 p := mockstorage.NewStorageRecorderStream("")
1185 r := newRaftNode(raftNodeConfig{
1186 lg: zap.NewExample(),
1187 Node: newNodeCommitter(),
1188 raftStorage: raft.NewMemoryStorage(),
1189 storage: p,
1190 transport: newNopTransporter(),
1191 })
1192 srv := &EtcdServer{
1193 lgMu: new(sync.RWMutex),
1194 lg: zap.NewExample(),
1195 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1196 r: *r,
1197 v2store: st,
1198 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1199 SyncTicker: &time.Ticker{},
1200 consistIndex: cindex.NewConsistentIndex(be),
1201 }
1202 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
1203
1204 srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
1205 srv.be = be
1206
1207 srv.start()
1208
1209 donec := make(chan struct{})
1210 go func() {
1211 defer close(donec)
1212 wcnt := 3 + snapc
1213 gaction, _ := p.Wait(wcnt)
1214
1215
1216
1217 if len(gaction) != wcnt {
1218 t.Logf("gaction: %v", gaction)
1219 t.Errorf("len(action) = %d, want %d", len(gaction), wcnt)
1220 return
1221 }
1222 if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
1223 t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2])
1224 }
1225
1226 if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) {
1227 t.Errorf("action = %s, want Release", gaction[wcnt-1])
1228 }
1229 }()
1230
1231 for i := 0; i < snapc+1; i++ {
1232 srv.Do(context.Background(), pb.Request{Method: "PUT"})
1233 }
1234
1235 <-donec
1236 srv.Stop()
1237 }
1238
1239
1240
1241 func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
1242 lg := zaptest.NewLogger(t)
1243 n := newNopReadyNode()
1244 st := v2store.New()
1245 cl := membership.NewCluster(lg)
1246 cl.SetStore(st)
1247
1248 testdir, err := ioutil.TempDir(t.TempDir(), "testsnapdir")
1249 if err != nil {
1250 t.Fatalf("Couldn't open tempdir (%v)", err)
1251 }
1252 defer os.RemoveAll(testdir)
1253 if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil {
1254 t.Fatalf("Couldn't make snap dir (%v)", err)
1255 }
1256
1257 rs := raft.NewMemoryStorage()
1258 tr, snapDoneC := newSnapTransporter(testdir)
1259 r := newRaftNode(raftNodeConfig{
1260 lg: lg,
1261 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
1262 Node: n,
1263 transport: tr,
1264 storage: mockstorage.NewStorageRecorder(testdir),
1265 raftStorage: rs,
1266 })
1267 be, _ := betesting.NewDefaultTmpBackend(t)
1268 ci := cindex.NewConsistentIndex(be)
1269 s := &EtcdServer{
1270 lgMu: new(sync.RWMutex),
1271 lg: lg,
1272 Cfg: config.ServerConfig{Logger: lg, DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1273 r: *r,
1274 v2store: st,
1275 snapshotter: snap.New(lg, testdir),
1276 cluster: cl,
1277 SyncTicker: &time.Ticker{},
1278 consistIndex: ci,
1279 beHooks: &backendHooks{lg: lg, indexer: ci},
1280 }
1281 s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster}
1282
1283 s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
1284 s.be = be
1285
1286 s.start()
1287 defer s.Stop()
1288
1289
1290 idx := uint64(0)
1291 outdated := 0
1292 accepted := 0
1293 for k := 1; k <= 101; k++ {
1294 idx++
1295 ch := s.w.Register(idx)
1296 req := &pb.Request{Method: "QGET", ID: idx}
1297 ent := raftpb.Entry{Index: idx, Data: pbutil.MustMarshal(req)}
1298 ready := raft.Ready{Entries: []raftpb.Entry{ent}}
1299 n.readyc <- ready
1300
1301 ready = raft.Ready{CommittedEntries: []raftpb.Entry{ent}}
1302 n.readyc <- ready
1303
1304
1305 <-ch
1306
1307
1308 if k%2 != 0 {
1309 continue
1310 }
1311
1312 n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
1313
1314 snapMsg := <-snapDoneC
1315
1316
1317
1318
1319 if snapMsg.Snapshot.Metadata.Index == idx {
1320 idx++
1321 snapMsg.Snapshot.Metadata.Index = idx
1322 ready = raft.Ready{Snapshot: snapMsg.Snapshot}
1323 n.readyc <- ready
1324 accepted++
1325 } else {
1326 outdated++
1327 }
1328
1329 }
1330 if accepted != 50 {
1331 t.Errorf("accepted=%v, want 50", accepted)
1332 }
1333 if outdated != 0 {
1334 t.Errorf("outdated=%v, want 0", outdated)
1335 }
1336 }
1337
1338
1339 func TestAddMember(t *testing.T) {
1340 lg := zaptest.NewLogger(t)
1341 n := newNodeConfChangeCommitterRecorder()
1342 n.readyc <- raft.Ready{
1343 SoftState: &raft.SoftState{RaftState: raft.StateLeader},
1344 }
1345 cl := newTestCluster(t, nil)
1346 st := v2store.New()
1347 cl.SetStore(st)
1348 r := newRaftNode(raftNodeConfig{
1349 lg: lg,
1350 Node: n,
1351 raftStorage: raft.NewMemoryStorage(),
1352 storage: mockstorage.NewStorageRecorder(""),
1353 transport: newNopTransporter(),
1354 })
1355 s := &EtcdServer{
1356 lgMu: new(sync.RWMutex),
1357 lg: lg,
1358 r: *r,
1359 v2store: st,
1360 cluster: cl,
1361 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1362 SyncTicker: &time.Ticker{},
1363 consistIndex: cindex.NewFakeConsistentIndex(0),
1364 beHooks: &backendHooks{lg: lg},
1365 }
1366 s.start()
1367 m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
1368 _, err := s.AddMember(context.Background(), m)
1369 gaction := n.Action()
1370 s.Stop()
1371
1372 if err != nil {
1373 t.Fatalf("AddMember error: %v", err)
1374 }
1375 wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
1376 if !reflect.DeepEqual(gaction, wactions) {
1377 t.Errorf("action = %v, want %v", gaction, wactions)
1378 }
1379 if cl.Member(1234) == nil {
1380 t.Errorf("member with id 1234 is not added")
1381 }
1382 }
1383
1384
1385 func TestRemoveMember(t *testing.T) {
1386 lg := zaptest.NewLogger(t)
1387 n := newNodeConfChangeCommitterRecorder()
1388 n.readyc <- raft.Ready{
1389 SoftState: &raft.SoftState{RaftState: raft.StateLeader},
1390 }
1391 cl := newTestCluster(t, nil)
1392 st := v2store.New()
1393 cl.SetStore(v2store.New())
1394 cl.AddMember(&membership.Member{ID: 1234}, true)
1395 r := newRaftNode(raftNodeConfig{
1396 lg: lg,
1397 Node: n,
1398 raftStorage: raft.NewMemoryStorage(),
1399 storage: mockstorage.NewStorageRecorder(""),
1400 transport: newNopTransporter(),
1401 })
1402 s := &EtcdServer{
1403 lgMu: new(sync.RWMutex),
1404 lg: zap.NewExample(),
1405 r: *r,
1406 v2store: st,
1407 cluster: cl,
1408 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1409 SyncTicker: &time.Ticker{},
1410 consistIndex: cindex.NewFakeConsistentIndex(0),
1411 beHooks: &backendHooks{lg: lg},
1412 }
1413 s.start()
1414 _, err := s.RemoveMember(context.Background(), 1234)
1415 gaction := n.Action()
1416 s.Stop()
1417
1418 if err != nil {
1419 t.Fatalf("RemoveMember error: %v", err)
1420 }
1421 wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeRemoveNode"}, {Name: "ApplyConfChange:ConfChangeRemoveNode"}}
1422 if !reflect.DeepEqual(gaction, wactions) {
1423 t.Errorf("action = %v, want %v", gaction, wactions)
1424 }
1425 if cl.Member(1234) != nil {
1426 t.Errorf("member with id 1234 is not removed")
1427 }
1428 }
1429
1430
1431 func TestUpdateMember(t *testing.T) {
1432 lg := zaptest.NewLogger(t)
1433 n := newNodeConfChangeCommitterRecorder()
1434 n.readyc <- raft.Ready{
1435 SoftState: &raft.SoftState{RaftState: raft.StateLeader},
1436 }
1437 cl := newTestCluster(t, nil)
1438 st := v2store.New()
1439 cl.SetStore(st)
1440 cl.AddMember(&membership.Member{ID: 1234}, true)
1441 r := newRaftNode(raftNodeConfig{
1442 lg: lg,
1443 Node: n,
1444 raftStorage: raft.NewMemoryStorage(),
1445 storage: mockstorage.NewStorageRecorder(""),
1446 transport: newNopTransporter(),
1447 })
1448 s := &EtcdServer{
1449 lgMu: new(sync.RWMutex),
1450 lg: lg,
1451 r: *r,
1452 v2store: st,
1453 cluster: cl,
1454 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1455 SyncTicker: &time.Ticker{},
1456 consistIndex: cindex.NewFakeConsistentIndex(0),
1457 beHooks: &backendHooks{lg: lg},
1458 }
1459 s.start()
1460 wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
1461 _, err := s.UpdateMember(context.Background(), wm)
1462 gaction := n.Action()
1463 s.Stop()
1464
1465 if err != nil {
1466 t.Fatalf("UpdateMember error: %v", err)
1467 }
1468 wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeUpdateNode"}, {Name: "ApplyConfChange:ConfChangeUpdateNode"}}
1469 if !reflect.DeepEqual(gaction, wactions) {
1470 t.Errorf("action = %v, want %v", gaction, wactions)
1471 }
1472 if !reflect.DeepEqual(cl.Member(1234), &wm) {
1473 t.Errorf("member = %v, want %v", cl.Member(1234), &wm)
1474 }
1475 }
1476
1477
1478
1479 func TestPublish(t *testing.T) {
1480 lg := zaptest.NewLogger(t)
1481 n := newNodeRecorder()
1482 ch := make(chan interface{}, 1)
1483
1484 ch <- Response{}
1485 w := wait.NewWithResponse(ch)
1486 ctx, cancel := context.WithCancel(context.Background())
1487 srv := &EtcdServer{
1488 lgMu: new(sync.RWMutex),
1489 lg: lg,
1490 readych: make(chan struct{}),
1491 Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1492 id: 1,
1493 r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
1494 attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
1495 cluster: &membership.RaftCluster{},
1496 w: w,
1497 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1498 SyncTicker: &time.Ticker{},
1499
1500 ctx: ctx,
1501 cancel: cancel,
1502 }
1503 srv.publish(time.Hour)
1504
1505 action := n.Action()
1506 if len(action) != 1 {
1507 t.Fatalf("len(action) = %d, want 1", len(action))
1508 }
1509 if action[0].Name != "Propose" {
1510 t.Fatalf("action = %s, want Propose", action[0].Name)
1511 }
1512 data := action[0].Params[0].([]byte)
1513 var r pb.Request
1514 if err := r.Unmarshal(data); err != nil {
1515 t.Fatalf("unmarshal request error: %v", err)
1516 }
1517 if r.Method != "PUT" {
1518 t.Errorf("method = %s, want PUT", r.Method)
1519 }
1520 wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
1521 if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
1522 t.Errorf("path = %s, want %s", r.Path, wpath)
1523 }
1524 var gattr membership.Attributes
1525 if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
1526 t.Fatalf("unmarshal val error: %v", err)
1527 }
1528 if !reflect.DeepEqual(gattr, wm.Attributes) {
1529 t.Errorf("member = %v, want %v", gattr, wm.Attributes)
1530 }
1531 }
1532
1533
1534 func TestPublishStopped(t *testing.T) {
1535 lg := zaptest.NewLogger(t)
1536 ctx, cancel := context.WithCancel(context.Background())
1537 r := newRaftNode(raftNodeConfig{
1538 lg: lg,
1539 Node: newNodeNop(),
1540 transport: newNopTransporter(),
1541 })
1542 srv := &EtcdServer{
1543 lgMu: new(sync.RWMutex),
1544 lg: lg,
1545 Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1546 r: *r,
1547 cluster: &membership.RaftCluster{},
1548 w: mockwait.NewNop(),
1549 done: make(chan struct{}),
1550 stopping: make(chan struct{}),
1551 stop: make(chan struct{}),
1552 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1553 SyncTicker: &time.Ticker{},
1554
1555 ctx: ctx,
1556 cancel: cancel,
1557 }
1558 close(srv.stopping)
1559 srv.publish(time.Hour)
1560 }
1561
1562
1563 func TestPublishRetry(t *testing.T) {
1564 lg := zaptest.NewLogger(t)
1565
1566 ctx, cancel := context.WithCancel(context.Background())
1567 n := newNodeRecorderStream()
1568 srv := &EtcdServer{
1569 lgMu: new(sync.RWMutex),
1570 lg: lg,
1571 Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1572 r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
1573 w: mockwait.NewNop(),
1574 stopping: make(chan struct{}),
1575 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1576 SyncTicker: &time.Ticker{},
1577 ctx: ctx,
1578 cancel: cancel,
1579 }
1580
1581 ch := make(chan struct{})
1582 go func() {
1583 defer close(ch)
1584 if action, err := n.Wait(2); err != nil {
1585 t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
1586 }
1587 close(srv.stopping)
1588
1589 for {
1590 select {
1591 case <-ch:
1592 return
1593 default:
1594 n.Action()
1595 }
1596 }
1597 }()
1598 srv.publish(10 * time.Nanosecond)
1599 ch <- struct{}{}
1600 <-ch
1601 }
1602
1603 func TestPublishV3(t *testing.T) {
1604 n := newNodeRecorder()
1605 ch := make(chan interface{}, 1)
1606
1607 ch <- &applyResult{}
1608 w := wait.NewWithResponse(ch)
1609 ctx, cancel := context.WithCancel(context.Background())
1610 lg := zaptest.NewLogger(t)
1611 be, _ := betesting.NewDefaultTmpBackend(t)
1612 defer betesting.Close(t, be)
1613 srv := &EtcdServer{
1614 lgMu: new(sync.RWMutex),
1615 lg: lg,
1616 readych: make(chan struct{}),
1617 Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
1618 id: 1,
1619 r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
1620 attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
1621 cluster: &membership.RaftCluster{},
1622 w: w,
1623 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1624 SyncTicker: &time.Ticker{},
1625 authStore: auth.NewAuthStore(lg, be, nil, 0),
1626 be: be,
1627 ctx: ctx,
1628 cancel: cancel,
1629 }
1630 srv.publishV3(time.Hour)
1631
1632 action := n.Action()
1633 if len(action) != 1 {
1634 t.Fatalf("len(action) = %d, want 1", len(action))
1635 }
1636 if action[0].Name != "Propose" {
1637 t.Fatalf("action = %s, want Propose", action[0].Name)
1638 }
1639 data := action[0].Params[0].([]byte)
1640 var r pb.InternalRaftRequest
1641 if err := r.Unmarshal(data); err != nil {
1642 t.Fatalf("unmarshal request error: %v", err)
1643 }
1644 assert.Equal(t, &membershippb.ClusterMemberAttrSetRequest{Member_ID: 0x1, MemberAttributes: &membershippb.Attributes{
1645 Name: "node1", ClientUrls: []string{"http://a", "http://b"}}}, r.ClusterMemberAttrSet)
1646 }
1647
1648
1649 func TestPublishV3Stopped(t *testing.T) {
1650 ctx, cancel := context.WithCancel(context.Background())
1651 r := newRaftNode(raftNodeConfig{
1652 lg: zap.NewExample(),
1653 Node: newNodeNop(),
1654 transport: newNopTransporter(),
1655 })
1656 srv := &EtcdServer{
1657 lgMu: new(sync.RWMutex),
1658 lg: zap.NewExample(),
1659 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1660 r: *r,
1661 cluster: &membership.RaftCluster{},
1662 w: mockwait.NewNop(),
1663 done: make(chan struct{}),
1664 stopping: make(chan struct{}),
1665 stop: make(chan struct{}),
1666 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1667 SyncTicker: &time.Ticker{},
1668
1669 ctx: ctx,
1670 cancel: cancel,
1671 }
1672 close(srv.stopping)
1673 srv.publishV3(time.Hour)
1674 }
1675
1676
1677 func TestPublishV3Retry(t *testing.T) {
1678 ctx, cancel := context.WithCancel(context.Background())
1679 n := newNodeRecorderStream()
1680
1681 lg := zaptest.NewLogger(t)
1682 be, _ := betesting.NewDefaultTmpBackend(t)
1683 defer betesting.Close(t, be)
1684 srv := &EtcdServer{
1685 lgMu: new(sync.RWMutex),
1686 lg: lg,
1687 readych: make(chan struct{}),
1688 Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
1689 id: 1,
1690 r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
1691 w: mockwait.NewNop(),
1692 stopping: make(chan struct{}),
1693 attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
1694 cluster: &membership.RaftCluster{},
1695 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1696 SyncTicker: &time.Ticker{},
1697 authStore: auth.NewAuthStore(lg, be, nil, 0),
1698 be: be,
1699 ctx: ctx,
1700 cancel: cancel,
1701 }
1702
1703
1704 ch := make(chan struct{})
1705 go func() {
1706 defer close(ch)
1707 if action, err := n.Wait(2); err != nil {
1708 t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
1709 }
1710 close(srv.stopping)
1711
1712 for {
1713 select {
1714 case <-ch:
1715 return
1716 default:
1717 n.Action()
1718 }
1719 }
1720 }()
1721 srv.publishV3(10 * time.Nanosecond)
1722 ch <- struct{}{}
1723 <-ch
1724 }
1725
1726 func TestUpdateVersion(t *testing.T) {
1727 n := newNodeRecorder()
1728 ch := make(chan interface{}, 1)
1729
1730 ch <- Response{}
1731 w := wait.NewWithResponse(ch)
1732 ctx, cancel := context.WithCancel(context.TODO())
1733 srv := &EtcdServer{
1734 lgMu: new(sync.RWMutex),
1735 lg: zap.NewExample(),
1736 id: 1,
1737 Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
1738 r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
1739 attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
1740 cluster: &membership.RaftCluster{},
1741 w: w,
1742 reqIDGen: idutil.NewGenerator(0, time.Time{}),
1743 SyncTicker: &time.Ticker{},
1744
1745 ctx: ctx,
1746 cancel: cancel,
1747 }
1748 srv.updateClusterVersionV2("2.0.0")
1749
1750 action := n.Action()
1751 if len(action) != 1 {
1752 t.Fatalf("len(action) = %d, want 1", len(action))
1753 }
1754 if action[0].Name != "Propose" {
1755 t.Fatalf("action = %s, want Propose", action[0].Name)
1756 }
1757 data := action[0].Params[0].([]byte)
1758 var r pb.Request
1759 if err := r.Unmarshal(data); err != nil {
1760 t.Fatalf("unmarshal request error: %v", err)
1761 }
1762 if r.Method != "PUT" {
1763 t.Errorf("method = %s, want PUT", r.Method)
1764 }
1765 if wpath := path.Join(StoreClusterPrefix, "version"); r.Path != wpath {
1766 t.Errorf("path = %s, want %s", r.Path, wpath)
1767 }
1768 if r.Val != "2.0.0" {
1769 t.Errorf("val = %s, want %s", r.Val, "2.0.0")
1770 }
1771 }
1772
1773 func TestStopNotify(t *testing.T) {
1774 s := &EtcdServer{
1775 lgMu: new(sync.RWMutex),
1776 lg: zap.NewExample(),
1777 stop: make(chan struct{}),
1778 done: make(chan struct{}),
1779 }
1780 go func() {
1781 <-s.stop
1782 close(s.done)
1783 }()
1784
1785 notifier := s.StopNotify()
1786 select {
1787 case <-notifier:
1788 t.Fatalf("received unexpected stop notification")
1789 default:
1790 }
1791 s.Stop()
1792 select {
1793 case <-notifier:
1794 default:
1795 t.Fatalf("cannot receive stop notification")
1796 }
1797 }
1798
1799 func TestGetOtherPeerURLs(t *testing.T) {
1800 lg := zaptest.NewLogger(t)
1801 tests := []struct {
1802 membs []*membership.Member
1803 wurls []string
1804 }{
1805 {
1806 []*membership.Member{
1807 membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
1808 },
1809 []string{},
1810 },
1811 {
1812 []*membership.Member{
1813 membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
1814 membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
1815 membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
1816 },
1817 []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
1818 },
1819 {
1820 []*membership.Member{
1821 membership.NewMember("1", types.MustNewURLs([]string{"http://10.0.0.1:1"}), "a", nil),
1822 membership.NewMember("3", types.MustNewURLs([]string{"http://10.0.0.3:3"}), "a", nil),
1823 membership.NewMember("2", types.MustNewURLs([]string{"http://10.0.0.2:2"}), "a", nil),
1824 },
1825 []string{"http://10.0.0.2:2", "http://10.0.0.3:3"},
1826 },
1827 }
1828 for i, tt := range tests {
1829 cl := membership.NewClusterFromMembers(lg, types.ID(0), tt.membs)
1830 self := "1"
1831 urls := getRemotePeerURLs(cl, self)
1832 if !reflect.DeepEqual(urls, tt.wurls) {
1833 t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
1834 }
1835 }
1836 }
1837
1838 type nodeRecorder struct{ testutil.Recorder }
1839
1840 func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
1841 func newNodeRecorderStream() *nodeRecorder { return &nodeRecorder{testutil.NewRecorderStream()} }
1842 func newNodeNop() raft.Node { return newNodeRecorder() }
1843
1844 func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
1845 func (n *nodeRecorder) Campaign(ctx context.Context) error {
1846 n.Record(testutil.Action{Name: "Campaign"})
1847 return nil
1848 }
1849 func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
1850 n.Record(testutil.Action{Name: "Propose", Params: []interface{}{data}})
1851 return nil
1852 }
1853 func (n *nodeRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
1854 n.Record(testutil.Action{Name: "ProposeConfChange"})
1855 return nil
1856 }
1857 func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
1858 n.Record(testutil.Action{Name: "Step"})
1859 return nil
1860 }
1861 func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
1862 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
1863 func (n *nodeRecorder) TransferLeadership(ctx context.Context, lead, transferee uint64) {}
1864 func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
1865 func (n *nodeRecorder) Advance() {}
1866 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
1867 n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
1868 return &raftpb.ConfState{}
1869 }
1870
1871 func (n *nodeRecorder) Stop() {
1872 n.Record(testutil.Action{Name: "Stop"})
1873 }
1874
1875 func (n *nodeRecorder) ReportUnreachable(id uint64) {}
1876
1877 func (n *nodeRecorder) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
1878
1879 func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
1880 n.Record(testutil.Action{Name: "Compact"})
1881 }
1882
1883 type nodeProposalBlockerRecorder struct {
1884 nodeRecorder
1885 }
1886
1887 func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
1888 return &nodeProposalBlockerRecorder{*newNodeRecorderStream()}
1889 }
1890
1891 func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
1892 <-ctx.Done()
1893 n.Record(testutil.Action{Name: "Propose blocked"})
1894 return nil
1895 }
1896
1897
1898 type readyNode struct {
1899 nodeRecorder
1900 readyc chan raft.Ready
1901 }
1902
1903 func newReadyNode() *readyNode {
1904 return &readyNode{
1905 nodeRecorder{testutil.NewRecorderStream()},
1906 make(chan raft.Ready, 1)}
1907 }
1908 func newNopReadyNode() *readyNode {
1909 return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
1910 }
1911
1912 func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
1913
1914 type nodeConfChangeCommitterRecorder struct {
1915 readyNode
1916 index uint64
1917 }
1918
1919 func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
1920 return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
1921 }
1922
1923 func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
1924 return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
1925 }
1926
1927 func confChangeActionName(conf raftpb.ConfChangeI) string {
1928 var s string
1929 if confV1, ok := conf.AsV1(); ok {
1930 s = confV1.Type.String()
1931 } else {
1932 for i, chg := range conf.AsV2().Changes {
1933 if i > 0 {
1934 s += "/"
1935 }
1936 s += chg.Type.String()
1937 }
1938 }
1939 return s
1940 }
1941
1942 func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChangeI) error {
1943 typ, data, err := raftpb.MarshalConfChange(conf)
1944 if err != nil {
1945 return err
1946 }
1947
1948 n.index++
1949 n.Record(testutil.Action{Name: "ProposeConfChange:" + confChangeActionName(conf)})
1950 n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Index: n.index, Type: typ, Data: data}}}
1951 return nil
1952 }
1953 func (n *nodeConfChangeCommitterRecorder) Ready() <-chan raft.Ready {
1954 return n.readyc
1955 }
1956 func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChangeI) *raftpb.ConfState {
1957 n.Record(testutil.Action{Name: "ApplyConfChange:" + confChangeActionName(conf)})
1958 return &raftpb.ConfState{}
1959 }
1960
1961
1962 type nodeCommitter struct {
1963 readyNode
1964 index uint64
1965 }
1966
1967 func newNodeCommitter() raft.Node {
1968 return &nodeCommitter{*newNopReadyNode(), 0}
1969 }
1970 func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
1971 n.index++
1972 ents := []raftpb.Entry{{Index: n.index, Data: data}}
1973 n.readyc <- raft.Ready{
1974 Entries: ents,
1975 CommittedEntries: ents,
1976 }
1977 return nil
1978 }
1979
1980 func newTestCluster(t testing.TB, membs []*membership.Member) *membership.RaftCluster {
1981 c := membership.NewCluster(zaptest.NewLogger(t))
1982 for _, m := range membs {
1983 c.AddMember(m, true)
1984 }
1985 return c
1986 }
1987
1988 type nopTransporter struct{}
1989
1990 func newNopTransporter() rafthttp.Transporter {
1991 return &nopTransporter{}
1992 }
1993
1994 func (s *nopTransporter) Start() error { return nil }
1995 func (s *nopTransporter) Handler() http.Handler { return nil }
1996 func (s *nopTransporter) Send(m []raftpb.Message) {}
1997 func (s *nopTransporter) SendSnapshot(m snap.Message) {}
1998 func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
1999 func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
2000 func (s *nopTransporter) RemovePeer(id types.ID) {}
2001 func (s *nopTransporter) RemoveAllPeers() {}
2002 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
2003 func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
2004 func (s *nopTransporter) ActivePeers() int { return 0 }
2005 func (s *nopTransporter) Stop() {}
2006 func (s *nopTransporter) Pause() {}
2007 func (s *nopTransporter) Resume() {}
2008
2009 type snapTransporter struct {
2010 nopTransporter
2011 snapDoneC chan snap.Message
2012 snapDir string
2013 }
2014
2015 func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan snap.Message) {
2016 ch := make(chan snap.Message, 1)
2017 tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
2018 return tr, ch
2019 }
2020
2021 func (s *snapTransporter) SendSnapshot(m snap.Message) {
2022 ss := snap.New(zap.NewExample(), s.snapDir)
2023 ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
2024 m.CloseWithError(nil)
2025 s.snapDoneC <- m
2026 }
2027
2028 type sendMsgAppRespTransporter struct {
2029 nopTransporter
2030 sendC chan int
2031 }
2032
2033 func newSendMsgAppRespTransporter() (rafthttp.Transporter, <-chan int) {
2034 ch := make(chan int, 1)
2035 tr := &sendMsgAppRespTransporter{sendC: ch}
2036 return tr, ch
2037 }
2038
2039 func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
2040 var send int
2041 for _, msg := range m {
2042 if msg.To != 0 {
2043 send++
2044 }
2045 }
2046 s.sendC <- send
2047 }
2048
2049 func TestWaitAppliedIndex(t *testing.T) {
2050 cases := []struct {
2051 name string
2052 appliedIndex uint64
2053 committedIndex uint64
2054 action func(s *EtcdServer)
2055 ExpectedError error
2056 }{
2057 {
2058 name: "The applied Id is already equal to the commitId",
2059 appliedIndex: 10,
2060 committedIndex: 10,
2061 action: func(s *EtcdServer) {
2062 s.applyWait.Trigger(10)
2063 },
2064 ExpectedError: nil,
2065 },
2066 {
2067 name: "The etcd server has already stopped",
2068 appliedIndex: 10,
2069 committedIndex: 12,
2070 action: func(s *EtcdServer) {
2071 s.stopping <- struct{}{}
2072 },
2073 ExpectedError: ErrStopped,
2074 },
2075 {
2076 name: "Timed out waiting for the applied index",
2077 appliedIndex: 10,
2078 committedIndex: 12,
2079 action: nil,
2080 ExpectedError: ErrTimeoutWaitAppliedIndex,
2081 },
2082 }
2083 for _, tc := range cases {
2084 t.Run(tc.name, func(t *testing.T) {
2085 s := &EtcdServer{
2086 appliedIndex: tc.appliedIndex,
2087 committedIndex: tc.committedIndex,
2088 stopping: make(chan struct{}, 1),
2089 applyWait: wait.NewTimeList(),
2090 }
2091
2092 if tc.action != nil {
2093 go tc.action(s)
2094 }
2095
2096 err := s.waitAppliedIndex()
2097
2098 if err != tc.ExpectedError {
2099 t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
2100 }
2101 })
2102 }
2103 }
2104
View as plain text