...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/pipeline_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"errors"
    19  	"fmt"
    20  	"io"
    21  	"io/ioutil"
    22  	"net/http"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	"go.etcd.io/etcd/api/v3/version"
    28  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    29  	"go.etcd.io/etcd/client/pkg/v3/types"
    30  	"go.etcd.io/etcd/raft/v3/raftpb"
    31  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    32  
    33  	"go.uber.org/zap"
    34  )
    35  
    36  // TestPipelineSend tests that pipeline could send data using roundtripper
    37  // and increase success count in stats.
    38  func TestPipelineSend(t *testing.T) {
    39  	tr := &roundTripperRecorder{rec: testutil.NewRecorderStream()}
    40  	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
    41  	tp := &Transport{pipelineRt: tr}
    42  	p := startTestPipeline(tp, picker)
    43  
    44  	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
    45  	tr.rec.Wait(1)
    46  	p.stop()
    47  	if p.followerStats.Counts.Success != 1 {
    48  		t.Errorf("success = %d, want 1", p.followerStats.Counts.Success)
    49  	}
    50  }
    51  
    52  // TestPipelineKeepSendingWhenPostError tests that pipeline can keep
    53  // sending messages if previous messages meet post error.
    54  func TestPipelineKeepSendingWhenPostError(t *testing.T) {
    55  	tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
    56  	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
    57  	tp := &Transport{pipelineRt: tr}
    58  	p := startTestPipeline(tp, picker)
    59  	defer p.stop()
    60  
    61  	for i := 0; i < 50; i++ {
    62  		p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
    63  	}
    64  
    65  	_, err := tr.rec.Wait(50)
    66  	if err != nil {
    67  		t.Errorf("unexpected wait error %v", err)
    68  	}
    69  }
    70  
    71  func TestPipelineExceedMaximumServing(t *testing.T) {
    72  	rt := newRoundTripperBlocker()
    73  	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
    74  	tp := &Transport{pipelineRt: rt}
    75  	p := startTestPipeline(tp, picker)
    76  	defer p.stop()
    77  
    78  	// keep the sender busy and make the buffer full
    79  	// nothing can go out as we block the sender
    80  	for i := 0; i < connPerPipeline+pipelineBufSize; i++ {
    81  		select {
    82  		case p.msgc <- raftpb.Message{}:
    83  		case <-time.After(time.Second):
    84  			t.Errorf("failed to send out message")
    85  		}
    86  	}
    87  
    88  	// try to send a data when we are sure the buffer is full
    89  	select {
    90  	case p.msgc <- raftpb.Message{}:
    91  		t.Errorf("unexpected message sendout")
    92  	default:
    93  	}
    94  
    95  	// unblock the senders and force them to send out the data
    96  	rt.unblock()
    97  
    98  	// It could send new data after previous ones succeed
    99  	select {
   100  	case p.msgc <- raftpb.Message{}:
   101  	case <-time.After(time.Second):
   102  		t.Errorf("failed to send out message")
   103  	}
   104  }
   105  
   106  // TestPipelineSendFailed tests that when send func meets the post error,
   107  // it increases fail count in stats.
   108  func TestPipelineSendFailed(t *testing.T) {
   109  	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
   110  	rt := newRespRoundTripper(0, errors.New("blah"))
   111  	rt.rec = testutil.NewRecorderStream()
   112  	tp := &Transport{pipelineRt: rt}
   113  	p := startTestPipeline(tp, picker)
   114  
   115  	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
   116  	if _, err := rt.rec.Wait(1); err != nil {
   117  		t.Fatal(err)
   118  	}
   119  
   120  	p.stop()
   121  
   122  	if p.followerStats.Counts.Fail != 1 {
   123  		t.Errorf("fail = %d, want 1", p.followerStats.Counts.Fail)
   124  	}
   125  }
   126  
   127  func TestPipelinePost(t *testing.T) {
   128  	tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
   129  	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
   130  	tp := &Transport{ClusterID: types.ID(1), pipelineRt: tr}
   131  	p := startTestPipeline(tp, picker)
   132  	if err := p.post([]byte("some data")); err != nil {
   133  		t.Fatalf("unexpected post error: %v", err)
   134  	}
   135  	act, err := tr.rec.Wait(1)
   136  	if err != nil {
   137  		t.Fatal(err)
   138  	}
   139  	p.stop()
   140  
   141  	req := act[0].Params[0].(*http.Request)
   142  
   143  	if g := req.Method; g != "POST" {
   144  		t.Errorf("method = %s, want %s", g, "POST")
   145  	}
   146  	if g := req.URL.String(); g != "http://localhost:2380/raft" {
   147  		t.Errorf("url = %s, want %s", g, "http://localhost:2380/raft")
   148  	}
   149  	if g := req.Header.Get("Content-Type"); g != "application/protobuf" {
   150  		t.Errorf("content type = %s, want %s", g, "application/protobuf")
   151  	}
   152  	if g := req.Header.Get("X-Server-Version"); g != version.Version {
   153  		t.Errorf("version = %s, want %s", g, version.Version)
   154  	}
   155  	if g := req.Header.Get("X-Min-Cluster-Version"); g != version.MinClusterVersion {
   156  		t.Errorf("min version = %s, want %s", g, version.MinClusterVersion)
   157  	}
   158  	if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
   159  		t.Errorf("cluster id = %s, want %s", g, "1")
   160  	}
   161  	b, err := ioutil.ReadAll(req.Body)
   162  	if err != nil {
   163  		t.Fatalf("unexpected ReadAll error: %v", err)
   164  	}
   165  	if string(b) != "some data" {
   166  		t.Errorf("body = %s, want %s", b, "some data")
   167  	}
   168  }
   169  
   170  func TestPipelinePostBad(t *testing.T) {
   171  	tests := []struct {
   172  		u    string
   173  		code int
   174  		err  error
   175  	}{
   176  		// RoundTrip returns error
   177  		{"http://localhost:2380", 0, errors.New("blah")},
   178  		// unexpected response status code
   179  		{"http://localhost:2380", http.StatusOK, nil},
   180  		{"http://localhost:2380", http.StatusCreated, nil},
   181  	}
   182  	for i, tt := range tests {
   183  		picker := mustNewURLPicker(t, []string{tt.u})
   184  		tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
   185  		p := startTestPipeline(tp, picker)
   186  		err := p.post([]byte("some data"))
   187  		p.stop()
   188  
   189  		if err == nil {
   190  			t.Errorf("#%d: err = nil, want not nil", i)
   191  		}
   192  	}
   193  }
   194  
   195  func TestPipelinePostErrorc(t *testing.T) {
   196  	tests := []struct {
   197  		u    string
   198  		code int
   199  		err  error
   200  	}{
   201  		{"http://localhost:2380", http.StatusForbidden, nil},
   202  	}
   203  	for i, tt := range tests {
   204  		picker := mustNewURLPicker(t, []string{tt.u})
   205  		tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
   206  		p := startTestPipeline(tp, picker)
   207  		p.post([]byte("some data"))
   208  		p.stop()
   209  		select {
   210  		case <-p.errorc:
   211  		default:
   212  			t.Fatalf("#%d: cannot receive from errorc", i)
   213  		}
   214  	}
   215  }
   216  
   217  func TestStopBlockedPipeline(t *testing.T) {
   218  	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
   219  	tp := &Transport{pipelineRt: newRoundTripperBlocker()}
   220  	p := startTestPipeline(tp, picker)
   221  	// send many messages that most of them will be blocked in buffer
   222  	for i := 0; i < connPerPipeline*10; i++ {
   223  		p.msgc <- raftpb.Message{}
   224  	}
   225  
   226  	done := make(chan struct{})
   227  	go func() {
   228  		p.stop()
   229  		done <- struct{}{}
   230  	}()
   231  	select {
   232  	case <-done:
   233  	case <-time.After(time.Second):
   234  		t.Fatalf("failed to stop pipeline in 1s")
   235  	}
   236  }
   237  
   238  type roundTripperBlocker struct {
   239  	unblockc chan struct{}
   240  	mu       sync.Mutex
   241  	cancel   map[*http.Request]chan struct{}
   242  }
   243  
   244  func newRoundTripperBlocker() *roundTripperBlocker {
   245  	return &roundTripperBlocker{
   246  		unblockc: make(chan struct{}),
   247  		cancel:   make(map[*http.Request]chan struct{}),
   248  	}
   249  }
   250  
   251  func (t *roundTripperBlocker) unblock() {
   252  	close(t.unblockc)
   253  }
   254  
   255  func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
   256  	t.mu.Lock()
   257  	defer t.mu.Unlock()
   258  	if c, ok := t.cancel[req]; ok {
   259  		c <- struct{}{}
   260  		delete(t.cancel, req)
   261  	}
   262  }
   263  
   264  type respRoundTripper struct {
   265  	mu  sync.Mutex
   266  	rec testutil.Recorder
   267  
   268  	code   int
   269  	header http.Header
   270  	err    error
   271  }
   272  
   273  func newRespRoundTripper(code int, err error) *respRoundTripper {
   274  	return &respRoundTripper{code: code, err: err}
   275  }
   276  func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
   277  	t.mu.Lock()
   278  	defer t.mu.Unlock()
   279  	if t.rec != nil {
   280  		t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}})
   281  	}
   282  	return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
   283  }
   284  
   285  type roundTripperRecorder struct {
   286  	rec testutil.Recorder
   287  }
   288  
   289  func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
   290  	if t.rec != nil {
   291  		t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}})
   292  	}
   293  	return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
   294  }
   295  
   296  type nopReadCloser struct{}
   297  
   298  func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
   299  func (n *nopReadCloser) Close() error               { return nil }
   300  
   301  func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
   302  	p := &pipeline{
   303  		peerID:        types.ID(1),
   304  		tr:            tr,
   305  		picker:        picker,
   306  		status:        newPeerStatus(zap.NewExample(), tr.ID, types.ID(1)),
   307  		raft:          &fakeRaft{},
   308  		followerStats: &stats.FollowerStats{},
   309  		errorc:        make(chan error, 1),
   310  	}
   311  	p.start()
   312  	return p
   313  }
   314  

View as plain text