...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/http_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"bytes"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  	"net/http/httptest"
    24  	"net/url"
    25  	"strings"
    26  	"testing"
    27  	"time"
    28  
    29  	"go.etcd.io/etcd/api/v3/version"
    30  	"go.etcd.io/etcd/client/pkg/v3/types"
    31  	"go.etcd.io/etcd/pkg/v3/pbutil"
    32  	"go.etcd.io/etcd/raft/v3/raftpb"
    33  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
    34  
    35  	"go.uber.org/zap"
    36  )
    37  
    38  func TestServeRaftPrefix(t *testing.T) {
    39  	testCases := []struct {
    40  		method    string
    41  		body      io.Reader
    42  		p         Raft
    43  		clusterID string
    44  
    45  		wcode int
    46  	}{
    47  		{
    48  			// bad method
    49  			"GET",
    50  			bytes.NewReader(
    51  				pbutil.MustMarshal(&raftpb.Message{}),
    52  			),
    53  			&fakeRaft{},
    54  			"0",
    55  			http.StatusMethodNotAllowed,
    56  		},
    57  		{
    58  			// bad method
    59  			"PUT",
    60  			bytes.NewReader(
    61  				pbutil.MustMarshal(&raftpb.Message{}),
    62  			),
    63  			&fakeRaft{},
    64  			"0",
    65  			http.StatusMethodNotAllowed,
    66  		},
    67  		{
    68  			// bad method
    69  			"DELETE",
    70  			bytes.NewReader(
    71  				pbutil.MustMarshal(&raftpb.Message{}),
    72  			),
    73  			&fakeRaft{},
    74  			"0",
    75  			http.StatusMethodNotAllowed,
    76  		},
    77  		{
    78  			// bad request body
    79  			"POST",
    80  			&errReader{},
    81  			&fakeRaft{},
    82  			"0",
    83  			http.StatusBadRequest,
    84  		},
    85  		{
    86  			// bad request protobuf
    87  			"POST",
    88  			strings.NewReader("malformed garbage"),
    89  			&fakeRaft{},
    90  			"0",
    91  			http.StatusBadRequest,
    92  		},
    93  		{
    94  			// good request, wrong cluster ID
    95  			"POST",
    96  			bytes.NewReader(
    97  				pbutil.MustMarshal(&raftpb.Message{}),
    98  			),
    99  			&fakeRaft{},
   100  			"1",
   101  			http.StatusPreconditionFailed,
   102  		},
   103  		{
   104  			// good request, Processor failure
   105  			"POST",
   106  			bytes.NewReader(
   107  				pbutil.MustMarshal(&raftpb.Message{}),
   108  			),
   109  			&fakeRaft{
   110  				err: &resWriterToError{code: http.StatusForbidden},
   111  			},
   112  			"0",
   113  			http.StatusForbidden,
   114  		},
   115  		{
   116  			// good request, Processor failure
   117  			"POST",
   118  			bytes.NewReader(
   119  				pbutil.MustMarshal(&raftpb.Message{}),
   120  			),
   121  			&fakeRaft{
   122  				err: &resWriterToError{code: http.StatusInternalServerError},
   123  			},
   124  			"0",
   125  			http.StatusInternalServerError,
   126  		},
   127  		{
   128  			// good request, Processor failure
   129  			"POST",
   130  			bytes.NewReader(
   131  				pbutil.MustMarshal(&raftpb.Message{}),
   132  			),
   133  			&fakeRaft{err: errors.New("blah")},
   134  			"0",
   135  			http.StatusInternalServerError,
   136  		},
   137  		{
   138  			// good request
   139  			"POST",
   140  			bytes.NewReader(
   141  				pbutil.MustMarshal(&raftpb.Message{}),
   142  			),
   143  			&fakeRaft{},
   144  			"0",
   145  			http.StatusNoContent,
   146  		},
   147  	}
   148  	for i, tt := range testCases {
   149  		req, err := http.NewRequest(tt.method, "foo", tt.body)
   150  		if err != nil {
   151  			t.Fatalf("#%d: could not create request: %#v", i, err)
   152  		}
   153  		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
   154  		req.Header.Set("X-Server-Version", version.Version)
   155  		rw := httptest.NewRecorder()
   156  		h := newPipelineHandler(&Transport{Logger: zap.NewExample()}, tt.p, types.ID(0))
   157  
   158  		// goroutine because the handler panics to disconnect on raft error
   159  		donec := make(chan struct{})
   160  		go func() {
   161  			defer func() {
   162  				recover()
   163  				close(donec)
   164  			}()
   165  			h.ServeHTTP(rw, req)
   166  		}()
   167  		<-donec
   168  
   169  		if rw.Code != tt.wcode {
   170  			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
   171  		}
   172  	}
   173  }
   174  
   175  func TestServeRaftStreamPrefix(t *testing.T) {
   176  	tests := []struct {
   177  		path  string
   178  		wtype streamType
   179  	}{
   180  		{
   181  			RaftStreamPrefix + "/message/1",
   182  			streamTypeMessage,
   183  		},
   184  		{
   185  			RaftStreamPrefix + "/msgapp/1",
   186  			streamTypeMsgAppV2,
   187  		},
   188  	}
   189  	for i, tt := range tests {
   190  		req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil)
   191  		if err != nil {
   192  			t.Fatalf("#%d: could not create request: %#v", i, err)
   193  		}
   194  		req.Header.Set("X-Etcd-Cluster-ID", "1")
   195  		req.Header.Set("X-Server-Version", version.Version)
   196  		req.Header.Set("X-Raft-To", "2")
   197  
   198  		peer := newFakePeer()
   199  		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
   200  		tr := &Transport{}
   201  		h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1))
   202  
   203  		rw := httptest.NewRecorder()
   204  		go h.ServeHTTP(rw, req)
   205  
   206  		var conn *outgoingConn
   207  		select {
   208  		case conn = <-peer.connc:
   209  		case <-time.After(time.Second):
   210  			t.Fatalf("#%d: failed to attach outgoingConn", i)
   211  		}
   212  		if g := rw.Header().Get("X-Server-Version"); g != version.Version {
   213  			t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version)
   214  		}
   215  		if conn.t != tt.wtype {
   216  			t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
   217  		}
   218  		conn.Close()
   219  	}
   220  }
   221  
   222  func TestServeRaftStreamPrefixBad(t *testing.T) {
   223  	removedID := uint64(5)
   224  	tests := []struct {
   225  		method    string
   226  		path      string
   227  		clusterID string
   228  		remote    string
   229  
   230  		wcode int
   231  	}{
   232  		// bad method
   233  		{
   234  			"PUT",
   235  			RaftStreamPrefix + "/message/1",
   236  			"1",
   237  			"1",
   238  			http.StatusMethodNotAllowed,
   239  		},
   240  		// bad method
   241  		{
   242  			"POST",
   243  			RaftStreamPrefix + "/message/1",
   244  			"1",
   245  			"1",
   246  			http.StatusMethodNotAllowed,
   247  		},
   248  		// bad method
   249  		{
   250  			"DELETE",
   251  			RaftStreamPrefix + "/message/1",
   252  			"1",
   253  			"1",
   254  			http.StatusMethodNotAllowed,
   255  		},
   256  		// bad path
   257  		{
   258  			"GET",
   259  			RaftStreamPrefix + "/strange/1",
   260  			"1",
   261  			"1",
   262  			http.StatusNotFound,
   263  		},
   264  		// bad path
   265  		{
   266  			"GET",
   267  			RaftStreamPrefix + "/strange",
   268  			"1",
   269  			"1",
   270  			http.StatusNotFound,
   271  		},
   272  		// non-existent peer
   273  		{
   274  			"GET",
   275  			RaftStreamPrefix + "/message/2",
   276  			"1",
   277  			"1",
   278  			http.StatusNotFound,
   279  		},
   280  		// removed peer
   281  		{
   282  			"GET",
   283  			RaftStreamPrefix + "/message/" + fmt.Sprint(removedID),
   284  			"1",
   285  			"1",
   286  			http.StatusGone,
   287  		},
   288  		// wrong cluster ID
   289  		{
   290  			"GET",
   291  			RaftStreamPrefix + "/message/1",
   292  			"2",
   293  			"1",
   294  			http.StatusPreconditionFailed,
   295  		},
   296  		// wrong remote id
   297  		{
   298  			"GET",
   299  			RaftStreamPrefix + "/message/1",
   300  			"1",
   301  			"2",
   302  			http.StatusPreconditionFailed,
   303  		},
   304  	}
   305  	for i, tt := range tests {
   306  		req, err := http.NewRequest(tt.method, "http://localhost:2380"+tt.path, nil)
   307  		if err != nil {
   308  			t.Fatalf("#%d: could not create request: %#v", i, err)
   309  		}
   310  		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
   311  		req.Header.Set("X-Server-Version", version.Version)
   312  		req.Header.Set("X-Raft-To", tt.remote)
   313  		rw := httptest.NewRecorder()
   314  		tr := &Transport{}
   315  		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
   316  		r := &fakeRaft{removedID: removedID}
   317  		h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1))
   318  		h.ServeHTTP(rw, req)
   319  
   320  		if rw.Code != tt.wcode {
   321  			t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode)
   322  		}
   323  	}
   324  }
   325  
   326  func TestCloseNotifier(t *testing.T) {
   327  	c := newCloseNotifier()
   328  	select {
   329  	case <-c.closeNotify():
   330  		t.Fatalf("received unexpected close notification")
   331  	default:
   332  	}
   333  	c.Close()
   334  	select {
   335  	case <-c.closeNotify():
   336  	default:
   337  		t.Fatalf("failed to get close notification")
   338  	}
   339  }
   340  
   341  // errReader implements io.Reader to facilitate a broken request.
   342  type errReader struct{}
   343  
   344  func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") }
   345  
   346  type resWriterToError struct {
   347  	code int
   348  }
   349  
   350  func (e *resWriterToError) Error() string                 { return "" }
   351  func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) }
   352  
   353  type fakePeerGetter struct {
   354  	peers map[types.ID]Peer
   355  }
   356  
   357  func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
   358  
   359  type fakePeer struct {
   360  	msgs     []raftpb.Message
   361  	snapMsgs []snap.Message
   362  	peerURLs types.URLs
   363  	connc    chan *outgoingConn
   364  	paused   bool
   365  }
   366  
   367  func newFakePeer() *fakePeer {
   368  	fakeURL, _ := url.Parse("http://localhost")
   369  	return &fakePeer{
   370  		connc:    make(chan *outgoingConn, 1),
   371  		peerURLs: types.URLs{*fakeURL},
   372  	}
   373  }
   374  
   375  func (pr *fakePeer) send(m raftpb.Message) {
   376  	if pr.paused {
   377  		return
   378  	}
   379  	pr.msgs = append(pr.msgs, m)
   380  }
   381  
   382  func (pr *fakePeer) sendSnap(m snap.Message) {
   383  	if pr.paused {
   384  		return
   385  	}
   386  	pr.snapMsgs = append(pr.snapMsgs, m)
   387  }
   388  
   389  func (pr *fakePeer) update(urls types.URLs)                { pr.peerURLs = urls }
   390  func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
   391  func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
   392  func (pr *fakePeer) stop()                                 {}
   393  func (pr *fakePeer) Pause()                                { pr.paused = true }
   394  func (pr *fakePeer) Resume()                               { pr.paused = false }
   395  

View as plain text