...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/transport_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"net/http"
    19  	"reflect"
    20  	"testing"
    21  	"time"
    22  
    23  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    24  	"go.etcd.io/etcd/client/pkg/v3/types"
    25  	"go.etcd.io/etcd/raft/v3/raftpb"
    26  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    27  
    28  	"github.com/xiang90/probing"
    29  	"go.uber.org/zap"
    30  )
    31  
    32  // TestTransportSend tests that transport can send messages using correct
    33  // underlying peer, and drop local or unknown-target messages.
    34  func TestTransportSend(t *testing.T) {
    35  	peer1 := newFakePeer()
    36  	peer2 := newFakePeer()
    37  	tr := &Transport{
    38  		ServerStats: stats.NewServerStats("", ""),
    39  		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
    40  	}
    41  	wmsgsIgnored := []raftpb.Message{
    42  		// bad local message
    43  		{Type: raftpb.MsgBeat},
    44  		// bad remote message
    45  		{Type: raftpb.MsgProp, To: 3},
    46  	}
    47  	wmsgsTo1 := []raftpb.Message{
    48  		// good message
    49  		{Type: raftpb.MsgProp, To: 1},
    50  		{Type: raftpb.MsgApp, To: 1},
    51  	}
    52  	wmsgsTo2 := []raftpb.Message{
    53  		// good message
    54  		{Type: raftpb.MsgProp, To: 2},
    55  		{Type: raftpb.MsgApp, To: 2},
    56  	}
    57  	tr.Send(wmsgsIgnored)
    58  	tr.Send(wmsgsTo1)
    59  	tr.Send(wmsgsTo2)
    60  
    61  	if !reflect.DeepEqual(peer1.msgs, wmsgsTo1) {
    62  		t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo1)
    63  	}
    64  	if !reflect.DeepEqual(peer2.msgs, wmsgsTo2) {
    65  		t.Errorf("msgs to peer 2 = %+v, want %+v", peer2.msgs, wmsgsTo2)
    66  	}
    67  }
    68  
    69  func TestTransportCutMend(t *testing.T) {
    70  	peer1 := newFakePeer()
    71  	peer2 := newFakePeer()
    72  	tr := &Transport{
    73  		ServerStats: stats.NewServerStats("", ""),
    74  		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
    75  	}
    76  
    77  	tr.CutPeer(types.ID(1))
    78  
    79  	wmsgsTo := []raftpb.Message{
    80  		// good message
    81  		{Type: raftpb.MsgProp, To: 1},
    82  		{Type: raftpb.MsgApp, To: 1},
    83  	}
    84  
    85  	tr.Send(wmsgsTo)
    86  	if len(peer1.msgs) > 0 {
    87  		t.Fatalf("msgs expected to be ignored, got %+v", peer1.msgs)
    88  	}
    89  
    90  	tr.MendPeer(types.ID(1))
    91  
    92  	tr.Send(wmsgsTo)
    93  	if !reflect.DeepEqual(peer1.msgs, wmsgsTo) {
    94  		t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo)
    95  	}
    96  }
    97  
    98  func TestTransportAdd(t *testing.T) {
    99  	ls := stats.NewLeaderStats(zap.NewExample(), "")
   100  	tr := &Transport{
   101  		LeaderStats:    ls,
   102  		streamRt:       &roundTripperRecorder{},
   103  		peers:          make(map[types.ID]Peer),
   104  		pipelineProber: probing.NewProber(nil),
   105  		streamProber:   probing.NewProber(nil),
   106  	}
   107  	tr.AddPeer(1, []string{"http://localhost:2380"})
   108  
   109  	if _, ok := ls.Followers["1"]; !ok {
   110  		t.Errorf("FollowerStats[1] is nil, want exists")
   111  	}
   112  	s, ok := tr.peers[types.ID(1)]
   113  	if !ok {
   114  		tr.Stop()
   115  		t.Fatalf("senders[1] is nil, want exists")
   116  	}
   117  
   118  	// duplicate AddPeer is ignored
   119  	tr.AddPeer(1, []string{"http://localhost:2380"})
   120  	ns := tr.peers[types.ID(1)]
   121  	if s != ns {
   122  		t.Errorf("sender = %v, want %v", ns, s)
   123  	}
   124  
   125  	tr.Stop()
   126  }
   127  
   128  func TestTransportRemove(t *testing.T) {
   129  	tr := &Transport{
   130  		LeaderStats:    stats.NewLeaderStats(zap.NewExample(), ""),
   131  		streamRt:       &roundTripperRecorder{},
   132  		peers:          make(map[types.ID]Peer),
   133  		pipelineProber: probing.NewProber(nil),
   134  		streamProber:   probing.NewProber(nil),
   135  	}
   136  	tr.AddPeer(1, []string{"http://localhost:2380"})
   137  	tr.RemovePeer(types.ID(1))
   138  	defer tr.Stop()
   139  
   140  	if _, ok := tr.peers[types.ID(1)]; ok {
   141  		t.Fatalf("senders[1] exists, want removed")
   142  	}
   143  }
   144  
   145  func TestTransportUpdate(t *testing.T) {
   146  	peer := newFakePeer()
   147  	tr := &Transport{
   148  		peers:          map[types.ID]Peer{types.ID(1): peer},
   149  		pipelineProber: probing.NewProber(nil),
   150  		streamProber:   probing.NewProber(nil),
   151  	}
   152  	u := "http://localhost:2380"
   153  	tr.UpdatePeer(types.ID(1), []string{u})
   154  	wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:2380"}))
   155  	if !reflect.DeepEqual(peer.peerURLs, wurls) {
   156  		t.Errorf("urls = %+v, want %+v", peer.peerURLs, wurls)
   157  	}
   158  }
   159  
   160  func TestTransportErrorc(t *testing.T) {
   161  	errorc := make(chan error, 1)
   162  	tr := &Transport{
   163  		Raft:           &fakeRaft{},
   164  		LeaderStats:    stats.NewLeaderStats(zap.NewExample(), ""),
   165  		ErrorC:         errorc,
   166  		streamRt:       newRespRoundTripper(http.StatusForbidden, nil),
   167  		pipelineRt:     newRespRoundTripper(http.StatusForbidden, nil),
   168  		peers:          make(map[types.ID]Peer),
   169  		pipelineProber: probing.NewProber(nil),
   170  		streamProber:   probing.NewProber(nil),
   171  	}
   172  	tr.AddPeer(1, []string{"http://localhost:2380"})
   173  	defer tr.Stop()
   174  
   175  	select {
   176  	case <-errorc:
   177  		t.Fatalf("received unexpected from errorc")
   178  	case <-time.After(10 * time.Millisecond):
   179  	}
   180  	tr.peers[1].send(raftpb.Message{})
   181  
   182  	select {
   183  	case <-errorc:
   184  	case <-time.After(1 * time.Second):
   185  		t.Fatalf("cannot receive error from errorc")
   186  	}
   187  }
   188  

View as plain text