...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/functional_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"context"
    19  	"net/http/httptest"
    20  	"reflect"
    21  	"testing"
    22  	"time"
    23  
    24  	"go.etcd.io/etcd/client/pkg/v3/types"
    25  	"go.etcd.io/etcd/raft/v3"
    26  	"go.etcd.io/etcd/raft/v3/raftpb"
    27  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    28  
    29  	"go.uber.org/zap"
    30  )
    31  
    32  func TestSendMessage(t *testing.T) {
    33  	// member 1
    34  	tr := &Transport{
    35  		ID:          types.ID(1),
    36  		ClusterID:   types.ID(1),
    37  		Raft:        &fakeRaft{},
    38  		ServerStats: newServerStats(),
    39  		LeaderStats: stats.NewLeaderStats(zap.NewExample(), "1"),
    40  	}
    41  	tr.Start()
    42  	srv := httptest.NewServer(tr.Handler())
    43  	defer srv.Close()
    44  
    45  	// member 2
    46  	recvc := make(chan raftpb.Message, 1)
    47  	p := &fakeRaft{recvc: recvc}
    48  	tr2 := &Transport{
    49  		ID:          types.ID(2),
    50  		ClusterID:   types.ID(1),
    51  		Raft:        p,
    52  		ServerStats: newServerStats(),
    53  		LeaderStats: stats.NewLeaderStats(zap.NewExample(), "2"),
    54  	}
    55  	tr2.Start()
    56  	srv2 := httptest.NewServer(tr2.Handler())
    57  	defer srv2.Close()
    58  
    59  	tr.AddPeer(types.ID(2), []string{srv2.URL})
    60  	defer tr.Stop()
    61  	tr2.AddPeer(types.ID(1), []string{srv.URL})
    62  	defer tr2.Stop()
    63  	if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) {
    64  		t.Fatalf("stream from 1 to 2 is not in work as expected")
    65  	}
    66  
    67  	data := []byte("some data")
    68  	tests := []raftpb.Message{
    69  		// these messages are set to send to itself, which facilitates testing.
    70  		{Type: raftpb.MsgProp, From: 1, To: 2, Entries: []raftpb.Entry{{Data: data}}},
    71  		{Type: raftpb.MsgApp, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0, Entries: []raftpb.Entry{{Index: 4, Term: 1, Data: data}}, Commit: 3},
    72  		{Type: raftpb.MsgAppResp, From: 1, To: 2, Term: 1, Index: 3},
    73  		{Type: raftpb.MsgVote, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0},
    74  		{Type: raftpb.MsgVoteResp, From: 1, To: 2, Term: 1},
    75  		{Type: raftpb.MsgSnap, From: 1, To: 2, Term: 1, Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1000, Term: 1}, Data: data}},
    76  		{Type: raftpb.MsgHeartbeat, From: 1, To: 2, Term: 1, Commit: 3},
    77  		{Type: raftpb.MsgHeartbeatResp, From: 1, To: 2, Term: 1},
    78  	}
    79  	for i, tt := range tests {
    80  		tr.Send([]raftpb.Message{tt})
    81  		msg := <-recvc
    82  		if !reflect.DeepEqual(msg, tt) {
    83  			t.Errorf("#%d: msg = %+v, want %+v", i, msg, tt)
    84  		}
    85  	}
    86  }
    87  
    88  // TestSendMessageWhenStreamIsBroken tests that message can be sent to the
    89  // remote in a limited time when all underlying connections are broken.
    90  func TestSendMessageWhenStreamIsBroken(t *testing.T) {
    91  	// member 1
    92  	tr := &Transport{
    93  		ID:          types.ID(1),
    94  		ClusterID:   types.ID(1),
    95  		Raft:        &fakeRaft{},
    96  		ServerStats: newServerStats(),
    97  		LeaderStats: stats.NewLeaderStats(zap.NewExample(), "1"),
    98  	}
    99  	tr.Start()
   100  	srv := httptest.NewServer(tr.Handler())
   101  	defer srv.Close()
   102  
   103  	// member 2
   104  	recvc := make(chan raftpb.Message, 1)
   105  	p := &fakeRaft{recvc: recvc}
   106  	tr2 := &Transport{
   107  		ID:          types.ID(2),
   108  		ClusterID:   types.ID(1),
   109  		Raft:        p,
   110  		ServerStats: newServerStats(),
   111  		LeaderStats: stats.NewLeaderStats(zap.NewExample(), "2"),
   112  	}
   113  	tr2.Start()
   114  	srv2 := httptest.NewServer(tr2.Handler())
   115  	defer srv2.Close()
   116  
   117  	tr.AddPeer(types.ID(2), []string{srv2.URL})
   118  	defer tr.Stop()
   119  	tr2.AddPeer(types.ID(1), []string{srv.URL})
   120  	defer tr2.Stop()
   121  	if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) {
   122  		t.Fatalf("stream from 1 to 2 is not in work as expected")
   123  	}
   124  
   125  	// break the stream
   126  	srv.CloseClientConnections()
   127  	srv2.CloseClientConnections()
   128  	var n int
   129  	for {
   130  		select {
   131  		// TODO: remove this resend logic when we add retry logic into the code
   132  		case <-time.After(time.Millisecond):
   133  			n++
   134  			tr.Send([]raftpb.Message{{Type: raftpb.MsgHeartbeat, From: 1, To: 2, Term: 1, Commit: 3}})
   135  		case <-recvc:
   136  			if n > 50 {
   137  				t.Errorf("disconnection time = %dms, want < 50ms", n)
   138  			}
   139  			return
   140  		}
   141  	}
   142  }
   143  
   144  func newServerStats() *stats.ServerStats {
   145  	return stats.NewServerStats("", "")
   146  }
   147  
   148  func waitStreamWorking(p *peer) bool {
   149  	for i := 0; i < 1000; i++ {
   150  		time.Sleep(time.Millisecond)
   151  		if _, ok := p.msgAppV2Writer.writec(); !ok {
   152  			continue
   153  		}
   154  		if _, ok := p.writer.writec(); !ok {
   155  			continue
   156  		}
   157  		return true
   158  	}
   159  	return false
   160  }
   161  
   162  type fakeRaft struct {
   163  	recvc     chan<- raftpb.Message
   164  	err       error
   165  	removedID uint64
   166  }
   167  
   168  func (p *fakeRaft) Process(ctx context.Context, m raftpb.Message) error {
   169  	select {
   170  	case p.recvc <- m:
   171  	default:
   172  	}
   173  	return p.err
   174  }
   175  
   176  func (p *fakeRaft) IsIDRemoved(id uint64) bool { return id == p.removedID }
   177  
   178  func (p *fakeRaft) ReportUnreachable(id uint64) {}
   179  
   180  func (p *fakeRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
   181  

View as plain text