...

Source file src/go.etcd.io/etcd/raft/v3/node_test.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"fmt"
    21  	"math"
    22  	"reflect"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    28  	"go.etcd.io/etcd/raft/v3/raftpb"
    29  )
    30  
    31  // readyWithTimeout selects from n.Ready() with a 1-second timeout. It
    32  // panics on timeout, which is better than the indefinite wait that
    33  // would occur if this channel were read without being wrapped in a
    34  // select.
    35  func readyWithTimeout(n Node) Ready {
    36  	select {
    37  	case rd := <-n.Ready():
    38  		return rd
    39  	case <-time.After(time.Second):
    40  		panic("timed out waiting for ready")
    41  	}
    42  }
    43  
     44  // TestNodeStep ensures that node.Step sends MsgProp to the propc chan
     45  // and other non-local messages to the recvc chan, ignoring local messages.
    46  func TestNodeStep(t *testing.T) {
    47  	for i, msgn := range raftpb.MessageType_name {
    48  		n := &node{
    49  			propc: make(chan msgWithResult, 1),
    50  			recvc: make(chan raftpb.Message, 1),
    51  		}
    52  		msgt := raftpb.MessageType(i)
    53  		n.Step(context.TODO(), raftpb.Message{Type: msgt})
     54  		// Proposals go to the propc chan. Other messages go to the recvc chan.
    55  		if msgt == raftpb.MsgProp {
    56  			select {
    57  			case <-n.propc:
    58  			default:
    59  				t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
    60  			}
    61  		} else {
    62  			if IsLocalMsg(msgt) {
    63  				select {
    64  				case <-n.recvc:
    65  					t.Errorf("%d: step should ignore %s", msgt, msgn)
    66  				default:
    67  				}
    68  			} else {
    69  				select {
    70  				case <-n.recvc:
    71  				default:
    72  					t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
    73  				}
    74  			}
    75  		}
    76  	}
    77  }
    78  
     79  // TestNodeStepUnblock ensures that context cancellation and Stop both unblock Step().
    80  func TestNodeStepUnblock(t *testing.T) {
     81  	// a node with an unbuffered propc so that Step blocks
    82  	n := &node{
    83  		propc: make(chan msgWithResult),
    84  		done:  make(chan struct{}),
    85  	}
    86  
    87  	ctx, cancel := context.WithCancel(context.Background())
    88  	stopFunc := func() { close(n.done) }
    89  
    90  	tests := []struct {
    91  		unblock func()
    92  		werr    error
    93  	}{
    94  		{stopFunc, ErrStopped},
    95  		{cancel, context.Canceled},
    96  	}
    97  
    98  	for i, tt := range tests {
    99  		errc := make(chan error, 1)
   100  		go func() {
   101  			err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
   102  			errc <- err
   103  		}()
   104  		tt.unblock()
   105  		select {
   106  		case err := <-errc:
   107  			if err != tt.werr {
   108  				t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
   109  			}
    110  			// clean up side effects
   111  			if ctx.Err() != nil {
   112  				ctx = context.TODO()
   113  			}
   114  			select {
   115  			case <-n.done:
   116  				n.done = make(chan struct{})
   117  			default:
   118  			}
   119  		case <-time.After(1 * time.Second):
   120  			t.Fatalf("#%d: failed to unblock step", i)
   121  		}
   122  	}
   123  }
   124  
   125  // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
   126  func TestNodePropose(t *testing.T) {
   127  	msgs := []raftpb.Message{}
   128  	appendStep := func(r *raft, m raftpb.Message) error {
   129  		msgs = append(msgs, m)
   130  		return nil
   131  	}
   132  
   133  	s := newTestMemoryStorage(withPeers(1))
   134  	rn := newTestRawNode(1, 10, 1, s)
   135  	n := newNode(rn)
   136  	r := rn.raft
   137  	go n.run()
   138  	if err := n.Campaign(context.TODO()); err != nil {
   139  		t.Fatal(err)
   140  	}
   141  	for {
   142  		rd := <-n.Ready()
   143  		s.Append(rd.Entries)
    144  		// once this raft becomes leader, change its step function to appendStep
   145  		if rd.SoftState.Lead == r.id {
   146  			r.step = appendStep
   147  			n.Advance()
   148  			break
   149  		}
   150  		n.Advance()
   151  	}
   152  	n.Propose(context.TODO(), []byte("somedata"))
   153  	n.Stop()
   154  
   155  	if len(msgs) != 1 {
   156  		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
   157  	}
   158  	if msgs[0].Type != raftpb.MsgProp {
   159  		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
   160  	}
   161  	if !bytes.Equal(msgs[0].Entries[0].Data, []byte("somedata")) {
   162  		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
   163  	}
   164  }
   165  
   166  // TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft.
   167  // It also ensures that ReadState can be read out through ready chan.
   168  func TestNodeReadIndex(t *testing.T) {
   169  	msgs := []raftpb.Message{}
   170  	appendStep := func(r *raft, m raftpb.Message) error {
   171  		msgs = append(msgs, m)
   172  		return nil
   173  	}
   174  	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
   175  
   176  	s := newTestMemoryStorage(withPeers(1))
   177  	rn := newTestRawNode(1, 10, 1, s)
   178  	n := newNode(rn)
   179  	r := rn.raft
   180  	r.readStates = wrs
   181  
   182  	go n.run()
   183  	n.Campaign(context.TODO())
   184  	for {
   185  		rd := <-n.Ready()
   186  		if !reflect.DeepEqual(rd.ReadStates, wrs) {
   187  			t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
   188  		}
   189  
   190  		s.Append(rd.Entries)
   191  
   192  		if rd.SoftState.Lead == r.id {
   193  			n.Advance()
   194  			break
   195  		}
   196  		n.Advance()
   197  	}
   198  
   199  	r.step = appendStep
   200  	wrequestCtx := []byte("somedata2")
   201  	n.ReadIndex(context.TODO(), wrequestCtx)
   202  	n.Stop()
   203  
   204  	if len(msgs) != 1 {
   205  		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
   206  	}
   207  	if msgs[0].Type != raftpb.MsgReadIndex {
   208  		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
   209  	}
   210  	if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
   211  		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
   212  	}
   213  }
   214  
   215  // TestDisableProposalForwarding ensures that proposals are not forwarded to
   216  // the leader when DisableProposalForwarding is true.
   217  func TestDisableProposalForwarding(t *testing.T) {
   218  	r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   219  	r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   220  	cfg3 := newTestConfig(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   221  	cfg3.DisableProposalForwarding = true
   222  	r3 := newRaft(cfg3)
   223  	nt := newNetwork(r1, r2, r3)
   224  
   225  	// elect r1 as leader
   226  	nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
   227  
   228  	var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
   229  
   230  	// send proposal to r2(follower) where DisableProposalForwarding is false
   231  	r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})
   232  
   233  	// verify r2(follower) does forward the proposal when DisableProposalForwarding is false
   234  	if len(r2.msgs) != 1 {
   235  		t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
   236  	}
   237  
   238  	// send proposal to r3(follower) where DisableProposalForwarding is true
   239  	r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
   240  
   241  	// verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
   242  	if len(r3.msgs) != 0 {
   243  		t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs))
   244  	}
   245  }
   246  
    247  // TestNodeReadIndexToOldLeader ensures that a raftpb.MsgReadIndex sent to the old leader
    248  // gets forwarded to the new leader, and that the 'send' method does not attach its term.
   249  func TestNodeReadIndexToOldLeader(t *testing.T) {
   250  	r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   251  	r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   252  	r3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   253  
   254  	nt := newNetwork(r1, r2, r3)
   255  
   256  	// elect r1 as leader
   257  	nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
   258  
   259  	var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
   260  
   261  	// send readindex request to r2(follower)
   262  	r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})
   263  
   264  	// verify r2(follower) forwards this message to r1(leader) with term not set
   265  	if len(r2.msgs) != 1 {
   266  		t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
   267  	}
   268  	readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
   269  	if !reflect.DeepEqual(r2.msgs[0], readIndxMsg1) {
   270  		t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
   271  	}
   272  
   273  	// send readindex request to r3(follower)
   274  	r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})
   275  
   276  	// verify r3(follower) forwards this message to r1(leader) with term not set as well.
   277  	if len(r3.msgs) != 1 {
   278  		t.Fatalf("len(r3.msgs) expected 1, got %d", len(r3.msgs))
   279  	}
   280  	readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
   281  	if !reflect.DeepEqual(r3.msgs[0], readIndxMsg2) {
   282  		t.Fatalf("r3.msgs[0] expected %+v, got %+v", readIndxMsg2, r3.msgs[0])
   283  	}
   284  
   285  	// now elect r3 as leader
   286  	nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup})
   287  
    288  	// let r1 step the two messages we previously got from r2 and r3
   289  	r1.Step(readIndxMsg1)
   290  	r1.Step(readIndxMsg2)
   291  
   292  	// verify r1(follower) forwards these messages again to r3(new leader)
   293  	if len(r1.msgs) != 2 {
   294  		t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
   295  	}
   296  	readIndxMsg3 := raftpb.Message{From: 2, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
   297  	if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
   298  		t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
   299  	}
   300  	readIndxMsg3 = raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
   301  	if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
   302  		t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
   303  	}
   304  }
   305  
   306  // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal
   307  // to the underlying raft.
   308  func TestNodeProposeConfig(t *testing.T) {
   309  	msgs := []raftpb.Message{}
   310  	appendStep := func(r *raft, m raftpb.Message) error {
   311  		msgs = append(msgs, m)
   312  		return nil
   313  	}
   314  
   315  	s := newTestMemoryStorage(withPeers(1))
   316  	rn := newTestRawNode(1, 10, 1, s)
   317  	n := newNode(rn)
   318  	r := rn.raft
   319  	go n.run()
   320  	n.Campaign(context.TODO())
   321  	for {
   322  		rd := <-n.Ready()
   323  		s.Append(rd.Entries)
    324  		// once this raft becomes leader, change its step function to appendStep
   325  		if rd.SoftState.Lead == r.id {
   326  			r.step = appendStep
   327  			n.Advance()
   328  			break
   329  		}
   330  		n.Advance()
   331  	}
   332  	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
   333  	ccdata, err := cc.Marshal()
   334  	if err != nil {
   335  		t.Fatal(err)
   336  	}
   337  	n.ProposeConfChange(context.TODO(), cc)
   338  	n.Stop()
   339  
   340  	if len(msgs) != 1 {
   341  		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
   342  	}
   343  	if msgs[0].Type != raftpb.MsgProp {
   344  		t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
   345  	}
   346  	if !bytes.Equal(msgs[0].Entries[0].Data, ccdata) {
   347  		t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata)
   348  	}
   349  }
   350  
    351  // TestNodeProposeAddDuplicateNode ensures that proposing to add the same node twice
    352  // does not affect a later proposal to add a new node.
   353  func TestNodeProposeAddDuplicateNode(t *testing.T) {
   354  	s := newTestMemoryStorage(withPeers(1))
   355  	rn := newTestRawNode(1, 10, 1, s)
   356  	n := newNode(rn)
   357  	go n.run()
   358  	n.Campaign(context.TODO())
   359  	rdyEntries := make([]raftpb.Entry, 0)
   360  	ticker := time.NewTicker(time.Millisecond * 100)
   361  	defer ticker.Stop()
   362  	done := make(chan struct{})
   363  	stop := make(chan struct{})
   364  	applyConfChan := make(chan struct{})
   365  
   366  	go func() {
   367  		defer close(done)
   368  		for {
   369  			select {
   370  			case <-stop:
   371  				return
   372  			case <-ticker.C:
   373  				n.Tick()
   374  			case rd := <-n.Ready():
   375  				s.Append(rd.Entries)
   376  				applied := false
   377  				for _, e := range rd.Entries {
   378  					rdyEntries = append(rdyEntries, e)
   379  					switch e.Type {
   380  					case raftpb.EntryNormal:
   381  					case raftpb.EntryConfChange:
   382  						var cc raftpb.ConfChange
   383  						cc.Unmarshal(e.Data)
   384  						n.ApplyConfChange(cc)
   385  						applied = true
   386  					}
   387  				}
   388  				n.Advance()
   389  				if applied {
   390  					applyConfChan <- struct{}{}
   391  				}
   392  			}
   393  		}
   394  	}()
   395  
   396  	cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
   397  	ccdata1, _ := cc1.Marshal()
   398  	n.ProposeConfChange(context.TODO(), cc1)
   399  	<-applyConfChan
   400  
    401  	// try to add the same node again
   402  	n.ProposeConfChange(context.TODO(), cc1)
   403  	<-applyConfChan
   404  
    405  	// adding a new node should still succeed
   406  	cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
   407  	ccdata2, _ := cc2.Marshal()
   408  	n.ProposeConfChange(context.TODO(), cc2)
   409  	<-applyConfChan
   410  
   411  	close(stop)
   412  	<-done
   413  
   414  	if len(rdyEntries) != 4 {
   415  		t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries)
   416  	}
   417  	if !bytes.Equal(rdyEntries[1].Data, ccdata1) {
   418  		t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1)
   419  	}
   420  	if !bytes.Equal(rdyEntries[3].Data, ccdata2) {
   421  		t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2)
   422  	}
   423  	n.Stop()
   424  }
   425  
    426  // TestBlockProposal ensures that the node blocks proposals while it does not
    427  // know who the current leader is, and accepts proposals once a leader is
    428  // known.
   429  func TestBlockProposal(t *testing.T) {
   430  	rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
   431  	n := newNode(rn)
   432  	go n.run()
   433  	defer n.Stop()
   434  
   435  	errc := make(chan error, 1)
   436  	go func() {
   437  		errc <- n.Propose(context.TODO(), []byte("somedata"))
   438  	}()
   439  
   440  	testutil.WaitSchedule()
   441  	select {
   442  	case err := <-errc:
   443  		t.Errorf("err = %v, want blocking", err)
   444  	default:
   445  	}
   446  
   447  	n.Campaign(context.TODO())
   448  	select {
   449  	case err := <-errc:
   450  		if err != nil {
   451  			t.Errorf("err = %v, want %v", err, nil)
   452  		}
   453  	case <-time.After(10 * time.Second):
   454  		t.Errorf("blocking proposal, want unblocking")
   455  	}
   456  }
   457  
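         // TestNodeProposeWaitDropped ensures that a proposal dropped by the raft state
         // machine surfaces ErrProposalDropped to the caller of node.Propose.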
   458  func TestNodeProposeWaitDropped(t *testing.T) {
   459  	msgs := []raftpb.Message{}
   460  	droppingMsg := []byte("test_dropping")
   461  	dropStep := func(r *raft, m raftpb.Message) error {
   462  		if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) {
   463  			t.Logf("dropping message: %v", m.String())
   464  			return ErrProposalDropped
   465  		}
   466  		msgs = append(msgs, m)
   467  		return nil
   468  	}
   469  
   470  	s := newTestMemoryStorage(withPeers(1))
   471  	rn := newTestRawNode(1, 10, 1, s)
   472  	n := newNode(rn)
   473  	r := rn.raft
   474  	go n.run()
   475  	n.Campaign(context.TODO())
   476  	for {
   477  		rd := <-n.Ready()
   478  		s.Append(rd.Entries)
    479  		// once this raft becomes leader, change its step function to dropStep
   480  		if rd.SoftState.Lead == r.id {
   481  			r.step = dropStep
   482  			n.Advance()
   483  			break
   484  		}
   485  		n.Advance()
   486  	}
   487  	proposalTimeout := time.Millisecond * 100
   488  	ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout)
    489  	// a dropped proposal should fail early with ErrProposalDropped rather than waiting for the timeout
   490  	err := n.Propose(ctx, droppingMsg)
   491  	if err != ErrProposalDropped {
   492  		t.Errorf("should drop proposal : %v", err)
   493  	}
   494  	cancel()
   495  
   496  	n.Stop()
   497  	if len(msgs) != 0 {
    498  		t.Fatalf("len(msgs) = %d, want %d", len(msgs), 0)
   499  	}
   500  }
   501  
    502  // TestNodeTick ensures that node.Tick() increases the electionElapsed counter
    503  // of the underlying raft state machine.
   504  func TestNodeTick(t *testing.T) {
   505  	s := newTestMemoryStorage(withPeers(1))
   506  	rn := newTestRawNode(1, 10, 1, s)
   507  	n := newNode(rn)
   508  	r := rn.raft
   509  	go n.run()
   510  	elapsed := r.electionElapsed
   511  	n.Tick()
   512  
   513  	for len(n.tickc) != 0 {
   514  		time.Sleep(100 * time.Millisecond)
   515  	}
   516  
   517  	n.Stop()
   518  	if r.electionElapsed != elapsed+1 {
   519  		t.Errorf("elapsed = %d, want %d", r.electionElapsed, elapsed+1)
   520  	}
   521  }
   522  
   523  // TestNodeStop ensures that node.Stop() blocks until the node has stopped
   524  // processing, and that it is idempotent
   525  func TestNodeStop(t *testing.T) {
   526  	rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
   527  	n := newNode(rn)
   528  	donec := make(chan struct{})
   529  
   530  	go func() {
   531  		n.run()
   532  		close(donec)
   533  	}()
   534  
   535  	status := n.Status()
   536  	n.Stop()
   537  
   538  	select {
   539  	case <-donec:
   540  	case <-time.After(time.Second):
   541  		t.Fatalf("timed out waiting for node to stop!")
   542  	}
   543  
   544  	emptyStatus := Status{}
   545  
   546  	if reflect.DeepEqual(status, emptyStatus) {
   547  		t.Errorf("status = %v, want not empty", status)
   548  	}
    549  	// Further Status calls should return an empty status, since the node is stopped.
   550  	status = n.Status()
   551  	if !reflect.DeepEqual(status, emptyStatus) {
   552  		t.Errorf("status = %v, want empty", status)
   553  	}
   554  	// Subsequent Stops should have no effect.
   555  	n.Stop()
   556  }
   557  
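         // TestReadyContainUpdates verifies Ready.containsUpdates against Readys carrying
         // a SoftState, HardState, entries, committed entries, messages, or a snapshot.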
   558  func TestReadyContainUpdates(t *testing.T) {
   559  	tests := []struct {
   560  		rd       Ready
   561  		wcontain bool
   562  	}{
   563  		{Ready{}, false},
   564  		{Ready{SoftState: &SoftState{Lead: 1}}, true},
   565  		{Ready{HardState: raftpb.HardState{Vote: 1}}, true},
   566  		{Ready{Entries: make([]raftpb.Entry, 1)}, true},
   567  		{Ready{CommittedEntries: make([]raftpb.Entry, 1)}, true},
   568  		{Ready{Messages: make([]raftpb.Message, 1)}, true},
   569  		{Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}, true},
   570  	}
   571  
   572  	for i, tt := range tests {
   573  		if g := tt.rd.containsUpdates(); g != tt.wcontain {
   574  			t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
   575  		}
   576  	}
   577  }
   578  
   579  // TestNodeStart ensures that a node can be started correctly. The node should
   580  // start with correct configuration change entries, and can accept and commit
   581  // proposals.
   582  func TestNodeStart(t *testing.T) {
   583  	ctx, cancel := context.WithCancel(context.Background())
   584  	defer cancel()
   585  
   586  	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
   587  	ccdata, err := cc.Marshal()
   588  	if err != nil {
   589  		t.Fatalf("unexpected marshal error: %v", err)
   590  	}
   591  	wants := []Ready{
   592  		{
   593  			HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
   594  			Entries: []raftpb.Entry{
   595  				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
   596  			},
   597  			CommittedEntries: []raftpb.Entry{
   598  				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
   599  			},
   600  			MustSync: true,
   601  		},
   602  		{
   603  			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
   604  			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
   605  			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
   606  			MustSync:         true,
   607  		},
   608  	}
   609  	storage := NewMemoryStorage()
   610  	c := &Config{
   611  		ID:              1,
   612  		ElectionTick:    10,
   613  		HeartbeatTick:   1,
   614  		Storage:         storage,
   615  		MaxSizePerMsg:   noLimit,
   616  		MaxInflightMsgs: 256,
   617  	}
   618  	n := StartNode(c, []Peer{{ID: 1}})
   619  	defer n.Stop()
   620  	g := <-n.Ready()
   621  	if !reflect.DeepEqual(g, wants[0]) {
   622  		t.Fatalf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
   623  	} else {
   624  		storage.Append(g.Entries)
   625  		n.Advance()
   626  	}
   627  
   628  	if err := n.Campaign(ctx); err != nil {
   629  		t.Fatal(err)
   630  	}
   631  	rd := <-n.Ready()
   632  	storage.Append(rd.Entries)
   633  	n.Advance()
   634  
   635  	n.Propose(ctx, []byte("foo"))
   636  	if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
   637  		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, g2, wants[1])
   638  	} else {
   639  		storage.Append(g2.Entries)
   640  		n.Advance()
   641  	}
   642  
   643  	select {
   644  	case rd := <-n.Ready():
   645  		t.Errorf("unexpected Ready: %+v", rd)
   646  	case <-time.After(time.Millisecond):
   647  	}
   648  }
   649  
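         // TestNodeRestart ensures that a node restarted from persisted state delivers the
         // already-committed entries in its first Ready without emitting a new HardState.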
   650  func TestNodeRestart(t *testing.T) {
   651  	entries := []raftpb.Entry{
   652  		{Term: 1, Index: 1},
   653  		{Term: 1, Index: 2, Data: []byte("foo")},
   654  	}
   655  	st := raftpb.HardState{Term: 1, Commit: 1}
   656  
   657  	want := Ready{
   658  		// No HardState is emitted because there was no change.
   659  		HardState: raftpb.HardState{},
    660  		// commit up to the commit index in st
   661  		CommittedEntries: entries[:st.Commit],
   662  		// MustSync is false because no HardState or new entries are provided.
   663  		MustSync: false,
   664  	}
   665  
   666  	storage := NewMemoryStorage()
   667  	storage.SetHardState(st)
   668  	storage.Append(entries)
   669  	c := &Config{
   670  		ID:              1,
   671  		ElectionTick:    10,
   672  		HeartbeatTick:   1,
   673  		Storage:         storage,
   674  		MaxSizePerMsg:   noLimit,
   675  		MaxInflightMsgs: 256,
   676  	}
   677  	n := RestartNode(c)
   678  	defer n.Stop()
   679  	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
   680  		t.Errorf("g = %+v,\n             w   %+v", g, want)
   681  	}
   682  	n.Advance()
   683  
   684  	select {
   685  	case rd := <-n.Ready():
   686  		t.Errorf("unexpected Ready: %+v", rd)
   687  	case <-time.After(time.Millisecond):
   688  	}
   689  }
   690  
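         // TestNodeRestartFromSnapshot ensures that a node restarted from a snapshot plus
         // trailing log entries delivers those entries as committed in its first Ready,
         // again without emitting a new HardState.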
   691  func TestNodeRestartFromSnapshot(t *testing.T) {
   692  	snap := raftpb.Snapshot{
   693  		Metadata: raftpb.SnapshotMetadata{
   694  			ConfState: raftpb.ConfState{Voters: []uint64{1, 2}},
   695  			Index:     2,
   696  			Term:      1,
   697  		},
   698  	}
   699  	entries := []raftpb.Entry{
   700  		{Term: 1, Index: 3, Data: []byte("foo")},
   701  	}
   702  	st := raftpb.HardState{Term: 1, Commit: 3}
   703  
   704  	want := Ready{
   705  		// No HardState is emitted because nothing changed relative to what is
   706  		// already persisted.
   707  		HardState: raftpb.HardState{},
    708  		// commit up to the commit index in st
   709  		CommittedEntries: entries,
   710  		// MustSync is only true when there is a new HardState or new entries;
   711  		// neither is the case here.
   712  		MustSync: false,
   713  	}
   714  
   715  	s := NewMemoryStorage()
   716  	s.SetHardState(st)
   717  	s.ApplySnapshot(snap)
   718  	s.Append(entries)
   719  	c := &Config{
   720  		ID:              1,
   721  		ElectionTick:    10,
   722  		HeartbeatTick:   1,
   723  		Storage:         s,
   724  		MaxSizePerMsg:   noLimit,
   725  		MaxInflightMsgs: 256,
   726  	}
   727  	n := RestartNode(c)
   728  	defer n.Stop()
   729  	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
   730  		t.Errorf("g = %+v,\n             w   %+v", g, want)
   731  	} else {
   732  		n.Advance()
   733  	}
   734  
   735  	select {
   736  	case rd := <-n.Ready():
   737  		t.Errorf("unexpected Ready: %+v", rd)
   738  	case <-time.After(time.Millisecond):
   739  	}
   740  }
   741  
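         // TestNodeAdvance ensures that a new Ready is not delivered until the previous
         // Ready has been acknowledged with Advance.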
   742  func TestNodeAdvance(t *testing.T) {
   743  	ctx, cancel := context.WithCancel(context.Background())
   744  	defer cancel()
   745  
   746  	storage := NewMemoryStorage()
   747  	c := &Config{
   748  		ID:              1,
   749  		ElectionTick:    10,
   750  		HeartbeatTick:   1,
   751  		Storage:         storage,
   752  		MaxSizePerMsg:   noLimit,
   753  		MaxInflightMsgs: 256,
   754  	}
   755  	n := StartNode(c, []Peer{{ID: 1}})
   756  	defer n.Stop()
   757  	rd := <-n.Ready()
   758  	storage.Append(rd.Entries)
   759  	n.Advance()
   760  
   761  	n.Campaign(ctx)
   762  	<-n.Ready()
   763  
   764  	n.Propose(ctx, []byte("foo"))
   765  	select {
   766  	case rd = <-n.Ready():
   767  		t.Fatalf("unexpected Ready before Advance: %+v", rd)
   768  	case <-time.After(time.Millisecond):
   769  	}
   770  	storage.Append(rd.Entries)
   771  	n.Advance()
   772  	select {
   773  	case <-n.Ready():
   774  	case <-time.After(100 * time.Millisecond):
   775  		t.Errorf("expect Ready after Advance, but there is no Ready available")
   776  	}
   777  }
   778  
   779  func TestSoftStateEqual(t *testing.T) {
   780  	tests := []struct {
   781  		st *SoftState
   782  		we bool
   783  	}{
   784  		{&SoftState{}, true},
   785  		{&SoftState{Lead: 1}, false},
   786  		{&SoftState{RaftState: StateLeader}, false},
   787  	}
   788  	for i, tt := range tests {
   789  		if g := tt.st.equal(&SoftState{}); g != tt.we {
   790  			t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
   791  		}
   792  	}
   793  }
   794  
   795  func TestIsHardStateEqual(t *testing.T) {
   796  	tests := []struct {
   797  		st raftpb.HardState
   798  		we bool
   799  	}{
   800  		{emptyState, true},
   801  		{raftpb.HardState{Vote: 1}, false},
   802  		{raftpb.HardState{Commit: 1}, false},
   803  		{raftpb.HardState{Term: 1}, false},
   804  	}
   805  
   806  	for i, tt := range tests {
   807  		if isHardStateEqual(tt.st, emptyState) != tt.we {
   808  			t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
   809  		}
   810  	}
   811  }
   812  
   813  func TestNodeProposeAddLearnerNode(t *testing.T) {
   814  	ticker := time.NewTicker(time.Millisecond * 100)
   815  	defer ticker.Stop()
   816  	s := newTestMemoryStorage(withPeers(1))
   817  	rn := newTestRawNode(1, 10, 1, s)
   818  	n := newNode(rn)
   819  	go n.run()
   820  	n.Campaign(context.TODO())
   821  	stop := make(chan struct{})
   822  	done := make(chan struct{})
   823  	applyConfChan := make(chan struct{})
   824  	go func() {
   825  		defer close(done)
   826  		for {
   827  			select {
   828  			case <-stop:
   829  				return
   830  			case <-ticker.C:
   831  				n.Tick()
   832  			case rd := <-n.Ready():
   833  				s.Append(rd.Entries)
   834  				t.Logf("raft: %v", rd.Entries)
   835  				for _, ent := range rd.Entries {
   836  					if ent.Type != raftpb.EntryConfChange {
   837  						continue
   838  					}
   839  					var cc raftpb.ConfChange
   840  					cc.Unmarshal(ent.Data)
   841  					state := n.ApplyConfChange(cc)
   842  					if len(state.Learners) == 0 ||
   843  						state.Learners[0] != cc.NodeID ||
   844  						cc.NodeID != 2 {
   845  						t.Errorf("apply conf change should return new added learner: %v", state.String())
   846  					}
   847  
   848  					if len(state.Voters) != 1 {
   849  						t.Errorf("add learner should not change the nodes: %v", state.String())
   850  					}
   851  					t.Logf("apply raft conf %v changed to: %v", cc, state.String())
   852  					applyConfChan <- struct{}{}
   853  				}
   854  				n.Advance()
   855  			}
   856  		}
   857  	}()
   858  	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2}
   859  	n.ProposeConfChange(context.TODO(), cc)
   860  	<-applyConfChan
   861  	close(stop)
   862  	<-done
   863  }
   864  
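         // TestAppendPagination ensures that MsgApp payloads respect Config.MaxSizePerMsg:
         // no single message exceeds the limit, while batching after a partition still
         // produces messages larger than half the limit.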
   865  func TestAppendPagination(t *testing.T) {
   866  	const maxSizePerMsg = 2048
   867  	n := newNetworkWithConfig(func(c *Config) {
   868  		c.MaxSizePerMsg = maxSizePerMsg
   869  	}, nil, nil, nil)
   870  
   871  	seenFullMessage := false
   872  	// Inspect all messages to see that we never exceed the limit, but
    873  	// we do see messages larger than half the limit.
   874  	n.msgHook = func(m raftpb.Message) bool {
   875  		if m.Type == raftpb.MsgApp {
   876  			size := 0
   877  			for _, e := range m.Entries {
   878  				size += len(e.Data)
   879  			}
   880  			if size > maxSizePerMsg {
   881  				t.Errorf("sent MsgApp that is too large: %d bytes", size)
   882  			}
   883  			if size > maxSizePerMsg/2 {
   884  				seenFullMessage = true
   885  			}
   886  		}
   887  		return true
   888  	}
   889  
   890  	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
   891  
   892  	// Partition the network while we make our proposals. This forces
   893  	// the entries to be batched into larger messages.
   894  	n.isolate(1)
   895  	blob := []byte(strings.Repeat("a", 1000))
   896  	for i := 0; i < 5; i++ {
   897  		n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
   898  	}
   899  	n.recover()
   900  
   901  	// After the partition recovers, tick the clock to wake everything
   902  	// back up and send the messages.
   903  	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
   904  	if !seenFullMessage {
   905  		t.Error("didn't see any messages more than half the max size; something is wrong with this test")
   906  	}
   907  }
   908  
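         // TestCommitPagination ensures that committed entries delivered through Ready are
         // limited by Config.MaxCommittedSizePerReady, so three 1000-byte proposals arrive
         // in two batches.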
   909  func TestCommitPagination(t *testing.T) {
   910  	s := newTestMemoryStorage(withPeers(1))
   911  	cfg := newTestConfig(1, 10, 1, s)
   912  	cfg.MaxCommittedSizePerReady = 2048
   913  	rn, err := NewRawNode(cfg)
   914  	if err != nil {
   915  		t.Fatal(err)
   916  	}
   917  	n := newNode(rn)
   918  	go n.run()
   919  	n.Campaign(context.TODO())
   920  
   921  	rd := readyWithTimeout(&n)
   922  	if len(rd.CommittedEntries) != 1 {
   923  		t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
   924  	}
   925  	s.Append(rd.Entries)
   926  	n.Advance()
   927  
   928  	blob := []byte(strings.Repeat("a", 1000))
   929  	for i := 0; i < 3; i++ {
   930  		if err := n.Propose(context.TODO(), blob); err != nil {
   931  			t.Fatal(err)
   932  		}
   933  	}
   934  
   935  	// The 3 proposals will commit in two batches.
   936  	rd = readyWithTimeout(&n)
   937  	if len(rd.CommittedEntries) != 2 {
   938  		t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
   939  	}
   940  	s.Append(rd.Entries)
   941  	n.Advance()
   942  	rd = readyWithTimeout(&n)
   943  	if len(rd.CommittedEntries) != 1 {
   944  		t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
   945  	}
   946  	s.Append(rd.Entries)
   947  	n.Advance()
   948  }
   949  
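         // ignoreSizeHintMemStorage wraps MemoryStorage but ignores the maxSize hint passed
         // to Entries, simulating a user-provided Storage whose size limiting is more
         // permissive than raft's internal limit.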
   950  type ignoreSizeHintMemStorage struct {
   951  	*MemoryStorage
   952  }
   953  
   954  func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
   955  	return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
   956  }
   957  
   958  // TestNodeCommitPaginationAfterRestart regression tests a scenario in which the
   959  // Storage's Entries size limitation is slightly more permissive than Raft's
   960  // internal one. The original bug was the following:
   961  //
   962  //   - node learns that index 11 (or 100, doesn't matter) is committed
   963  //   - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
   964  //     index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
   965  //   - Commit index gets bumped to 10
   966  //   - the node persists the HardState, but crashes before applying the entries
   967  //   - upon restart, the storage returns the same entries, but `slice` takes a different code path
   968  //     (since it is now called with an upper bound of 10) and removes the last entry.
   969  //   - Raft emits a HardState with a regressing commit index.
   970  //
   971  // A simpler version of this test would have the storage return a lot less entries than dictated
   972  // by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression.
   973  // This wouldn't need to exploit anything about Raft-internal code paths to fail.
   974  func TestNodeCommitPaginationAfterRestart(t *testing.T) {
   975  	s := &ignoreSizeHintMemStorage{
   976  		MemoryStorage: newTestMemoryStorage(withPeers(1)),
   977  	}
   978  	persistedHardState := raftpb.HardState{
   979  		Term:   1,
   980  		Vote:   1,
   981  		Commit: 10,
   982  	}
   983  
   984  	s.hardState = persistedHardState
   985  	s.ents = make([]raftpb.Entry, 10)
   986  	var size uint64
   987  	for i := range s.ents {
   988  		ent := raftpb.Entry{
   989  			Term:  1,
   990  			Index: uint64(i + 1),
   991  			Type:  raftpb.EntryNormal,
   992  			Data:  []byte("a"),
   993  		}
   994  
   995  		s.ents[i] = ent
   996  		size += uint64(ent.Size())
   997  	}
   998  
   999  	cfg := newTestConfig(1, 10, 1, s)
  1000  	// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
  1001  	// not be included in the initial rd.CommittedEntries. However, our storage will ignore
  1002  	// this and *will* return it (which is how the Commit index ended up being 10 initially).
  1003  	cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
  1004  
  1005  	rn, err := NewRawNode(cfg)
  1006  	if err != nil {
  1007  		t.Fatal(err)
  1008  	}
  1009  	n := newNode(rn)
  1010  	go n.run()
  1011  	defer n.Stop()
  1012  
  1013  	rd := readyWithTimeout(&n)
  1014  	if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
  1015  		t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
  1016  			persistedHardState.Commit, rd.HardState.Commit,
  1017  			DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }),
  1018  		)
  1019  	}
  1020  }
  1021  
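For orientation, the tests above drive the same Campaign → Ready → persist → Advance cycle by hand. A minimal sketch of that cycle against the public go.etcd.io/etcd/raft/v3 API (single-voter configuration assumed, error handling and network transport omitted):

// Minimal sketch: single-voter node; error handling omitted.
package main

import (
	"context"

	"go.etcd.io/etcd/raft/v3"
)

func main() {
	storage := raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}
	n := raft.StartNode(c, []raft.Peer{{ID: 1}})
	defer n.Stop()

	n.Campaign(context.TODO())                    // single voter: becomes leader immediately
	n.Propose(context.TODO(), []byte("somedata")) // blocks until a leader is known

	for {
		rd := <-n.Ready()
		storage.Append(rd.Entries) // persist new entries before acknowledging
		committed := false
		for _, ent := range rd.CommittedEntries {
			// A real application applies each committed entry to its state machine here.
			if string(ent.Data) == "somedata" {
				committed = true
			}
		}
		n.Advance() // tell the Node this Ready has been processed
		if committed {
			return
		}
	}
}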
