...

Source file src/go.etcd.io/etcd/client/v3/client_test.go

Documentation: go.etcd.io/etcd/client/v3

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package clientv3
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  	"net"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	"go.etcd.io/etcd/api/v3/etcdserverpb"
    27  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    28  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    29  	"go.uber.org/zap"
    30  	"go.uber.org/zap/zaptest"
    31  
    32  	"google.golang.org/grpc"
    33  )
    34  
    35  func NewClient(t *testing.T, cfg Config) (*Client, error) {
    36  	cfg.Logger = zaptest.NewLogger(t)
    37  	return New(cfg)
    38  }
    39  
    40  func TestDialCancel(t *testing.T) {
    41  	testutil.RegisterLeakDetection(t)
    42  
    43  	// accept first connection so client is created with dial timeout
    44  	ln, err := net.Listen("unix", "dialcancel:12345")
    45  	if err != nil {
    46  		t.Fatal(err)
    47  	}
    48  	defer ln.Close()
    49  
    50  	ep := "unix://dialcancel:12345"
    51  	cfg := Config{
    52  		Endpoints:   []string{ep},
    53  		DialTimeout: 30 * time.Second}
    54  	c, err := NewClient(t, cfg)
    55  	if err != nil {
    56  		t.Fatal(err)
    57  	}
    58  
    59  	// connect to ipv4 black hole so dial blocks
    60  	c.SetEndpoints("http://254.0.0.1:12345")
    61  
    62  	// issue Get to force redial attempts
    63  	getc := make(chan struct{})
    64  	go func() {
    65  		defer close(getc)
    66  		// Get may hang forever on grpc's Stream.Header() if its
    67  		// context is never canceled.
    68  		c.Get(c.Ctx(), "abc")
    69  	}()
    70  
    71  	// wait a little bit so client close is after dial starts
    72  	time.Sleep(100 * time.Millisecond)
    73  
    74  	donec := make(chan struct{})
    75  	go func() {
    76  		defer close(donec)
    77  		c.Close()
    78  	}()
    79  
    80  	select {
    81  	case <-time.After(5 * time.Second):
    82  		t.Fatalf("failed to close")
    83  	case <-donec:
    84  	}
    85  	select {
    86  	case <-time.After(5 * time.Second):
    87  		t.Fatalf("get failed to exit")
    88  	case <-getc:
    89  	}
    90  }
    91  
    92  func TestDialTimeout(t *testing.T) {
    93  	testutil.RegisterLeakDetection(t)
    94  
    95  	wantError := context.DeadlineExceeded
    96  
    97  	// grpc.WithBlock to block until connection up or timeout
    98  	testCfgs := []Config{
    99  		{
   100  			Endpoints:   []string{"http://254.0.0.1:12345"},
   101  			DialTimeout: 2 * time.Second,
   102  			DialOptions: []grpc.DialOption{grpc.WithBlock()},
   103  		},
   104  		{
   105  			Endpoints:   []string{"http://254.0.0.1:12345"},
   106  			DialTimeout: time.Second,
   107  			DialOptions: []grpc.DialOption{grpc.WithBlock()},
   108  			Username:    "abc",
   109  			Password:    "def",
   110  		},
   111  	}
   112  
   113  	for i, cfg := range testCfgs {
   114  		donec := make(chan error, 1)
   115  		go func(cfg Config) {
   116  			// without timeout, dial continues forever on ipv4 black hole
   117  			c, err := NewClient(t, cfg)
   118  			if c != nil || err == nil {
   119  				t.Errorf("#%d: new client should fail", i)
   120  			}
   121  			donec <- err
   122  		}(cfg)
   123  
   124  		time.Sleep(10 * time.Millisecond)
   125  
   126  		select {
   127  		case err := <-donec:
   128  			t.Errorf("#%d: dial didn't wait (%v)", i, err)
   129  		default:
   130  		}
   131  
   132  		select {
   133  		case <-time.After(5 * time.Second):
   134  			t.Errorf("#%d: failed to timeout dial on time", i)
   135  		case err := <-donec:
   136  			if err.Error() != wantError.Error() {
   137  				t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
   138  			}
   139  		}
   140  	}
   141  }
   142  
   143  func TestDialNoTimeout(t *testing.T) {
   144  	cfg := Config{Endpoints: []string{"127.0.0.1:12345"}}
   145  	c, err := NewClient(t, cfg)
   146  	if c == nil || err != nil {
   147  		t.Fatalf("new client with DialNoWait should succeed, got %v", err)
   148  	}
   149  	c.Close()
   150  }
   151  
   152  func TestMaxUnaryRetries(t *testing.T) {
   153  	maxUnaryRetries := uint(10)
   154  	cfg := Config{
   155  		Endpoints:       []string{"127.0.0.1:12345"},
   156  		MaxUnaryRetries: maxUnaryRetries,
   157  	}
   158  	c, err := NewClient(t, cfg)
   159  	if c == nil || err != nil {
   160  		t.Fatalf("new client with MaxUnaryRetries should succeed, got %v", err)
   161  	}
   162  	defer c.Close()
   163  
   164  	if c.cfg.MaxUnaryRetries != maxUnaryRetries {
   165  		t.Fatalf("client MaxUnaryRetries should be %d, got %d", maxUnaryRetries, c.cfg.MaxUnaryRetries)
   166  	}
   167  }
   168  
   169  func TestBackoff(t *testing.T) {
   170  	backoffWaitBetween := 100 * time.Millisecond
   171  	cfg := Config{
   172  		Endpoints:          []string{"127.0.0.1:12345"},
   173  		BackoffWaitBetween: backoffWaitBetween,
   174  	}
   175  	c, err := NewClient(t, cfg)
   176  	if c == nil || err != nil {
   177  		t.Fatalf("new client with BackoffWaitBetween should succeed, got %v", err)
   178  	}
   179  	defer c.Close()
   180  
   181  	if c.cfg.BackoffWaitBetween != backoffWaitBetween {
   182  		t.Fatalf("client BackoffWaitBetween should be %v, got %v", backoffWaitBetween, c.cfg.BackoffWaitBetween)
   183  	}
   184  }
   185  
   186  func TestBackoffJitterFraction(t *testing.T) {
   187  	backoffJitterFraction := float64(0.9)
   188  	cfg := Config{
   189  		Endpoints:             []string{"127.0.0.1:12345"},
   190  		BackoffJitterFraction: backoffJitterFraction,
   191  	}
   192  	c, err := NewClient(t, cfg)
   193  	if c == nil || err != nil {
   194  		t.Fatalf("new client with BackoffJitterFraction should succeed, got %v", err)
   195  	}
   196  	defer c.Close()
   197  
   198  	if c.cfg.BackoffJitterFraction != backoffJitterFraction {
   199  		t.Fatalf("client BackoffJitterFraction should be %v, got %v", backoffJitterFraction, c.cfg.BackoffJitterFraction)
   200  	}
   201  }
   202  
   203  func TestIsHaltErr(t *testing.T) {
   204  	if !isHaltErr(context.TODO(), fmt.Errorf("etcdserver: some etcdserver error")) {
   205  		t.Errorf(`error prefixed with "etcdserver: " should be Halted by default`)
   206  	}
   207  	if isHaltErr(context.TODO(), rpctypes.ErrGRPCStopped) {
   208  		t.Errorf("error %v should not halt", rpctypes.ErrGRPCStopped)
   209  	}
   210  	if isHaltErr(context.TODO(), rpctypes.ErrGRPCNoLeader) {
   211  		t.Errorf("error %v should not halt", rpctypes.ErrGRPCNoLeader)
   212  	}
   213  	ctx, cancel := context.WithCancel(context.TODO())
   214  	if isHaltErr(ctx, nil) {
   215  		t.Errorf("no error and active context should not be Halted")
   216  	}
   217  	cancel()
   218  	if !isHaltErr(ctx, nil) {
   219  		t.Errorf("cancel on context should be Halted")
   220  	}
   221  }
   222  
   223  func TestCloseCtxClient(t *testing.T) {
   224  	ctx := context.Background()
   225  	c := NewCtxClient(ctx)
   226  	err := c.Close()
   227  	// Close returns ctx.toErr, a nil error means an open Done channel
   228  	if err == nil {
   229  		t.Errorf("failed to Close the client. %v", err)
   230  	}
   231  }
   232  
   233  func TestWithLogger(t *testing.T) {
   234  	ctx := context.Background()
   235  	c := NewCtxClient(ctx)
   236  	if c.lg == nil {
   237  		t.Errorf("unexpected nil in *zap.Logger")
   238  	}
   239  
   240  	c.WithLogger(nil)
   241  	if c.lg != nil {
   242  		t.Errorf("WithLogger should modify *zap.Logger")
   243  	}
   244  }
   245  
   246  func TestZapWithLogger(t *testing.T) {
   247  	ctx := context.Background()
   248  	lg := zap.NewNop()
   249  	c := NewCtxClient(ctx, WithZapLogger(lg))
   250  
   251  	if c.lg != lg {
   252  		t.Errorf("WithZapLogger should modify *zap.Logger")
   253  	}
   254  }
   255  
   256  func TestAuthTokenBundleNoOverwrite(t *testing.T) {
   257  	// Create a mock AuthServer to handle Authenticate RPCs.
   258  	lis, err := net.Listen("unix", "etcd-auth-test:0")
   259  	if err != nil {
   260  		t.Fatal(err)
   261  	}
   262  	defer lis.Close()
   263  	addr := "unix:" + lis.Addr().String()
   264  	srv := grpc.NewServer()
   265  	etcdserverpb.RegisterAuthServer(srv, mockAuthServer{})
   266  	go srv.Serve(lis)
   267  	defer srv.Stop()
   268  
   269  	// Create a client, which should call Authenticate on the mock server to
   270  	// exchange username/password for an auth token.
   271  	c, err := NewClient(t, Config{
   272  		DialTimeout: 5 * time.Second,
   273  		Endpoints:   []string{addr},
   274  		Username:    "foo",
   275  		Password:    "bar",
   276  	})
   277  	if err != nil {
   278  		t.Fatal(err)
   279  	}
   280  	defer c.Close()
   281  	oldTokenBundle := c.authTokenBundle
   282  
   283  	// Call the public Dial again, which should preserve the original
   284  	// authTokenBundle.
   285  	gc, err := c.Dial(addr)
   286  	if err != nil {
   287  		t.Fatal(err)
   288  	}
   289  	defer gc.Close()
   290  	newTokenBundle := c.authTokenBundle
   291  
   292  	if oldTokenBundle != newTokenBundle {
   293  		t.Error("Client.authTokenBundle has been overwritten during Client.Dial")
   294  	}
   295  }
   296  
   297  type mockAuthServer struct {
   298  	*etcdserverpb.UnimplementedAuthServer
   299  }
   300  
   301  func (mockAuthServer) Authenticate(context.Context, *etcdserverpb.AuthenticateRequest) (*etcdserverpb.AuthenticateResponse, error) {
   302  	return &etcdserverpb.AuthenticateResponse{Token: "mock-token"}, nil
   303  }
   304  
   305  func TestSyncFiltersMembers(t *testing.T) {
   306  	c, _ := NewClient(t, Config{Endpoints: []string{"http://254.0.0.1:12345"}})
   307  	defer c.Close()
   308  	c.Cluster = &mockCluster{
   309  		[]*etcdserverpb.Member{
   310  			{ID: 0, Name: "", ClientURLs: []string{"http://254.0.0.1:12345"}, IsLearner: false},
   311  			{ID: 1, Name: "isStarted", ClientURLs: []string{"http://254.0.0.2:12345"}, IsLearner: true},
   312  			{ID: 2, Name: "isStartedAndNotLearner", ClientURLs: []string{"http://254.0.0.3:12345"}, IsLearner: false},
   313  		},
   314  	}
   315  	c.Sync(context.Background())
   316  
   317  	endpoints := c.Endpoints()
   318  	if len(endpoints) != 1 || endpoints[0] != "http://254.0.0.3:12345" {
   319  		t.Error("Client.Sync uses learner and/or non-started member client URLs")
   320  	}
   321  }
   322  
   323  type mockCluster struct {
   324  	members []*etcdserverpb.Member
   325  }
   326  
   327  func (mc *mockCluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
   328  	return &MemberListResponse{Members: mc.members}, nil
   329  }
   330  
   331  func (mc *mockCluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
   332  	return nil, nil
   333  }
   334  
   335  func (mc *mockCluster) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
   336  	return nil, nil
   337  }
   338  
   339  func (mc *mockCluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
   340  	return nil, nil
   341  }
   342  
   343  func (mc *mockCluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
   344  	return nil, nil
   345  }
   346  
   347  func (mc *mockCluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
   348  	return nil, nil
   349  }
   350  
   351  func TestClientRejectOldCluster(t *testing.T) {
   352  	testutil.RegisterLeakDetection(t)
   353  	var tests = []struct {
   354  		name          string
   355  		endpoints     []string
   356  		versions      []string
   357  		expectedError error
   358  	}{
   359  		{
   360  			name:          "all new versions with the same value",
   361  			endpoints:     []string{"192.168.3.41:22379", "192.168.3.41:22479", "192.168.3.41:22579"},
   362  			versions:      []string{"3.5.4", "3.5.4", "3.5.4"},
   363  			expectedError: nil,
   364  		},
   365  		{
   366  			name:          "all new versions with different values",
   367  			endpoints:     []string{"192.168.3.41:22379", "192.168.3.41:22479", "192.168.3.41:22579"},
   368  			versions:      []string{"3.5.4", "3.5.4", "3.4.0"},
   369  			expectedError: nil,
   370  		},
   371  		{
   372  			name:          "all old versions with different values",
   373  			endpoints:     []string{"192.168.3.41:22379", "192.168.3.41:22479", "192.168.3.41:22579"},
   374  			versions:      []string{"3.3.0", "3.3.0", "3.4.0"},
   375  			expectedError: ErrOldCluster,
   376  		},
   377  		{
   378  			name:          "all old versions with the same value",
   379  			endpoints:     []string{"192.168.3.41:22379", "192.168.3.41:22479", "192.168.3.41:22579"},
   380  			versions:      []string{"3.3.0", "3.3.0", "3.3.0"},
   381  			expectedError: ErrOldCluster,
   382  		},
   383  	}
   384  
   385  	for _, tt := range tests {
   386  		t.Run(tt.name, func(t *testing.T) {
   387  			if len(tt.endpoints) != len(tt.versions) || len(tt.endpoints) == 0 {
   388  				t.Errorf("Unexpected endpoints and versions length, len(endpoints):%d, len(versions):%d", len(tt.endpoints), len(tt.versions))
   389  				return
   390  			}
   391  			endpointToVersion := make(map[string]string)
   392  			for j := range tt.endpoints {
   393  				endpointToVersion[tt.endpoints[j]] = tt.versions[j]
   394  			}
   395  			c := &Client{
   396  				ctx: context.Background(),
   397  				cfg: Config{
   398  					Endpoints: tt.endpoints,
   399  				},
   400  				mu: new(sync.RWMutex),
   401  				Maintenance: &mockMaintenance{
   402  					Version: endpointToVersion,
   403  				},
   404  			}
   405  
   406  			if err := c.checkVersion(); err != tt.expectedError {
   407  				t.Errorf("heckVersion err:%v", err)
   408  			}
   409  		})
   410  
   411  	}
   412  
   413  }
   414  
   415  type mockMaintenance struct {
   416  	Version map[string]string
   417  }
   418  
   419  func (mm mockMaintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
   420  	return &StatusResponse{Version: mm.Version[endpoint]}, nil
   421  }
   422  
   423  func (mm mockMaintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
   424  	return nil, nil
   425  }
   426  
   427  func (mm mockMaintenance) AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error) {
   428  	return nil, nil
   429  }
   430  
   431  func (mm mockMaintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
   432  	return nil, nil
   433  }
   434  
   435  func (mm mockMaintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
   436  	return nil, nil
   437  }
   438  
   439  func (mm mockMaintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
   440  	return nil, nil
   441  }
   442  
   443  func (mm mockMaintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
   444  	return nil, nil
   445  }
   446  

View as plain text