...

Source file src/go.etcd.io/etcd/raft/v3/rawnode_test.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"fmt"
    21  	"math"
    22  	"reflect"
    23  	"testing"
    24  
    25  	"go.etcd.io/etcd/raft/v3/quorum"
    26  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    27  	"go.etcd.io/etcd/raft/v3/tracker"
    28  )
    29  
    30  // rawNodeAdapter is essentially a compile-time lint that makes sure that RawNode
    31  // implements "most of" the Node interface. The exceptions (some of which are easy
    32  // to fix) are listed below.
    33  type rawNodeAdapter struct {
    34  	*RawNode
    35  }
    36  
    37  var _ Node = (*rawNodeAdapter)(nil)
    38  
    39  // TransferLeadership differs in that Node's version asks the caller to specify the lead, which is pointless and can simply be filled in.
    40  func (a *rawNodeAdapter) TransferLeadership(ctx context.Context, lead, transferee uint64) {
    41  	a.RawNode.TransferLeader(transferee)
    42  }
    43  
    44  // Stop exists because Node runs a goroutine; RawNode doesn't need it, so this is a no-op.
    45  func (a *rawNodeAdapter) Stop() {}
    46  
    47  // Status returns RawNode's status.
    48  func (a *rawNodeAdapter) Status() Status { return a.RawNode.Status() }
    49  
    50  // Advance differs in that RawNode takes the Ready to advance past. It arguably doesn't have to
    51  // (it could hold on to it internally), but that approach may be fragile.
    52  func (a *rawNodeAdapter) Advance() { a.RawNode.Advance(Ready{}) }
    53  
    54  // Ready differs in that RawNode returns a Ready directly instead of a channel of them.
    55  func (a *rawNodeAdapter) Ready() <-chan Ready { return nil }
    56  
    57  // The Node methods below take contexts that the RawNode equivalents don't. Easy enough to adapt.
    58  
    59  func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() }
    60  func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
    61  	a.RawNode.ReadIndex(rctx)
    62  	// RawNode swallows the error in ReadIndex; it probably should not do that.
    63  	return nil
    64  }
    65  func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
    66  func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error {
    67  	return a.RawNode.Propose(data)
    68  }
    69  func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChangeI) error {
    70  	return a.RawNode.ProposeConfChange(cc)
    71  }
    72  
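Unlike Node, RawNode has no goroutine of its own; the application drives it directly with a Ready/Advance loop roughly like the sketch below. This is only an illustration: sendMessages and applyEntry are hypothetical application hooks, not part of this package.

	func exampleDriveRawNode(rn *RawNode, s *MemoryStorage,
		sendMessages func([]pb.Message), applyEntry func(pb.Entry)) {
		for rn.HasReady() {
			rd := rn.Ready()
			// Persist state and entries before handing messages to the transport.
			if !IsEmptyHardState(rd.HardState) {
				s.SetHardState(rd.HardState)
			}
			s.Append(rd.Entries)
			sendMessages(rd.Messages)
			for _, ent := range rd.CommittedEntries {
				applyEntry(ent) // apply to the application state machine
			}
			rn.Advance(rd)
		}
	}
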
    73  // TestRawNodeStep ensures that RawNode.Step ignores local messages.
    74  func TestRawNodeStep(t *testing.T) {
    75  	for i, msgn := range pb.MessageType_name {
    76  		t.Run(msgn, func(t *testing.T) {
    77  			s := NewMemoryStorage()
    78  			s.SetHardState(pb.HardState{Term: 1, Commit: 1})
    79  			s.Append([]pb.Entry{{Term: 1, Index: 1}})
    80  			if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
    81  				ConfState: pb.ConfState{
    82  					Voters: []uint64{1},
    83  				},
    84  				Index: 1,
    85  				Term:  1,
    86  			}}); err != nil {
    87  				t.Fatal(err)
    88  			}
    89  			// The entry appended above makes sure that non-local messages (like
    90  			// vote requests) are ignored and don't trigger assertions.
    91  			rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
    92  			if err != nil {
    93  				t.Fatal(err)
    94  			}
    95  			msgt := pb.MessageType(i)
    96  			err = rawNode.Step(pb.Message{Type: msgt})
    97  			// LocalMsg should be ignored.
    98  			if IsLocalMsg(msgt) {
    99  				if err != ErrStepLocalMsg {
   100  					t.Errorf("%d: step should ignore %s", msgt, msgn)
   101  				}
   102  			}
   103  		})
   104  	}
   105  }
   106  
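The same IsLocalMsg filter is what a transport-facing receive loop would typically apply before stepping messages into a RawNode. A rough sketch, where recvc is a hypothetical channel fed by the network layer:

	func exampleStepNetworkMessages(rn *RawNode, recvc <-chan pb.Message) {
		for m := range recvc {
			if IsLocalMsg(m.Type) {
				// Local messages must never arrive over the network; drop them.
				continue
			}
			// Step may return e.g. ErrStepPeerNotFound; treat that as non-fatal here.
			_ = rn.Step(m)
		}
	}
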
   107  // TestNodeStepUnblock from node_test.go has no equivalent in rawNode because there is
   108  // no goroutine in RawNode.
   109  
   110  // TestRawNodeProposeAndConfChange tests the configuration change mechanism. Each
   111  // test case sends a configuration change which is either simple or joint, verifies
   112  // that it applies and that the resulting ConfState matches expectations, and for
   113  // joint configurations makes sure that they are exited successfully.
   114  func TestRawNodeProposeAndConfChange(t *testing.T) {
   115  	testCases := []struct {
   116  		cc   pb.ConfChangeI
   117  		exp  pb.ConfState
   118  		exp2 *pb.ConfState
   119  	}{
   120  		// V1 config change.
   121  		{
   122  			pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2},
   123  			pb.ConfState{Voters: []uint64{1, 2}},
   124  			nil,
   125  		},
   126  		// Proposing the same as a V2 change works just the same, without entering
   127  		// a joint config.
   128  		{
   129  			pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   130  				{Type: pb.ConfChangeAddNode, NodeID: 2},
   131  			},
   132  			},
   133  			pb.ConfState{Voters: []uint64{1, 2}},
   134  			nil,
   135  		},
   136  		// Ditto if we add it as a learner instead.
   137  		{
   138  			pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   139  				{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
   140  			},
   141  			},
   142  			pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
   143  			nil,
   144  		},
   145  		// We can ask explicitly for joint consensus if we want it.
   146  		{
   147  			pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   148  				{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
   149  			},
   150  				Transition: pb.ConfChangeTransitionJointExplicit,
   151  			},
   152  			pb.ConfState{Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2}},
   153  			&pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
   154  		},
   155  		// Ditto, but with implicit transition (the harness checks this).
   156  		{
   157  			pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   158  				{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
   159  			},
   160  				Transition: pb.ConfChangeTransitionJointImplicit,
   161  			},
   162  			pb.ConfState{
   163  				Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2},
   164  				AutoLeave: true,
   165  			},
   166  			&pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
   167  		},
   168  		// Add a new node and demote n1. This exercises the interesting case in
   169  		// which we really need joint config changes and also need LearnersNext.
   170  		{
   171  			pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   172  				{NodeID: 2, Type: pb.ConfChangeAddNode},
   173  				{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
   174  				{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
   175  			},
   176  			},
   177  			pb.ConfState{
   178  				Voters:         []uint64{2},
   179  				VotersOutgoing: []uint64{1},
   180  				Learners:       []uint64{3},
   181  				LearnersNext:   []uint64{1},
   182  				AutoLeave:      true,
   183  			},
   184  			&pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
   185  		},
   186  		// Ditto explicit.
   187  		{
   188  			pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   189  				{NodeID: 2, Type: pb.ConfChangeAddNode},
   190  				{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
   191  				{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
   192  			},
   193  				Transition: pb.ConfChangeTransitionJointExplicit,
   194  			},
   195  			pb.ConfState{
   196  				Voters:         []uint64{2},
   197  				VotersOutgoing: []uint64{1},
   198  				Learners:       []uint64{3},
   199  				LearnersNext:   []uint64{1},
   200  			},
   201  			&pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
   202  		},
   203  		// Ditto implicit.
   204  		{
   205  			pb.ConfChangeV2{
   206  				Changes: []pb.ConfChangeSingle{
   207  					{NodeID: 2, Type: pb.ConfChangeAddNode},
   208  					{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
   209  					{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
   210  				},
   211  				Transition: pb.ConfChangeTransitionJointImplicit,
   212  			},
   213  			pb.ConfState{
   214  				Voters:         []uint64{2},
   215  				VotersOutgoing: []uint64{1},
   216  				Learners:       []uint64{3},
   217  				LearnersNext:   []uint64{1},
   218  				AutoLeave:      true,
   219  			},
   220  			&pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
   221  		},
   222  	}
   223  
   224  	for _, tc := range testCases {
   225  		t.Run("", func(t *testing.T) {
   226  			s := newTestMemoryStorage(withPeers(1))
   227  			rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
   228  			if err != nil {
   229  				t.Fatal(err)
   230  			}
   231  
   232  			rawNode.Campaign()
   233  			proposed := false
   234  			var (
   235  				lastIndex uint64
   236  				ccdata    []byte
   237  			)
   238  			// Propose the ConfChange, wait until it applies, save the resulting
   239  			// ConfState.
   240  			var cs *pb.ConfState
   241  			for cs == nil {
   242  				rd := rawNode.Ready()
   243  				s.Append(rd.Entries)
   244  				for _, ent := range rd.CommittedEntries {
   245  					var cc pb.ConfChangeI
   246  					if ent.Type == pb.EntryConfChange {
   247  						var ccc pb.ConfChange
   248  						if err = ccc.Unmarshal(ent.Data); err != nil {
   249  							t.Fatal(err)
   250  						}
   251  						cc = ccc
   252  					} else if ent.Type == pb.EntryConfChangeV2 {
   253  						var ccc pb.ConfChangeV2
   254  						if err = ccc.Unmarshal(ent.Data); err != nil {
   255  							t.Fatal(err)
   256  						}
   257  						cc = ccc
   258  					}
   259  					if cc != nil {
   260  						cs = rawNode.ApplyConfChange(cc)
   261  					}
   262  				}
   263  				rawNode.Advance(rd)
   264  				// Once we are the leader, propose a command and a ConfChange.
   265  				if !proposed && rd.SoftState.Lead == rawNode.raft.id {
   266  					if err = rawNode.Propose([]byte("somedata")); err != nil {
   267  						t.Fatal(err)
   268  					}
   269  					if ccv1, ok := tc.cc.AsV1(); ok {
   270  						ccdata, err = ccv1.Marshal()
   271  						if err != nil {
   272  							t.Fatal(err)
   273  						}
   274  						rawNode.ProposeConfChange(ccv1)
   275  					} else {
   276  						ccv2 := tc.cc.AsV2()
   277  						ccdata, err = ccv2.Marshal()
   278  						if err != nil {
   279  							t.Fatal(err)
   280  						}
   281  						rawNode.ProposeConfChange(ccv2)
   282  					}
   283  					proposed = true
   284  				}
   285  			}
   286  
   287  			// Check that the last index is exactly the conf change we put in,
   288  			// down to the bits. Note that this comes from the Storage, which
   289  			// will not reflect any unstable entries that we'll only be presented
   290  			// with in the next Ready.
   291  			lastIndex, err = s.LastIndex()
   292  			if err != nil {
   293  				t.Fatal(err)
   294  			}
   295  
   296  			entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
   297  			if err != nil {
   298  				t.Fatal(err)
   299  			}
   300  			if len(entries) != 2 {
   301  				t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
   302  			}
   303  			if !bytes.Equal(entries[0].Data, []byte("somedata")) {
   304  				t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
   305  			}
   306  			typ := pb.EntryConfChange
   307  			if _, ok := tc.cc.AsV1(); !ok {
   308  				typ = pb.EntryConfChangeV2
   309  			}
   310  			if entries[1].Type != typ {
   311  				t.Fatalf("type = %v, want %v", entries[1].Type, typ)
   312  			}
   313  			if !bytes.Equal(entries[1].Data, ccdata) {
   314  				t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
   315  			}
   316  
   317  			if exp := &tc.exp; !reflect.DeepEqual(exp, cs) {
   318  				t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
   319  			}
   320  
   321  			var maybePlusOne uint64
   322  			if autoLeave, ok := tc.cc.AsV2().EnterJoint(); ok && autoLeave {
   323  				// If this is an auto-leaving joint conf change, it will have
   324  				// appended the entry that auto-leaves, so add one to the last
   325  				// index that forms the basis of our expectations on
   326  				// pendingConfIndex. (Recall that lastIndex was taken from stable
   327  				// storage, but this auto-leaving entry isn't on stable storage
   328  				// yet).
   329  				maybePlusOne = 1
   330  			}
   331  			if exp, act := lastIndex+maybePlusOne, rawNode.raft.pendingConfIndex; exp != act {
   332  				t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act)
   333  			}
   334  
   335  			// Move the RawNode along. If the ConfChange was simple, nothing else
   336  			// should happen. Otherwise, we're in a joint state, which is either
   337  			// left automatically or not. If not, we add the proposal that leaves
   338  			// it manually.
   339  			rd := rawNode.Ready()
   340  			var context []byte
   341  			if !tc.exp.AutoLeave {
   342  				if len(rd.Entries) > 0 {
   343  					t.Fatal("expected no more entries")
   344  				}
   345  				if tc.exp2 == nil {
   346  					return
   347  				}
   348  				context = []byte("manual")
   349  				t.Log("leaving joint state manually")
   350  				if err := rawNode.ProposeConfChange(pb.ConfChangeV2{Context: context}); err != nil {
   351  					t.Fatal(err)
   352  				}
   353  				rd = rawNode.Ready()
   354  			}
   355  
   356  			// Check that the right ConfChange comes out.
   357  			if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
   358  				t.Fatalf("expected exactly one more entry, got %+v", rd)
   359  			}
   360  			var cc pb.ConfChangeV2
   361  			if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
   362  				t.Fatal(err)
   363  			}
   364  			if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: context}) {
   365  				t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
   366  			}
   367  			// Lie and pretend the ConfChange applied. It won't do so because now
   368  			// we require the joint quorum and we're only running one node.
   369  			cs = rawNode.ApplyConfChange(cc)
   370  			if exp := tc.exp2; !reflect.DeepEqual(exp, cs) {
   371  				t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
   372  			}
   373  		})
   374  	}
   375  }
   376  
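The inline decode-and-apply handling of conf-change entries above is, in application code, commonly factored into a small helper along the lines of this sketch (an illustration only, not part of the package):

	func exampleMaybeApplyConfChange(rn *RawNode, ent pb.Entry) *pb.ConfState {
		switch ent.Type {
		case pb.EntryConfChange:
			var cc pb.ConfChange
			if err := cc.Unmarshal(ent.Data); err != nil {
				panic(err)
			}
			return rn.ApplyConfChange(cc)
		case pb.EntryConfChangeV2:
			var cc pb.ConfChangeV2
			if err := cc.Unmarshal(ent.Data); err != nil {
				panic(err)
			}
			return rn.ApplyConfChange(cc)
		default:
			return nil // a normal entry; nothing to apply here
		}
	}
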
   377  // TestRawNodeJointAutoLeave tests that a joint configuration is auto-left even if
   378  // the leader loses leadership in the meantime.
   379  func TestRawNodeJointAutoLeave(t *testing.T) {
   380  	testCc := pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
   381  		{Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
   382  	},
   383  		Transition: pb.ConfChangeTransitionJointImplicit,
   384  	}
   385  	expCs := pb.ConfState{
   386  		Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2},
   387  		AutoLeave: true,
   388  	}
   389  	exp2Cs := pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}
   390  
   391  	t.Run("", func(t *testing.T) {
   392  		s := newTestMemoryStorage(withPeers(1))
   393  		rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
   394  		if err != nil {
   395  			t.Fatal(err)
   396  		}
   397  
   398  		rawNode.Campaign()
   399  		proposed := false
   400  		var (
   401  			lastIndex uint64
   402  			ccdata    []byte
   403  		)
   404  		// Propose the ConfChange, wait until it applies, save the resulting
   405  		// ConfState.
   406  		var cs *pb.ConfState
   407  		for cs == nil {
   408  			rd := rawNode.Ready()
   409  			s.Append(rd.Entries)
   410  			for _, ent := range rd.CommittedEntries {
   411  				var cc pb.ConfChangeI
   412  				if ent.Type == pb.EntryConfChangeV2 {
   413  					var ccc pb.ConfChangeV2
   414  					if err = ccc.Unmarshal(ent.Data); err != nil {
   415  						t.Fatal(err)
   416  					}
   417  					cc = &ccc
   418  				}
   419  				if cc != nil {
   420  					// Force it to step down.
   421  					rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1})
   422  					cs = rawNode.ApplyConfChange(cc)
   423  				}
   424  			}
   425  			rawNode.Advance(rd)
   426  			// Once we are the leader, propose a command and a ConfChange.
   427  			if !proposed && rd.SoftState.Lead == rawNode.raft.id {
   428  				if err = rawNode.Propose([]byte("somedata")); err != nil {
   429  					t.Fatal(err)
   430  				}
   431  				ccdata, err = testCc.Marshal()
   432  				if err != nil {
   433  					t.Fatal(err)
   434  				}
   435  				rawNode.ProposeConfChange(testCc)
   436  				proposed = true
   437  			}
   438  		}
   439  
   440  		// Check that the last index is exactly the conf change we put in,
   441  		// down to the bits. Note that this comes from the Storage, which
   442  		// will not reflect any unstable entries that we'll only be presented
   443  		// with in the next Ready.
   444  		lastIndex, err = s.LastIndex()
   445  		if err != nil {
   446  			t.Fatal(err)
   447  		}
   448  
   449  		entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
   450  		if err != nil {
   451  			t.Fatal(err)
   452  		}
   453  		if len(entries) != 2 {
   454  			t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
   455  		}
   456  		if !bytes.Equal(entries[0].Data, []byte("somedata")) {
   457  			t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
   458  		}
   459  		if entries[1].Type != pb.EntryConfChangeV2 {
   460  			t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2)
   461  		}
   462  		if !bytes.Equal(entries[1].Data, ccdata) {
   463  			t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
   464  		}
   465  
   466  		if !reflect.DeepEqual(&expCs, cs) {
   467  			t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs)
   468  		}
   469  
   470  		if rawNode.raft.pendingConfIndex != 0 {
   471  			t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex)
   472  		}
   473  
   474  		// Move the RawNode along. It should not leave the joint configuration because it is a follower.
   475  		rd := rawNode.readyWithoutAccept()
   476  		// Check that no new entries come out.
   477  		if len(rd.Entries) != 0 {
   478  			t.Fatalf("expected zero entry, got %+v", rd)
   479  		}
   480  
   481  		// Make it leader again. It should leave the joint configuration automatically once the applied index advances.
   482  		rawNode.Campaign()
   483  		rd = rawNode.Ready()
   484  		s.Append(rd.Entries)
   485  		rawNode.Advance(rd)
   486  		rd = rawNode.Ready()
   487  		s.Append(rd.Entries)
   488  
   489  		// Check that the right ConfChange comes out.
   490  		if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
   491  			t.Fatalf("expected exactly one more entry, got %+v", rd)
   492  		}
   493  		var cc pb.ConfChangeV2
   494  		if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
   495  			t.Fatal(err)
   496  		}
   497  		if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) {
   498  			t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
   499  		}
   500  		// Lie and pretend the ConfChange applied. It won't do so because now
   501  		// we require the joint quorum and we're only running one node.
   502  		cs = rawNode.ApplyConfChange(cc)
   503  		if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) {
   504  			t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
   505  		}
   506  	})
   507  }
   508  
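When a joint configuration is entered with ConfChangeTransitionJointExplicit (no AutoLeave), the application itself must propose the transition out of it. As the tests above do, this is just a proposal of the zero ConfChangeV2; a minimal sketch:

	func exampleLeaveJointConfig(rn *RawNode) error {
		// An empty ConfChangeV2 proposes leaving the current joint configuration.
		return rn.ProposeConfChange(pb.ConfChangeV2{})
	}
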
   509  // TestRawNodeProposeAddDuplicateNode ensures that proposing the addition of the same node twice
   510  // does not affect a later proposal to add a new node.
   511  func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
   512  	s := newTestMemoryStorage(withPeers(1))
   513  	rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
   514  	if err != nil {
   515  		t.Fatal(err)
   516  	}
   517  	rd := rawNode.Ready()
   518  	s.Append(rd.Entries)
   519  	rawNode.Advance(rd)
   520  
   521  	rawNode.Campaign()
   522  	for {
   523  		rd = rawNode.Ready()
   524  		s.Append(rd.Entries)
   525  		if rd.SoftState.Lead == rawNode.raft.id {
   526  			rawNode.Advance(rd)
   527  			break
   528  		}
   529  		rawNode.Advance(rd)
   530  	}
   531  
   532  	proposeConfChangeAndApply := func(cc pb.ConfChange) {
   533  		rawNode.ProposeConfChange(cc)
   534  		rd = rawNode.Ready()
   535  		s.Append(rd.Entries)
   536  		for _, entry := range rd.CommittedEntries {
   537  			if entry.Type == pb.EntryConfChange {
   538  				var cc pb.ConfChange
   539  				cc.Unmarshal(entry.Data)
   540  				rawNode.ApplyConfChange(cc)
   541  			}
   542  		}
   543  		rawNode.Advance(rd)
   544  	}
   545  
   546  	cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
   547  	ccdata1, err := cc1.Marshal()
   548  	if err != nil {
   549  		t.Fatal(err)
   550  	}
   551  	proposeConfChangeAndApply(cc1)
   552  
   553  	// try to add the same node again
   554  	proposeConfChangeAndApply(cc1)
   555  
   556  	// adding a new node should still succeed
   557  	cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}
   558  	ccdata2, err := cc2.Marshal()
   559  	if err != nil {
   560  		t.Fatal(err)
   561  	}
   562  	proposeConfChangeAndApply(cc2)
   563  
   564  	lastIndex, err := s.LastIndex()
   565  	if err != nil {
   566  		t.Fatal(err)
   567  	}
   568  
   569  	// the last three entries should be: ConfChange cc1, cc1, cc2
   570  	entries, err := s.Entries(lastIndex-2, lastIndex+1, noLimit)
   571  	if err != nil {
   572  		t.Fatal(err)
   573  	}
   574  	if len(entries) != 3 {
   575  		t.Fatalf("len(entries) = %d, want %d", len(entries), 3)
   576  	}
   577  	if !bytes.Equal(entries[0].Data, ccdata1) {
   578  		t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, ccdata1)
   579  	}
   580  	if !bytes.Equal(entries[2].Data, ccdata2) {
   581  		t.Errorf("entries[2].Data = %v, want %v", entries[2].Data, ccdata2)
   582  	}
   583  }
   584  
   585  // TestRawNodeReadIndex ensures that RawNode.ReadIndex sends the MsgReadIndex message
   586  // to the underlying raft. It also ensures that ReadState can be read out.
   587  func TestRawNodeReadIndex(t *testing.T) {
   588  	msgs := []pb.Message{}
   589  	appendStep := func(r *raft, m pb.Message) error {
   590  		msgs = append(msgs, m)
   591  		return nil
   592  	}
   593  	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
   594  
   595  	s := newTestMemoryStorage(withPeers(1))
   596  	c := newTestConfig(1, 10, 1, s)
   597  	rawNode, err := NewRawNode(c)
   598  	if err != nil {
   599  		t.Fatal(err)
   600  	}
   601  	rawNode.raft.readStates = wrs
   602  	// ensure the ReadStates can be read out
   603  	hasReady := rawNode.HasReady()
   604  	if !hasReady {
   605  		t.Errorf("HasReady() returns %t, want %t", hasReady, true)
   606  	}
   607  	rd := rawNode.Ready()
   608  	if !reflect.DeepEqual(rd.ReadStates, wrs) {
   609  		t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
   610  	}
   611  	s.Append(rd.Entries)
   612  	rawNode.Advance(rd)
   613  	// ensure raft.readStates is reset after advance
   614  	if rawNode.raft.readStates != nil {
   615  		t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
   616  	}
   617  
   618  	wrequestCtx := []byte("somedata2")
   619  	rawNode.Campaign()
   620  	for {
   621  		rd = rawNode.Ready()
   622  		s.Append(rd.Entries)
   623  
   624  		if rd.SoftState.Lead == rawNode.raft.id {
   625  			rawNode.Advance(rd)
   626  
   627  			// Once we are the leader, issue a ReadIndex request
   628  			rawNode.raft.step = appendStep
   629  			rawNode.ReadIndex(wrequestCtx)
   630  			break
   631  		}
   632  		rawNode.Advance(rd)
   633  	}
   634  	// ensure that MsgReadIndex message is sent to the underlying raft
   635  	if len(msgs) != 1 {
   636  		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
   637  	}
   638  	if msgs[0].Type != pb.MsgReadIndex {
   639  		t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex)
   640  	}
   641  	if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
   642  		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
   643  	}
   644  }
   645  
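On the application side, a ReadIndex request is answered via Ready.ReadStates. The much-simplified sketch below shows the shape of that handling only; it ignores the entries and messages a real loop would also process, the ReadState matching the request context may only appear in a later Ready, and the read can be served once the applied index has reached rs.Index.

	func exampleFindReadState(rn *RawNode, rctx []byte) (ReadState, bool) {
		rd := rn.Ready()
		defer rn.Advance(rd)
		for _, rs := range rd.ReadStates {
			if bytes.Equal(rs.RequestCtx, rctx) {
				return rs, true
			}
		}
		return ReadState{}, false
	}
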
   646  // TestBlockProposal from node_test.go has no equivalent in rawNode because there is
   647  // no leader check in RawNode.
   648  
   649  // TestNodeTick from node_test.go has no equivalent in rawNode because
   650  // it reaches into the raft object which is not exposed.
   651  
   652  // TestNodeStop from node_test.go has no equivalent in rawNode because there is
   653  // no goroutine in RawNode.
   654  
   655  // TestRawNodeStart ensures that a node can be started correctly. Note that RawNode
   656  // requires the application to bootstrap the state, i.e. it does not accept peers
   657  // and will not create faux configuration change entries.
   658  func TestRawNodeStart(t *testing.T) {
   659  	want := Ready{
   660  		SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
   661  		HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
   662  		Entries: []pb.Entry{
   663  			{Term: 1, Index: 2, Data: nil},           // empty entry
   664  			{Term: 1, Index: 3, Data: []byte("foo")}, // entry with proposed data
   665  		},
   666  		CommittedEntries: []pb.Entry{
   667  			{Term: 1, Index: 2, Data: nil},           // empty entry
   668  			{Term: 1, Index: 3, Data: []byte("foo")}, // entry with proposed data
   669  		},
   670  		MustSync: true,
   671  	}
   672  
   673  	storage := NewMemoryStorage()
   674  	storage.ents[0].Index = 1
   675  
   676  	// TODO(tbg): this is a first prototype of what bootstrapping could look
   677  	// like (without the annoying faux ConfChanges). We want to persist a
   678  	// ConfState at some index and make sure that this index can't be reached
   679  	// from log position 1, so that followers are forced to pick up the
   680  	// ConfState in order to move away from log position 1 (unless they got
   681  	// bootstrapped in the same way already). Failing to do so would mean that
   682  	// followers diverge from the bootstrapped nodes and don't learn about the
   683  	// initial config.
   684  	//
   685  	// NB: this is exactly what CockroachDB does. The Raft log really begins at
   686  	// index 10, so empty followers (at index 1) always need a snapshot first.
   687  	type appenderStorage interface {
   688  		Storage
   689  		ApplySnapshot(pb.Snapshot) error
   690  	}
   691  	bootstrap := func(storage appenderStorage, cs pb.ConfState) error {
   692  		if len(cs.Voters) == 0 {
   693  			return fmt.Errorf("no voters specified")
   694  		}
   695  		fi, err := storage.FirstIndex()
   696  		if err != nil {
   697  			return err
   698  		}
   699  		if fi < 2 {
   700  			return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap")
   701  		}
   702  		if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil {
   703  			// TODO(tbg): match exact error
   704  			return fmt.Errorf("should not have been able to load first index")
   705  		}
   706  		li, err := storage.LastIndex()
   707  		if err != nil {
   708  			return err
   709  		}
   710  		if _, err = storage.Entries(li, li, math.MaxUint64); err == nil {
   711  			return fmt.Errorf("should not have been able to load last index")
   712  		}
   713  		hs, ics, err := storage.InitialState()
   714  		if err != nil {
   715  			return err
   716  		}
   717  		if !IsEmptyHardState(hs) {
   718  			return fmt.Errorf("HardState not empty")
   719  		}
   720  		if len(ics.Voters) != 0 {
   721  			return fmt.Errorf("ConfState not empty")
   722  		}
   723  
   724  		meta := pb.SnapshotMetadata{
   725  			Index:     1,
   726  			Term:      0,
   727  			ConfState: cs,
   728  		}
   729  		snap := pb.Snapshot{Metadata: meta}
   730  		return storage.ApplySnapshot(snap)
   731  	}
   732  
   733  	if err := bootstrap(storage, pb.ConfState{Voters: []uint64{1}}); err != nil {
   734  		t.Fatal(err)
   735  	}
   736  
   737  	rawNode, err := NewRawNode(newTestConfig(1, 10, 1, storage))
   738  	if err != nil {
   739  		t.Fatal(err)
   740  	}
   741  	if rawNode.HasReady() {
   742  		t.Fatalf("unexpected ready: %+v", rawNode.Ready())
   743  	}
   744  	rawNode.Campaign()
   745  	rawNode.Propose([]byte("foo"))
   746  	if !rawNode.HasReady() {
   747  		t.Fatal("expected a Ready")
   748  	}
   749  	rd := rawNode.Ready()
   750  	storage.Append(rd.Entries)
   751  	rawNode.Advance(rd)
   752  
   753  	rd.SoftState, want.SoftState = nil, nil
   754  
   755  	if !reflect.DeepEqual(rd, want) {
   756  		t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want)
   757  	}
   758  
   759  	if rawNode.HasReady() {
   760  		t.Errorf("unexpected Ready: %+v", rawNode.Ready())
   761  	}
   762  }
   763  
   764  func TestRawNodeRestart(t *testing.T) {
   765  	entries := []pb.Entry{
   766  		{Term: 1, Index: 1},
   767  		{Term: 1, Index: 2, Data: []byte("foo")},
   768  	}
   769  	st := pb.HardState{Term: 1, Commit: 1}
   770  
   771  	want := Ready{
   772  		HardState: emptyState,
   773  		// commit up to commit index in st
   774  		CommittedEntries: entries[:st.Commit],
   775  		MustSync:         false,
   776  	}
   777  
   778  	storage := newTestMemoryStorage(withPeers(1))
   779  	storage.SetHardState(st)
   780  	storage.Append(entries)
   781  	rawNode, err := NewRawNode(newTestConfig(1, 10, 1, storage))
   782  	if err != nil {
   783  		t.Fatal(err)
   784  	}
   785  	rd := rawNode.Ready()
   786  	if !reflect.DeepEqual(rd, want) {
   787  		t.Errorf("g = %+v,\n             w   %+v", rd, want)
   788  	}
   789  	rawNode.Advance(rd)
   790  	if rawNode.HasReady() {
   791  		t.Errorf("unexpected Ready: %+v", rawNode.Ready())
   792  	}
   793  }
   794  
   795  func TestRawNodeRestartFromSnapshot(t *testing.T) {
   796  	snap := pb.Snapshot{
   797  		Metadata: pb.SnapshotMetadata{
   798  			ConfState: pb.ConfState{Voters: []uint64{1, 2}},
   799  			Index:     2,
   800  			Term:      1,
   801  		},
   802  	}
   803  	entries := []pb.Entry{
   804  		{Term: 1, Index: 3, Data: []byte("foo")},
   805  	}
   806  	st := pb.HardState{Term: 1, Commit: 3}
   807  
   808  	want := Ready{
   809  		HardState: emptyState,
   810  		// commit up to commit index in st
   811  		CommittedEntries: entries,
   812  		MustSync:         false,
   813  	}
   814  
   815  	s := NewMemoryStorage()
   816  	s.SetHardState(st)
   817  	s.ApplySnapshot(snap)
   818  	s.Append(entries)
   819  	rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
   820  	if err != nil {
   821  		t.Fatal(err)
   822  	}
   823  	if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) {
   824  		t.Errorf("g = %+v,\n             w   %+v", rd, want)
   825  	} else {
   826  		rawNode.Advance(rd)
   827  	}
   828  	if rawNode.HasReady() {
   829  		t.Errorf("unexpected Ready: %+v", rawNode.Ready())
   830  	}
   831  }
   832  
   833  // TestNodeAdvance from node_test.go has no equivalent in rawNode because there is
   834  // no dependency check between Ready() and Advance()
   835  
   836  func TestRawNodeStatus(t *testing.T) {
   837  	s := newTestMemoryStorage(withPeers(1))
   838  	rn, err := NewRawNode(newTestConfig(1, 10, 1, s))
   839  	if err != nil {
   840  		t.Fatal(err)
   841  	}
   842  	if status := rn.Status(); status.Progress != nil {
   843  		t.Fatalf("expected no Progress because not leader: %+v", status.Progress)
   844  	}
   845  	if err := rn.Campaign(); err != nil {
   846  		t.Fatal(err)
   847  	}
   848  	status := rn.Status()
   849  	if status.Lead != 1 {
   850  		t.Fatal("not lead")
   851  	}
   852  	if status.RaftState != StateLeader {
   853  		t.Fatal("not leader")
   854  	}
   855  	if exp, act := *rn.raft.prs.Progress[1], status.Progress[1]; !reflect.DeepEqual(exp, act) {
   856  		t.Fatalf("want: %+v\ngot:  %+v", exp, act)
   857  	}
   858  	expCfg := tracker.Config{Voters: quorum.JointConfig{
   859  		quorum.MajorityConfig{1: {}},
   860  		nil,
   861  	}}
   862  	if !reflect.DeepEqual(expCfg, status.Config) {
   863  		t.Fatalf("want: %+v\ngot:  %+v", expCfg, status.Config)
   864  	}
   865  }
   866  
   867  // TestRawNodeCommitPaginationAfterRestart is the RawNode version of
   868  // TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
   869  // Raft group would forget to apply entries:
   870  //
   871  //   - node learns that index 11 is committed
   872  //   - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
   873  //     exceeds maxBytes), which isn't noticed internally by Raft
   874  //   - Commit index gets bumped to 10
   875  //   - the node persists the HardState, but crashes before applying the entries
   876  //   - upon restart, the storage returns the same entries, but `slice` takes a
   877  //     different code path and removes the last entry.
   878  //   - Raft does not emit a HardState, but when the app calls Advance(), it bumps
   879  //     its internal applied index cursor to 10 (when it should be 9)
   880  //   - the next Ready asks the app to apply index 11 (omitting index 10), losing a
   881  //     write.
   882  func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
   883  	s := &ignoreSizeHintMemStorage{
   884  		MemoryStorage: newTestMemoryStorage(withPeers(1)),
   885  	}
   886  	persistedHardState := pb.HardState{
   887  		Term:   1,
   888  		Vote:   1,
   889  		Commit: 10,
   890  	}
   891  
   892  	s.hardState = persistedHardState
   893  	s.ents = make([]pb.Entry, 10)
   894  	var size uint64
   895  	for i := range s.ents {
   896  		ent := pb.Entry{
   897  			Term:  1,
   898  			Index: uint64(i + 1),
   899  			Type:  pb.EntryNormal,
   900  			Data:  []byte("a"),
   901  		}
   902  
   903  		s.ents[i] = ent
   904  		size += uint64(ent.Size())
   905  	}
   906  
   907  	cfg := newTestConfig(1, 10, 1, s)
   908  	// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
   909  	// not be included in the initial rd.CommittedEntries. However, our storage will ignore
   910  	// this and *will* return it (which is how the Commit index ended up being 10 initially).
   911  	cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
   912  
   913  	s.ents = append(s.ents, pb.Entry{
   914  		Term:  1,
   915  		Index: uint64(11),
   916  		Type:  pb.EntryNormal,
   917  		Data:  []byte("boom"),
   918  	})
   919  
   920  	rawNode, err := NewRawNode(cfg)
   921  	if err != nil {
   922  		t.Fatal(err)
   923  	}
   924  
   925  	for highestApplied := uint64(0); highestApplied != 11; {
   926  		rd := rawNode.Ready()
   927  		n := len(rd.CommittedEntries)
   928  		if n == 0 {
   929  			t.Fatalf("stopped applying entries at index %d", highestApplied)
   930  		}
   931  		if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
   932  			t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
   933  		}
   934  		highestApplied = rd.CommittedEntries[n-1].Index
   935  		rawNode.Advance(rd)
   936  		rawNode.Step(pb.Message{
   937  			Type:   pb.MsgHeartbeat,
   938  			To:     1,
   939  			From:   1, // illegal, but we get away with it
   940  			Term:   1,
   941  			Commit: 11,
   942  		})
   943  	}
   944  }
   945  
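The contiguity check inside the loop above is the kind of guard an application can keep in its own apply loop to catch gaps like the one described before the test. A standalone sketch (illustrative only):

	func exampleCheckApplyContiguity(highestApplied uint64, ents []pb.Entry) error {
		if len(ents) == 0 {
			return nil
		}
		if next := ents[0].Index; highestApplied != 0 && highestApplied+1 != next {
			return fmt.Errorf("gap in committed entries: applied up to %d, next batch starts at %d",
				highestApplied, next)
		}
		return nil
	}
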
   946  // TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
   947  // partitioned from a quorum of nodes. It verifies that the leader's log is
   948  // protected from unbounded growth even as new entries continue to be proposed.
   949  // This protection is provided by the MaxUncommittedEntriesSize configuration.
   950  func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
   951  	const maxEntries = 16
   952  	data := []byte("testdata")
   953  	testEntry := pb.Entry{Data: data}
   954  	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
   955  
   956  	s := newTestMemoryStorage(withPeers(1))
   957  	cfg := newTestConfig(1, 10, 1, s)
   958  	cfg.MaxUncommittedEntriesSize = maxEntrySize
   959  	rawNode, err := NewRawNode(cfg)
   960  	if err != nil {
   961  		t.Fatal(err)
   962  	}
   963  	rd := rawNode.Ready()
   964  	s.Append(rd.Entries)
   965  	rawNode.Advance(rd)
   966  
   967  	// Become the leader.
   968  	rawNode.Campaign()
   969  	for {
   970  		rd = rawNode.Ready()
   971  		s.Append(rd.Entries)
   972  		if rd.SoftState.Lead == rawNode.raft.id {
   973  			rawNode.Advance(rd)
   974  			break
   975  		}
   976  		rawNode.Advance(rd)
   977  	}
   978  
   979  	// Simulate a network partition while we make our proposals by never
   980  	// committing anything. These proposals should not cause the leader's
   981  	// log to grow indefinitely.
   982  	for i := 0; i < 1024; i++ {
   983  		rawNode.Propose(data)
   984  	}
   985  
   986  	// Check the size of leader's uncommitted log tail. It should not exceed the
   987  	// MaxUncommittedEntriesSize limit.
   988  	checkUncommitted := func(exp uint64) {
   989  		t.Helper()
   990  		if a := rawNode.raft.uncommittedSize; exp != a {
   991  			t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
   992  		}
   993  	}
   994  	checkUncommitted(maxEntrySize)
   995  
   996  	// Recover from the partition. The uncommitted tail of the Raft log should
   997  	// disappear as entries are committed.
   998  	rd = rawNode.Ready()
   999  	if len(rd.CommittedEntries) != maxEntries {
  1000  		t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
  1001  	}
  1002  	s.Append(rd.Entries)
  1003  	rawNode.Advance(rd)
  1004  	checkUncommitted(0)
  1005  }
  1006  
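When the MaxUncommittedEntriesSize limit is hit (or there is currently no leader), Propose returns ErrProposalDropped, which the test above silently ignores. An application would typically surface or retry such proposals; a rough sketch, where retryQueue is a hypothetical requeue channel:

	func exampleProposeOrQueue(rn *RawNode, data []byte, retryQueue chan<- []byte) error {
		err := rn.Propose(data)
		if err == ErrProposalDropped {
			// The uncommitted log is full or there is no leader; retry later.
			retryQueue <- data
			return nil
		}
		return err
	}
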
  1007  func BenchmarkStatus(b *testing.B) {
  1008  	setup := func(members int) *RawNode {
  1009  		peers := make([]uint64, members)
  1010  		for i := range peers {
  1011  			peers[i] = uint64(i + 1)
  1012  		}
  1013  		cfg := newTestConfig(1, 3, 1, newTestMemoryStorage(withPeers(peers...)))
  1014  		cfg.Logger = discardLogger
  1015  		r := newRaft(cfg)
  1016  		r.becomeFollower(1, 1)
  1017  		r.becomeCandidate()
  1018  		r.becomeLeader()
  1019  		return &RawNode{raft: r}
  1020  	}
  1021  
  1022  	for _, members := range []int{1, 3, 5, 100} {
  1023  		b.Run(fmt.Sprintf("members=%d", members), func(b *testing.B) {
  1024  			rn := setup(members)
  1025  
  1026  			b.Run("Status", func(b *testing.B) {
  1027  				b.ReportAllocs()
  1028  				for i := 0; i < b.N; i++ {
  1029  					_ = rn.Status()
  1030  				}
  1031  			})
  1032  
  1033  			b.Run("Status-example", func(b *testing.B) {
  1034  				b.ReportAllocs()
  1035  				for i := 0; i < b.N; i++ {
  1036  					s := rn.Status()
  1037  					var n uint64
  1038  					for _, pr := range s.Progress {
  1039  						n += pr.Match
  1040  					}
  1041  					_ = n
  1042  				}
  1043  			})
  1044  
  1045  			b.Run("BasicStatus", func(b *testing.B) {
  1046  				b.ReportAllocs()
  1047  				for i := 0; i < b.N; i++ {
  1048  					_ = rn.BasicStatus()
  1049  				}
  1050  			})
  1051  
  1052  			b.Run("WithProgress", func(b *testing.B) {
  1053  				b.ReportAllocs()
  1054  				visit := func(uint64, ProgressType, tracker.Progress) {}
  1055  
  1056  				for i := 0; i < b.N; i++ {
  1057  					rn.WithProgress(visit)
  1058  				}
  1059  			})
  1060  			b.Run("WithProgress-example", func(b *testing.B) {
  1061  				b.ReportAllocs()
  1062  				for i := 0; i < b.N; i++ {
  1063  					var n uint64
  1064  					visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
  1065  						n += pr.Match
  1066  					}
  1067  					rn.WithProgress(visit)
  1068  					_ = n
  1069  				}
  1070  			})
  1071  		})
  1072  	}
  1073  }
  1074  
  1075  func TestRawNodeConsumeReady(t *testing.T) {
  1076  	// Check that readyWithoutAccept() does not call acceptReady (which resets
  1077  	// the messages) but Ready() does.
  1078  	s := newTestMemoryStorage(withPeers(1))
  1079  	rn := newTestRawNode(1, 3, 1, s)
  1080  	m1 := pb.Message{Context: []byte("foo")}
  1081  	m2 := pb.Message{Context: []byte("bar")}
  1082  
  1083  	// Inject first message, make sure it's visible via readyWithoutAccept.
  1084  	rn.raft.msgs = append(rn.raft.msgs, m1)
  1085  	rd := rn.readyWithoutAccept()
  1086  	if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
  1087  		t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
  1088  	}
  1089  	if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
  1090  		t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
  1091  	}
  1092  	// Now call Ready() which should move the message into the Ready (as opposed
  1093  	// to leaving it in both places).
  1094  	rd = rn.Ready()
  1095  	if len(rn.raft.msgs) > 0 {
  1096  		t.Fatalf("messages not reset: %+v", rn.raft.msgs)
  1097  	}
  1098  	if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
  1099  		t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
  1100  	}
  1101  	// Add a message to raft to make sure that Advance() doesn't drop it.
  1102  	rn.raft.msgs = append(rn.raft.msgs, m2)
  1103  	rn.Advance(rd)
  1104  	if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
  1105  		t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
  1106  	}
  1107  }
  1108  
