...

Source file src/go.etcd.io/etcd/raft/v3/raft_flow_control_test.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"testing"
    19  
    20  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    21  )
    22  
    23  // TestMsgAppFlowControlFull ensures:
    24  // 1. msgApp can fill the sending window until full
    25  // 2. when the window is full, no more msgApp can be sent.
    26  
    27  func TestMsgAppFlowControlFull(t *testing.T) {
    28  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
    29  	r.becomeCandidate()
    30  	r.becomeLeader()
    31  
    32  	pr2 := r.prs.Progress[2]
    33  	// force the progress to be in replicate state
    34  	pr2.BecomeReplicate()
    35  	// fill in the inflights window
    36  	for i := 0; i < r.prs.MaxInflight; i++ {
    37  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
    38  		ms := r.readMessages()
    39  		if len(ms) != 1 {
    40  			t.Fatalf("#%d: len(ms) = %d, want 1", i, len(ms))
    41  		}
    42  	}
    43  
    44  	// ensure 1
    45  	if !pr2.Inflights.Full() {
    46  		t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
    47  	}
    48  
    49  	// ensure 2
    50  	for i := 0; i < 10; i++ {
    51  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
    52  		ms := r.readMessages()
    53  		if len(ms) != 0 {
    54  			t.Fatalf("#%d: len(ms) = %d, want 0", i, len(ms))
    55  		}
    56  	}
    57  }
    58  
    59  // TestMsgAppFlowControlMoveForward ensures msgAppResp can move
    60  // forward the sending window correctly:
    61  // 1. valid msgAppResp.index moves the windows to pass all smaller or equal index.
    62  // 2. out-of-dated msgAppResp has no effect on the sliding window.
    63  func TestMsgAppFlowControlMoveForward(t *testing.T) {
    64  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
    65  	r.becomeCandidate()
    66  	r.becomeLeader()
    67  
    68  	pr2 := r.prs.Progress[2]
    69  	// force the progress to be in replicate state
    70  	pr2.BecomeReplicate()
    71  	// fill in the inflights window
    72  	for i := 0; i < r.prs.MaxInflight; i++ {
    73  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
    74  		r.readMessages()
    75  	}
    76  
    77  	// 1 is noop, 2 is the first proposal we just sent.
    78  	// so we start with 2.
    79  	for tt := 2; tt < r.prs.MaxInflight; tt++ {
    80  		// move forward the window
    81  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
    82  		r.readMessages()
    83  
    84  		// fill in the inflights window again
    85  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
    86  		ms := r.readMessages()
    87  		if len(ms) != 1 {
    88  			t.Fatalf("#%d: len(ms) = %d, want 1", tt, len(ms))
    89  		}
    90  
    91  		// ensure 1
    92  		if !pr2.Inflights.Full() {
    93  			t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
    94  		}
    95  
    96  		// ensure 2
    97  		for i := 0; i < tt; i++ {
    98  			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
    99  			if !pr2.Inflights.Full() {
   100  				t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
   101  			}
   102  		}
   103  	}
   104  }
   105  
   106  // TestMsgAppFlowControlRecvHeartbeat ensures a heartbeat response
   107  // frees one slot if the window is full.
   108  func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
   109  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
   110  	r.becomeCandidate()
   111  	r.becomeLeader()
   112  
   113  	pr2 := r.prs.Progress[2]
   114  	// force the progress to be in replicate state
   115  	pr2.BecomeReplicate()
   116  	// fill in the inflights window
   117  	for i := 0; i < r.prs.MaxInflight; i++ {
   118  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   119  		r.readMessages()
   120  	}
   121  
   122  	for tt := 1; tt < 5; tt++ {
   123  		if !pr2.Inflights.Full() {
   124  			t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
   125  		}
   126  
   127  		// recv tt msgHeartbeatResp and expect one free slot
   128  		for i := 0; i < tt; i++ {
   129  			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
   130  			r.readMessages()
   131  			if pr2.Inflights.Full() {
   132  				t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false)
   133  			}
   134  		}
   135  
   136  		// one slot
   137  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   138  		ms := r.readMessages()
   139  		if len(ms) != 1 {
   140  			t.Fatalf("#%d: free slot = 0, want 1", tt)
   141  		}
   142  
   143  		// and just one slot
   144  		for i := 0; i < 10; i++ {
   145  			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   146  			ms1 := r.readMessages()
   147  			if len(ms1) != 0 {
   148  				t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms1))
   149  			}
   150  		}
   151  
   152  		// clear all pending messages.
   153  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
   154  		r.readMessages()
   155  	}
   156  }
   157  

View as plain text