...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/corrupt_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2022 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"encoding/json"
    21  	"fmt"
    22  	"io"
    23  	"net/http"
    24  	"net/http/httptest"
    25  	"strconv"
    26  	"strings"
    27  	"testing"
    28  	"time"
    29  
    30  	"go.uber.org/zap"
    31  
    32  	"go.etcd.io/etcd/server/v3/lease"
    33  
    34  	"github.com/stretchr/testify/assert"
    35  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    36  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    37  	"go.etcd.io/etcd/client/pkg/v3/types"
    38  	"go.etcd.io/etcd/server/v3/mvcc"
    39  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    40  	"go.uber.org/zap/zaptest"
    41  )
    42  
    43  func TestInitialCheck(t *testing.T) {
    44  	tcs := []struct {
    45  		name          string
    46  		hasher        fakeHasher
    47  		expectError   bool
    48  		expectCorrupt bool
    49  		expectActions []string
    50  	}{
    51  		{
    52  			name: "No peers",
    53  			hasher: fakeHasher{
    54  				hashByRevResponses: []hashByRev{{revision: 10}},
    55  			},
    56  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(10)", "MemberId()"},
    57  		},
    58  		{
    59  			name:          "Error getting hash",
    60  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}},
    61  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "MemberId()"},
    62  			expectError:   true,
    63  		},
    64  		{
    65  			name:          "Peer with empty response",
    66  			hasher:        fakeHasher{peerHashes: []*peerHashKVResp{{}}},
    67  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"},
    68  		},
    69  		{
    70  			name:          "Peer returned ErrFutureRev",
    71  			hasher:        fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrFutureRev}}},
    72  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
    73  		},
    74  		{
    75  			name:          "Peer returned ErrCompacted",
    76  			hasher:        fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCompacted}}},
    77  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
    78  		},
    79  		{
    80  			name:          "Peer returned other error",
    81  			hasher:        fakeHasher{peerHashes: []*peerHashKVResp{{err: rpctypes.ErrCorrupt}}},
    82  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()"},
    83  		},
    84  		{
    85  			name:          "Peer returned same hash",
    86  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 1}}}},
    87  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
    88  		},
    89  		{
    90  			name:          "Peer returned different hash with same compaction rev",
    91  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 1}}}},
    92  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
    93  			expectError:   true,
    94  		},
    95  		{
    96  			name:          "Peer returned different hash and compaction rev",
    97  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}},
    98  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
    99  		},
   100  		{
   101  			name: "Cluster ID Mismatch does not fail CorruptionChecker.InitialCheck()",
   102  			hasher: fakeHasher{
   103  				peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}},
   104  			},
   105  			expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"},
   106  		},
   107  	}
   108  	for _, tc := range tcs {
   109  		t.Run(tc.name, func(t *testing.T) {
   110  			monitor := corruptionChecker{
   111  				lg:     zaptest.NewLogger(t),
   112  				hasher: &tc.hasher,
   113  			}
   114  			err := monitor.InitialCheck()
   115  			if gotError := err != nil; gotError != tc.expectError {
   116  				t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
   117  			}
   118  			if tc.hasher.alarmTriggered != tc.expectCorrupt {
   119  				t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
   120  			}
   121  			assert.Equal(t, tc.expectActions, tc.hasher.actions)
   122  		})
   123  	}
   124  }
   125  
   126  func TestPeriodicCheck(t *testing.T) {
   127  	tcs := []struct {
   128  		name          string
   129  		hasher        fakeHasher
   130  		expectError   bool
   131  		expectCorrupt bool
   132  		expectActions []string
   133  	}{
   134  		{
   135  			name:          "Same local hash and no peers",
   136  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{revision: 10}, {revision: 10}}},
   137  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(10)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   138  		},
   139  		{
   140  			name:          "Error getting hash first time",
   141  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{err: fmt.Errorf("error getting hash")}}},
   142  			expectActions: []string{"HashByRev(0)"},
   143  			expectError:   true,
   144  		},
   145  		{
   146  			name:          "Error getting hash second time",
   147  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{revision: 11}, {err: fmt.Errorf("error getting hash")}}},
   148  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(11)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   149  			expectError:   true,
   150  		},
   151  		{
   152  			name:          "Error linearizableReadNotify",
   153  			hasher:        fakeHasher{linearizableReadNotify: fmt.Errorf("error getting linearizableReadNotify")},
   154  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()"},
   155  			expectError:   true,
   156  		},
   157  		{
   158  			name:          "Different local hash and revision",
   159  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2}, revision: 2}}},
   160  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   161  		},
   162  		{
   163  			name:          "Different local hash and compaction revision",
   164  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}}}},
   165  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   166  		},
   167  		{
   168  			name:          "Different local hash and same revisions",
   169  			hasher:        fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 1}, revision: 1}}},
   170  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "MemberId()", "TriggerCorruptAlarm(0)"},
   171  			expectCorrupt: true,
   172  		},
   173  		{
   174  			name: "Peer with nil response",
   175  			hasher: fakeHasher{
   176  				peerHashes: []*peerHashKVResp{{}},
   177  			},
   178  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   179  		},
   180  		{
   181  			name: "Peer with newer revision",
   182  			hasher: fakeHasher{
   183  				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}}}},
   184  			},
   185  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(0)"},
   186  			expectCorrupt: true,
   187  		},
   188  		{
   189  			name: "Peer with newer compact revision",
   190  			hasher: fakeHasher{
   191  				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 88}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10}, CompactRevision: 2}}},
   192  			},
   193  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(0)"},
   194  			expectCorrupt: true,
   195  		},
   196  		{
   197  			name: "Peer with same hash and compact revision",
   198  			hasher: fakeHasher{
   199  				hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}},
   200  				peerHashes:         []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 1}}},
   201  			},
   202  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   203  		},
   204  		{
   205  			name: "Peer with different hash and same compact revision as first local",
   206  			hasher: fakeHasher{
   207  				hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}, revision: 1}, {hash: mvcc.KeyValueHash{Hash: 2, CompactRevision: 2}, revision: 2}},
   208  				peerHashes:         []*peerHashKVResp{{peerInfo: peerInfo{id: 666}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 1}, CompactRevision: 1, Hash: 2}}},
   209  			},
   210  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(1)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(0)"},
   211  			expectCorrupt: true,
   212  		},
   213  		{
   214  			name: "Multiple corrupted peers trigger one alarm",
   215  			hasher: fakeHasher{
   216  				peerHashes: []*peerHashKVResp{
   217  					{peerInfo: peerInfo{id: 88}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10}, CompactRevision: 2}},
   218  					{peerInfo: peerInfo{id: 89}, resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: 10}, CompactRevision: 2}},
   219  				},
   220  			},
   221  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(0)"},
   222  			expectCorrupt: true,
   223  		},
   224  		{
   225  			name: "Cluster ID Mismatch does not fail CorruptionChecker.PeriodicCheck()",
   226  			hasher: fakeHasher{
   227  				peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}},
   228  			},
   229  			expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"},
   230  		},
   231  	}
   232  	for _, tc := range tcs {
   233  		t.Run(tc.name, func(t *testing.T) {
   234  			monitor := corruptionChecker{
   235  				lg:     zaptest.NewLogger(t),
   236  				hasher: &tc.hasher,
   237  			}
   238  			err := monitor.PeriodicCheck()
   239  			if gotError := err != nil; gotError != tc.expectError {
   240  				t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
   241  			}
   242  			if tc.hasher.alarmTriggered != tc.expectCorrupt {
   243  				t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
   244  			}
   245  			assert.Equal(t, tc.expectActions, tc.hasher.actions)
   246  		})
   247  	}
   248  }
   249  
   250  func TestCompactHashCheck(t *testing.T) {
   251  	tcs := []struct {
   252  		name                string
   253  		hasher              fakeHasher
   254  		lastRevisionChecked int64
   255  
   256  		expectError               bool
   257  		expectCorrupt             bool
   258  		expectActions             []string
   259  		expectLastRevisionChecked int64
   260  	}{
   261  		{
   262  			name:          "No hashes",
   263  			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"},
   264  		},
   265  		{
   266  			name: "No peers, check new checked from largest to smallest",
   267  			hasher: fakeHasher{
   268  				hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}},
   269  			},
   270  			lastRevisionChecked:       2,
   271  			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"},
   272  			expectLastRevisionChecked: 2,
   273  		},
   274  		{
   275  			name: "Peer error",
   276  			hasher: fakeHasher{
   277  				hashes:     []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
   278  				peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
   279  			},
   280  			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
   281  		},
   282  		{
   283  			name: "Peer returned different compaction revision is skipped",
   284  			hasher: fakeHasher{
   285  				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
   286  				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
   287  			},
   288  			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
   289  		},
   290  		{
   291  			name: "Peer returned same compaction revision but different hash triggers alarm",
   292  			hasher: fakeHasher{
   293  				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
   294  				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
   295  			},
   296  			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(0)"},
   297  			expectCorrupt: true,
   298  		},
   299  		{
   300  			name: "Peer returned same hash bumps last revision checked",
   301  			hasher: fakeHasher{
   302  				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
   303  				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
   304  			},
   305  			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
   306  			expectLastRevisionChecked: 2,
   307  		},
   308  		{
   309  			name: "Only one peer succeeded check",
   310  			hasher: fakeHasher{
   311  				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}},
   312  				peerHashes: []*peerHashKVResp{
   313  					{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}},
   314  					{err: fmt.Errorf("failed getting hash")},
   315  				},
   316  			},
   317  			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
   318  		},
   319  		{
   320  			name: "Cluster ID Mismatch does not fail CorruptionChecker.CompactHashCheck()",
   321  			hasher: fakeHasher{
   322  				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}},
   323  				peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}},
   324  			},
   325  			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
   326  		},
   327  	}
   328  	for _, tc := range tcs {
   329  		t.Run(tc.name, func(t *testing.T) {
   330  			monitor := corruptionChecker{
   331  				latestRevisionChecked: tc.lastRevisionChecked,
   332  				lg:                    zaptest.NewLogger(t),
   333  				hasher:                &tc.hasher,
   334  			}
   335  			monitor.CompactHashCheck()
   336  			if tc.hasher.alarmTriggered != tc.expectCorrupt {
   337  				t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
   338  			}
   339  			if tc.expectLastRevisionChecked != monitor.latestRevisionChecked {
   340  				t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked)
   341  			}
   342  			assert.Equal(t, tc.expectActions, tc.hasher.actions)
   343  		})
   344  	}
   345  }
   346  
   347  type fakeHasher struct {
   348  	peerHashes             []*peerHashKVResp
   349  	hashByRevIndex         int
   350  	hashByRevResponses     []hashByRev
   351  	linearizableReadNotify error
   352  	hashes                 []mvcc.KeyValueHash
   353  
   354  	alarmTriggered bool
   355  	actions        []string
   356  }
   357  
   358  type hashByRev struct {
   359  	hash     mvcc.KeyValueHash
   360  	revision int64
   361  	err      error
   362  }
   363  
   364  func (f *fakeHasher) Hash() (hash uint32, revision int64, err error) {
   365  	panic("not implemented")
   366  }
   367  
   368  func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int64, err error) {
   369  	f.actions = append(f.actions, fmt.Sprintf("HashByRev(%d)", rev))
   370  	if len(f.hashByRevResponses) == 0 {
   371  		return mvcc.KeyValueHash{}, 0, nil
   372  	}
   373  	hashByRev := f.hashByRevResponses[f.hashByRevIndex]
   374  	f.hashByRevIndex++
   375  	return hashByRev.hash, hashByRev.revision, hashByRev.err
   376  }
   377  
   378  func (f *fakeHasher) Store(hash mvcc.KeyValueHash) {
   379  	f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash))
   380  	f.hashes = append(f.hashes, hash)
   381  }
   382  
   383  func (f *fakeHasher) Hashes() []mvcc.KeyValueHash {
   384  	f.actions = append(f.actions, "Hashes()")
   385  	return f.hashes
   386  }
   387  
   388  func (f *fakeHasher) ReqTimeout() time.Duration {
   389  	f.actions = append(f.actions, "ReqTimeout()")
   390  	return time.Second
   391  }
   392  
   393  func (f *fakeHasher) MemberId() types.ID {
   394  	f.actions = append(f.actions, "MemberId()")
   395  	return 1
   396  }
   397  
   398  func (f *fakeHasher) PeerHashByRev(rev int64) []*peerHashKVResp {
   399  	f.actions = append(f.actions, fmt.Sprintf("PeerHashByRev(%d)", rev))
   400  	return f.peerHashes
   401  }
   402  
   403  func (f *fakeHasher) LinearizableReadNotify(ctx context.Context) error {
   404  	f.actions = append(f.actions, "LinearizableReadNotify()")
   405  	return f.linearizableReadNotify
   406  }
   407  
   408  func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) {
   409  	f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId))
   410  	f.alarmTriggered = true
   411  }
   412  
   413  func TestHashKVHandler(t *testing.T) {
   414  	var remoteClusterID = 111195
   415  	var localClusterID = 111196
   416  	var revision = 1
   417  
   418  	etcdSrv := &EtcdServer{}
   419  	etcdSrv.cluster = newTestCluster(t, nil)
   420  	etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID))
   421  	be, _ := betesting.NewDefaultTmpBackend(t)
   422  	defer betesting.Close(t, be)
   423  	etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
   424  	defer func() {
   425  		assert.NoError(t, etcdSrv.kv.Close())
   426  	}()
   427  	ph := &hashKVHandler{
   428  		lg:     zap.NewNop(),
   429  		server: etcdSrv,
   430  	}
   431  	srv := httptest.NewServer(ph)
   432  	defer srv.Close()
   433  
   434  	tests := []struct {
   435  		name            string
   436  		remoteClusterID int
   437  		wcode           int
   438  		wKeyWords       string
   439  	}{
   440  		{
   441  			name:            "HashKV returns 200 if cluster hash matches",
   442  			remoteClusterID: localClusterID,
   443  			wcode:           http.StatusOK,
   444  			wKeyWords:       "",
   445  		},
   446  		{
   447  			name:            "HashKV returns 400 if cluster hash doesn't matche",
   448  			remoteClusterID: remoteClusterID,
   449  			wcode:           http.StatusPreconditionFailed,
   450  			wKeyWords:       "cluster ID mismatch",
   451  		},
   452  	}
   453  	for i, tt := range tests {
   454  		t.Run(tt.name, func(t *testing.T) {
   455  			hashReq := &pb.HashKVRequest{Revision: int64(revision)}
   456  			hashReqBytes, err := json.Marshal(hashReq)
   457  			if err != nil {
   458  				t.Fatalf("failed to marshal request: %v", err)
   459  			}
   460  			req, err := http.NewRequest(http.MethodGet, srv.URL+PeerHashKVPath, bytes.NewReader(hashReqBytes))
   461  			if err != nil {
   462  				t.Fatalf("failed to create request: %v", err)
   463  			}
   464  			req.Header.Set("X-Etcd-Cluster-ID", strconv.FormatUint(uint64(tt.remoteClusterID), 16))
   465  
   466  			resp, err := http.DefaultClient.Do(req)
   467  			if err != nil {
   468  				t.Fatalf("failed to get http response: %v", err)
   469  			}
   470  			body, err := io.ReadAll(resp.Body)
   471  			resp.Body.Close()
   472  			if err != nil {
   473  				t.Fatalf("unexpected io.ReadAll error: %v", err)
   474  			}
   475  			if resp.StatusCode != tt.wcode {
   476  				t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode)
   477  			}
   478  			if resp.StatusCode != http.StatusOK {
   479  				if !strings.Contains(string(body), tt.wKeyWords) {
   480  					t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords)
   481  				}
   482  				return
   483  			}
   484  
   485  			hashKVResponse := pb.HashKVResponse{}
   486  			err = json.Unmarshal(body, &hashKVResponse)
   487  			if err != nil {
   488  				t.Fatalf("unmarshal response error: %v", err)
   489  			}
   490  			hashValue, _, err := etcdSrv.KV().HashStorage().HashByRev(int64(revision))
   491  			if err != nil {
   492  				t.Fatalf("etcd server hash failed: %v", err)
   493  			}
   494  			if hashKVResponse.Hash != hashValue.Hash {
   495  				t.Fatalf("hash value inconsistent: %d != %d", hashKVResponse.Hash, hashValue)
   496  			}
   497  		})
   498  	}
   499  }
   500  

View as plain text