...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/stream_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  	"net/http/httptest"
    24  	"reflect"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	"go.etcd.io/etcd/api/v3/version"
    30  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    31  	"go.etcd.io/etcd/client/pkg/v3/types"
    32  	"go.etcd.io/etcd/raft/v3/raftpb"
    33  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    34  
    35  	"github.com/coreos/go-semver/semver"
    36  	"go.uber.org/zap"
    37  	"golang.org/x/time/rate"
    38  )
    39  
    40  // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
    41  // to streamWriter. After that, streamWriter can use it to send messages
    42  // continuously, and closes it when stopped.
    43  func TestStreamWriterAttachOutgoingConn(t *testing.T) {
    44  	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
    45  	// the expected initial state of streamWriter is not working
    46  	if _, ok := sw.writec(); ok {
    47  		t.Errorf("initial working status = %v, want false", ok)
    48  	}
    49  
    50  	// repeat tests to ensure streamWriter can use last attached connection
    51  	var wfc *fakeWriteFlushCloser
    52  	for i := 0; i < 3; i++ {
    53  		prevwfc := wfc
    54  		wfc = newFakeWriteFlushCloser(nil)
    55  		sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
    56  
    57  		// previous attached connection should be closed
    58  		if prevwfc != nil {
    59  			select {
    60  			case <-prevwfc.closed:
    61  			case <-time.After(time.Second):
    62  				t.Errorf("#%d: close of previous connection timed out", i)
    63  			}
    64  		}
    65  
    66  		// if prevwfc != nil, the new msgc is ready since prevwfc has closed
    67  		// if prevwfc == nil, the first connection may be pending, but the first
    68  		// msgc is already available since it's set on calling startStreamwriter
    69  		msgc, _ := sw.writec()
    70  		msgc <- raftpb.Message{}
    71  
    72  		select {
    73  		case <-wfc.writec:
    74  		case <-time.After(time.Second):
    75  			t.Errorf("#%d: failed to write to the underlying connection", i)
    76  		}
    77  		// write chan is still available
    78  		if _, ok := sw.writec(); !ok {
    79  			t.Errorf("#%d: working status = %v, want true", i, ok)
    80  		}
    81  	}
    82  
    83  	sw.stop()
    84  	// write chan is unavailable since the writer is stopped.
    85  	if _, ok := sw.writec(); ok {
    86  		t.Errorf("working status after stop = %v, want false", ok)
    87  	}
    88  	if !wfc.Closed() {
    89  		t.Errorf("failed to close the underlying connection")
    90  	}
    91  }
    92  
    93  // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
    94  // outgoingConn will close the outgoingConn and fall back to non-working status.
    95  func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
    96  	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
    97  	defer sw.stop()
    98  	wfc := newFakeWriteFlushCloser(errors.New("blah"))
    99  	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
   100  
   101  	sw.msgc <- raftpb.Message{}
   102  	select {
   103  	case <-wfc.closed:
   104  	case <-time.After(time.Second):
   105  		t.Errorf("failed to close the underlying connection in time")
   106  	}
   107  	// no longer working
   108  	if _, ok := sw.writec(); ok {
   109  		t.Errorf("working = %v, want false", ok)
   110  	}
   111  }
   112  
   113  func TestStreamReaderDialRequest(t *testing.T) {
   114  	for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
   115  		tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
   116  		sr := &streamReader{
   117  			peerID: types.ID(2),
   118  			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
   119  			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
   120  			ctx:    context.Background(),
   121  		}
   122  		sr.dial(tt)
   123  
   124  		act, err := tr.rec.Wait(1)
   125  		if err != nil {
   126  			t.Fatal(err)
   127  		}
   128  		req := act[0].Params[0].(*http.Request)
   129  
   130  		wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint(zap.NewExample()) + "/1")
   131  		if req.URL.String() != wurl {
   132  			t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
   133  		}
   134  		if w := "GET"; req.Method != w {
   135  			t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
   136  		}
   137  		if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
   138  			t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
   139  		}
   140  		if g := req.Header.Get("X-Raft-To"); g != "2" {
   141  			t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
   142  		}
   143  	}
   144  }
   145  
   146  // TestStreamReaderDialResult tests the result of the dial func call meets the
   147  // HTTP response received.
   148  func TestStreamReaderDialResult(t *testing.T) {
   149  	tests := []struct {
   150  		code  int
   151  		err   error
   152  		wok   bool
   153  		whalt bool
   154  	}{
   155  		{0, errors.New("blah"), false, false},
   156  		{http.StatusOK, nil, true, false},
   157  		{http.StatusMethodNotAllowed, nil, false, false},
   158  		{http.StatusNotFound, nil, false, false},
   159  		{http.StatusPreconditionFailed, nil, false, false},
   160  		{http.StatusGone, nil, false, true},
   161  	}
   162  	for i, tt := range tests {
   163  		h := http.Header{}
   164  		h.Add("X-Server-Version", version.Version)
   165  		tr := &respRoundTripper{
   166  			code:   tt.code,
   167  			header: h,
   168  			err:    tt.err,
   169  		}
   170  		sr := &streamReader{
   171  			peerID: types.ID(2),
   172  			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
   173  			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
   174  			errorc: make(chan error, 1),
   175  			ctx:    context.Background(),
   176  		}
   177  
   178  		_, err := sr.dial(streamTypeMessage)
   179  		if ok := err == nil; ok != tt.wok {
   180  			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
   181  		}
   182  		if halt := len(sr.errorc) > 0; halt != tt.whalt {
   183  			t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
   184  		}
   185  	}
   186  }
   187  
   188  // TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
   189  func TestStreamReaderStopOnDial(t *testing.T) {
   190  	testutil.RegisterLeakDetection(t)
   191  	h := http.Header{}
   192  	h.Add("X-Server-Version", version.Version)
   193  	tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
   194  	sr := &streamReader{
   195  		peerID: types.ID(2),
   196  		tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
   197  		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
   198  		errorc: make(chan error, 1),
   199  		typ:    streamTypeMessage,
   200  		status: newPeerStatus(zap.NewExample(), types.ID(1), types.ID(2)),
   201  		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
   202  	}
   203  	tr.onResp = func() {
   204  		// stop() waits for the run() goroutine to exit, but that exit
   205  		// needs a response from RoundTrip() first; use goroutine
   206  		go sr.stop()
   207  		// wait so that stop() is blocked on run() exiting
   208  		time.Sleep(10 * time.Millisecond)
   209  		// sr.run() completes dialing then begins decoding while stopped
   210  	}
   211  	sr.start()
   212  	select {
   213  	case <-sr.done:
   214  	case <-time.After(time.Second):
   215  		t.Fatal("streamReader did not stop in time")
   216  	}
   217  }
   218  
   219  type respWaitRoundTripper struct {
   220  	rrt    *respRoundTripper
   221  	onResp func()
   222  }
   223  
   224  func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
   225  	resp, err := t.rrt.RoundTrip(req)
   226  	resp.Body = newWaitReadCloser()
   227  	t.onResp()
   228  	return resp, err
   229  }
   230  
   231  type waitReadCloser struct{ closec chan struct{} }
   232  
   233  func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
   234  func (wrc *waitReadCloser) Read(p []byte) (int, error) {
   235  	<-wrc.closec
   236  	return 0, io.EOF
   237  }
   238  func (wrc *waitReadCloser) Close() error {
   239  	close(wrc.closec)
   240  	return nil
   241  }
   242  
   243  // TestStreamReaderDialDetectUnsupport tests that dial func could find
   244  // out that the stream type is not supported by the remote.
   245  func TestStreamReaderDialDetectUnsupport(t *testing.T) {
   246  	for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
   247  		// the response from etcd 2.0
   248  		tr := &respRoundTripper{
   249  			code:   http.StatusNotFound,
   250  			header: http.Header{},
   251  		}
   252  		sr := &streamReader{
   253  			peerID: types.ID(2),
   254  			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
   255  			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
   256  			ctx:    context.Background(),
   257  		}
   258  
   259  		_, err := sr.dial(typ)
   260  		if err != errUnsupportedStreamType {
   261  			t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
   262  		}
   263  	}
   264  }
   265  
   266  // TestStream tests that streamReader and streamWriter can build stream to
   267  // send messages between each other.
   268  func TestStream(t *testing.T) {
   269  	recvc := make(chan raftpb.Message, streamBufSize)
   270  	propc := make(chan raftpb.Message, streamBufSize)
   271  	msgapp := raftpb.Message{
   272  		Type:    raftpb.MsgApp,
   273  		From:    2,
   274  		To:      1,
   275  		Term:    1,
   276  		LogTerm: 1,
   277  		Index:   3,
   278  		Entries: []raftpb.Entry{{Term: 1, Index: 4}},
   279  	}
   280  
   281  	tests := []struct {
   282  		t  streamType
   283  		m  raftpb.Message
   284  		wc chan raftpb.Message
   285  	}{
   286  		{
   287  			streamTypeMessage,
   288  			raftpb.Message{Type: raftpb.MsgProp, To: 2},
   289  			propc,
   290  		},
   291  		{
   292  			streamTypeMessage,
   293  			msgapp,
   294  			recvc,
   295  		},
   296  		{
   297  			streamTypeMsgAppV2,
   298  			msgapp,
   299  			recvc,
   300  		},
   301  	}
   302  	for i, tt := range tests {
   303  		h := &fakeStreamHandler{t: tt.t}
   304  		srv := httptest.NewServer(h)
   305  		defer srv.Close()
   306  
   307  		sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
   308  		defer sw.stop()
   309  		h.sw = sw
   310  
   311  		picker := mustNewURLPicker(t, []string{srv.URL})
   312  		tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
   313  
   314  		sr := &streamReader{
   315  			peerID: types.ID(2),
   316  			typ:    tt.t,
   317  			tr:     tr,
   318  			picker: picker,
   319  			status: newPeerStatus(zap.NewExample(), types.ID(0), types.ID(2)),
   320  			recvc:  recvc,
   321  			propc:  propc,
   322  			rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
   323  		}
   324  		sr.start()
   325  
   326  		// wait for stream to work
   327  		var writec chan<- raftpb.Message
   328  		for {
   329  			var ok bool
   330  			if writec, ok = sw.writec(); ok {
   331  				break
   332  			}
   333  			time.Sleep(time.Millisecond)
   334  		}
   335  
   336  		writec <- tt.m
   337  		var m raftpb.Message
   338  		select {
   339  		case m = <-tt.wc:
   340  		case <-time.After(time.Second):
   341  			t.Fatalf("#%d: failed to receive message from the channel", i)
   342  		}
   343  		if !reflect.DeepEqual(m, tt.m) {
   344  			t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
   345  		}
   346  
   347  		sr.stop()
   348  	}
   349  }
   350  
   351  func TestCheckStreamSupport(t *testing.T) {
   352  	tests := []struct {
   353  		v *semver.Version
   354  		t streamType
   355  		w bool
   356  	}{
   357  		// support
   358  		{
   359  			semver.Must(semver.NewVersion("2.1.0")),
   360  			streamTypeMsgAppV2,
   361  			true,
   362  		},
   363  		// ignore patch
   364  		{
   365  			semver.Must(semver.NewVersion("2.1.9")),
   366  			streamTypeMsgAppV2,
   367  			true,
   368  		},
   369  		// ignore prerelease
   370  		{
   371  			semver.Must(semver.NewVersion("2.1.0-alpha")),
   372  			streamTypeMsgAppV2,
   373  			true,
   374  		},
   375  	}
   376  	for i, tt := range tests {
   377  		if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
   378  			t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
   379  		}
   380  	}
   381  }
   382  
   383  func TestStreamSupportCurrentVersion(t *testing.T) {
   384  	cv := version.Cluster(version.Version)
   385  	cv = cv + ".0"
   386  	if _, ok := supportedStream[cv]; !ok {
   387  		t.Errorf("Current version does not have stream support.")
   388  	}
   389  }
   390  
   391  type fakeWriteFlushCloser struct {
   392  	mu      sync.Mutex
   393  	err     error
   394  	written int
   395  	closed  chan struct{}
   396  	writec  chan struct{}
   397  }
   398  
   399  func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
   400  	return &fakeWriteFlushCloser{
   401  		err:    err,
   402  		closed: make(chan struct{}),
   403  		writec: make(chan struct{}, 1),
   404  	}
   405  }
   406  
   407  func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
   408  	wfc.mu.Lock()
   409  	defer wfc.mu.Unlock()
   410  	select {
   411  	case wfc.writec <- struct{}{}:
   412  	default:
   413  	}
   414  	wfc.written += len(p)
   415  	return len(p), wfc.err
   416  }
   417  
   418  func (wfc *fakeWriteFlushCloser) Flush() {}
   419  
   420  func (wfc *fakeWriteFlushCloser) Close() error {
   421  	close(wfc.closed)
   422  	return wfc.err
   423  }
   424  
   425  func (wfc *fakeWriteFlushCloser) Written() int {
   426  	wfc.mu.Lock()
   427  	defer wfc.mu.Unlock()
   428  	return wfc.written
   429  }
   430  
   431  func (wfc *fakeWriteFlushCloser) Closed() bool {
   432  	select {
   433  	case <-wfc.closed:
   434  		return true
   435  	default:
   436  		return false
   437  	}
   438  }
   439  
   440  type fakeStreamHandler struct {
   441  	t  streamType
   442  	sw *streamWriter
   443  }
   444  
   445  func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   446  	w.Header().Add("X-Server-Version", version.Version)
   447  	w.(http.Flusher).Flush()
   448  	c := newCloseNotifier()
   449  	h.sw.attach(&outgoingConn{
   450  		t:       h.t,
   451  		Writer:  w,
   452  		Flusher: w.(http.Flusher),
   453  		Closer:  c,
   454  	})
   455  	<-c.closeNotify()
   456  }
   457  

View as plain text