...

Source file src/go.etcd.io/etcd/server/v3/lease/lessor_test.go

Documentation: go.etcd.io/etcd/server/v3/lease

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package lease
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io/ioutil"
    21  	"math"
    22  	"os"
    23  	"path/filepath"
    24  	"reflect"
    25  	"sort"
    26  	"sync"
    27  	"testing"
    28  	"time"
    29  
    30  	"github.com/coreos/go-semver/semver"
    31  	"github.com/stretchr/testify/assert"
    32  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    33  	"go.etcd.io/etcd/api/v3/version"
    34  	"go.etcd.io/etcd/server/v3/lease/leasepb"
    35  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    36  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    37  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    38  	"go.uber.org/zap"
    39  )
    40  
    41  const (
    42  	minLeaseTTL         = int64(5)
    43  	minLeaseTTLDuration = time.Duration(minLeaseTTL) * time.Second
    44  )
    45  
    46  // TestLessorGrant ensures Lessor can grant wanted lease.
    47  // The granted lease should have a unique ID with a term
    48  // that is greater than minLeaseTTL.
    49  func TestLessorGrant(t *testing.T) {
    50  	lg := zap.NewNop()
    51  	dir, be := NewTestBackend(t)
    52  	defer os.RemoveAll(dir)
    53  	defer be.Close()
    54  
    55  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    56  	defer le.Stop()
    57  	le.Promote(0)
    58  
    59  	l, err := le.Grant(1, 1)
    60  	if err != nil {
    61  		t.Fatalf("could not grant lease 1 (%v)", err)
    62  	}
    63  	if l.ttl != minLeaseTTL {
    64  		t.Fatalf("ttl = %v, expect minLeaseTTL %v", l.ttl, minLeaseTTL)
    65  	}
    66  
    67  	gl := le.Lookup(l.ID)
    68  
    69  	if !reflect.DeepEqual(gl, l) {
    70  		t.Errorf("lease = %v, want %v", gl, l)
    71  	}
    72  	if l.Remaining() < minLeaseTTLDuration-time.Second {
    73  		t.Errorf("term = %v, want at least %v", l.Remaining(), minLeaseTTLDuration-time.Second)
    74  	}
    75  
    76  	_, err = le.Grant(1, 1)
    77  	if err == nil {
    78  		t.Errorf("allocated the same lease")
    79  	}
    80  
    81  	var nl *Lease
    82  	nl, err = le.Grant(2, 1)
    83  	if err != nil {
    84  		t.Errorf("could not grant lease 2 (%v)", err)
    85  	}
    86  	if nl.ID == l.ID {
    87  		t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID)
    88  	}
    89  
    90  	lss := []*Lease{gl, nl}
    91  	leases := le.Leases()
    92  	for i := range lss {
    93  		if lss[i].ID != leases[i].ID {
    94  			t.Fatalf("lease ID expected %d, got %d", lss[i].ID, leases[i].ID)
    95  		}
    96  		if lss[i].ttl != leases[i].ttl {
    97  			t.Fatalf("ttl expected %d, got %d", lss[i].ttl, leases[i].ttl)
    98  		}
    99  	}
   100  
   101  	be.BatchTx().Lock()
   102  	_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
   103  	if len(vs) != 1 {
   104  		t.Errorf("len(vs) = %d, want 1", len(vs))
   105  	}
   106  	be.BatchTx().Unlock()
   107  }
   108  
   109  // TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
   110  // from concurrent map writes on 'itemSet'.
   111  func TestLeaseConcurrentKeys(t *testing.T) {
   112  	lg := zap.NewNop()
   113  	dir, be := NewTestBackend(t)
   114  	defer os.RemoveAll(dir)
   115  	defer be.Close()
   116  
   117  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   118  	defer le.Stop()
   119  	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
   120  
   121  	// grant a lease with long term (100 seconds) to
   122  	// avoid early termination during the test.
   123  	l, err := le.Grant(1, 100)
   124  	if err != nil {
   125  		t.Fatalf("could not grant lease for 100s ttl (%v)", err)
   126  	}
   127  
   128  	itemn := 10
   129  	items := make([]LeaseItem, itemn)
   130  	for i := 0; i < itemn; i++ {
   131  		items[i] = LeaseItem{Key: fmt.Sprintf("foo%d", i)}
   132  	}
   133  	if err = le.Attach(l.ID, items); err != nil {
   134  		t.Fatalf("failed to attach items to the lease: %v", err)
   135  	}
   136  
   137  	donec := make(chan struct{})
   138  	go func() {
   139  		le.Detach(l.ID, items)
   140  		close(donec)
   141  	}()
   142  
   143  	var wg sync.WaitGroup
   144  	wg.Add(itemn)
   145  	for i := 0; i < itemn; i++ {
   146  		go func() {
   147  			defer wg.Done()
   148  			l.Keys()
   149  		}()
   150  	}
   151  
   152  	<-donec
   153  	wg.Wait()
   154  }
   155  
   156  // TestLessorRevoke ensures Lessor can revoke a lease.
   157  // The items in the revoked lease should be removed from
   158  // the backend.
   159  // The revoked lease cannot be got from Lessor again.
   160  func TestLessorRevoke(t *testing.T) {
   161  	lg := zap.NewNop()
   162  	dir, be := NewTestBackend(t)
   163  	defer os.RemoveAll(dir)
   164  	defer be.Close()
   165  
   166  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   167  	defer le.Stop()
   168  	var fd *fakeDeleter
   169  	le.SetRangeDeleter(func() TxnDelete {
   170  		fd = newFakeDeleter(be)
   171  		return fd
   172  	})
   173  
   174  	// grant a lease with long term (100 seconds) to
   175  	// avoid early termination during the test.
   176  	l, err := le.Grant(1, 100)
   177  	if err != nil {
   178  		t.Fatalf("could not grant lease for 100s ttl (%v)", err)
   179  	}
   180  
   181  	items := []LeaseItem{
   182  		{"foo"},
   183  		{"bar"},
   184  	}
   185  
   186  	if err = le.Attach(l.ID, items); err != nil {
   187  		t.Fatalf("failed to attach items to the lease: %v", err)
   188  	}
   189  
   190  	if err = le.Revoke(l.ID); err != nil {
   191  		t.Fatal("failed to revoke lease:", err)
   192  	}
   193  
   194  	if le.Lookup(l.ID) != nil {
   195  		t.Errorf("got revoked lease %x", l.ID)
   196  	}
   197  
   198  	wdeleted := []string{"bar_", "foo_"}
   199  	sort.Strings(fd.deleted)
   200  	if !reflect.DeepEqual(fd.deleted, wdeleted) {
   201  		t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
   202  	}
   203  
   204  	be.BatchTx().Lock()
   205  	_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
   206  	if len(vs) != 0 {
   207  		t.Errorf("len(vs) = %d, want 0", len(vs))
   208  	}
   209  	be.BatchTx().Unlock()
   210  }
   211  
   212  // TestLessorRenew ensures Lessor can renew an existing lease.
   213  func TestLessorRenew(t *testing.T) {
   214  	lg := zap.NewNop()
   215  	dir, be := NewTestBackend(t)
   216  	defer be.Close()
   217  	defer os.RemoveAll(dir)
   218  
   219  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   220  	defer le.Stop()
   221  	le.Promote(0)
   222  
   223  	l, err := le.Grant(1, minLeaseTTL)
   224  	if err != nil {
   225  		t.Fatalf("failed to grant lease (%v)", err)
   226  	}
   227  
   228  	// manually change the ttl field
   229  	le.mu.Lock()
   230  	l.ttl = 10
   231  	le.mu.Unlock()
   232  	ttl, err := le.Renew(l.ID)
   233  	if err != nil {
   234  		t.Fatalf("failed to renew lease (%v)", err)
   235  	}
   236  	if ttl != l.ttl {
   237  		t.Errorf("ttl = %d, want %d", ttl, l.ttl)
   238  	}
   239  
   240  	l = le.Lookup(l.ID)
   241  	if l.Remaining() < 9*time.Second {
   242  		t.Errorf("failed to renew the lease")
   243  	}
   244  }
   245  
   246  func TestLessorRenewWithCheckpointer(t *testing.T) {
   247  	lg := zap.NewNop()
   248  	dir, be := NewTestBackend(t)
   249  	defer be.Close()
   250  	defer os.RemoveAll(dir)
   251  
   252  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   253  	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
   254  		for _, cp := range cp.GetCheckpoints() {
   255  			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
   256  		}
   257  		return nil
   258  	}
   259  	defer le.Stop()
   260  	// Set checkpointer
   261  	le.SetCheckpointer(fakerCheckerpointer)
   262  	le.Promote(0)
   263  
   264  	l, err := le.Grant(1, minLeaseTTL)
   265  	if err != nil {
   266  		t.Fatalf("failed to grant lease (%v)", err)
   267  	}
   268  
   269  	// manually change the ttl field
   270  	le.mu.Lock()
   271  	l.ttl = 10
   272  	l.remainingTTL = 10
   273  	le.mu.Unlock()
   274  	ttl, err := le.Renew(l.ID)
   275  	if err != nil {
   276  		t.Fatalf("failed to renew lease (%v)", err)
   277  	}
   278  	if ttl != l.ttl {
   279  		t.Errorf("ttl = %d, want %d", ttl, l.ttl)
   280  	}
   281  	if l.remainingTTL != 0 {
   282  		t.Fatalf("remianingTTL = %d, want %d", l.remainingTTL, 0)
   283  	}
   284  
   285  	l = le.Lookup(l.ID)
   286  	if l.Remaining() < 9*time.Second {
   287  		t.Errorf("failed to renew the lease")
   288  	}
   289  }
   290  
   291  // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
   292  // expire at the same time.
   293  func TestLessorRenewExtendPileup(t *testing.T) {
   294  	oldRevokeRate := leaseRevokeRate
   295  	defer func() { leaseRevokeRate = oldRevokeRate }()
   296  	lg := zap.NewNop()
   297  	leaseRevokeRate = 10
   298  
   299  	dir, be := NewTestBackend(t)
   300  	defer os.RemoveAll(dir)
   301  
   302  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   303  	ttl := int64(10)
   304  	for i := 1; i <= leaseRevokeRate*10; i++ {
   305  		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
   306  			t.Fatal(err)
   307  		}
   308  		// ttls that overlap spillover for ttl=10
   309  		if _, err := le.Grant(LeaseID(2*i+1), ttl+1); err != nil {
   310  			t.Fatal(err)
   311  		}
   312  	}
   313  
   314  	// simulate stop and recovery
   315  	le.Stop()
   316  	be.Close()
   317  	bcfg := backend.DefaultBackendConfig()
   318  	bcfg.Path = filepath.Join(dir, "be")
   319  	be = backend.New(bcfg)
   320  	defer be.Close()
   321  	le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   322  	defer le.Stop()
   323  
   324  	// extend after recovery should extend expiration on lease pile-up
   325  	le.Promote(0)
   326  
   327  	windowCounts := make(map[int64]int)
   328  	for _, l := range le.leaseMap {
   329  		// round up slightly for baseline ttl
   330  		s := int64(l.Remaining().Seconds() + 0.1)
   331  		windowCounts[s]++
   332  	}
   333  
   334  	for i := ttl; i < ttl+20; i++ {
   335  		c := windowCounts[i]
   336  		if c > leaseRevokeRate {
   337  			t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
   338  		}
   339  		if c < leaseRevokeRate/2 {
   340  			t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
   341  		}
   342  	}
   343  }
   344  
   345  func TestLessorDetach(t *testing.T) {
   346  	lg := zap.NewNop()
   347  	dir, be := NewTestBackend(t)
   348  	defer os.RemoveAll(dir)
   349  	defer be.Close()
   350  
   351  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   352  	defer le.Stop()
   353  	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
   354  
   355  	// grant a lease with long term (100 seconds) to
   356  	// avoid early termination during the test.
   357  	l, err := le.Grant(1, 100)
   358  	if err != nil {
   359  		t.Fatalf("could not grant lease for 100s ttl (%v)", err)
   360  	}
   361  
   362  	items := []LeaseItem{
   363  		{"foo"},
   364  		{"bar"},
   365  	}
   366  
   367  	if err := le.Attach(l.ID, items); err != nil {
   368  		t.Fatalf("failed to attach items to the lease: %v", err)
   369  	}
   370  
   371  	if err := le.Detach(l.ID, items[0:1]); err != nil {
   372  		t.Fatalf("failed to de-attach items to the lease: %v", err)
   373  	}
   374  
   375  	l = le.Lookup(l.ID)
   376  	if len(l.itemSet) != 1 {
   377  		t.Fatalf("len(l.itemSet) = %d, failed to de-attach items", len(l.itemSet))
   378  	}
   379  	if _, ok := l.itemSet[LeaseItem{"bar"}]; !ok {
   380  		t.Fatalf("de-attached wrong item, want %q exists", "bar")
   381  	}
   382  }
   383  
   384  // TestLessorRecover ensures Lessor recovers leases from
   385  // persist backend.
   386  func TestLessorRecover(t *testing.T) {
   387  	lg := zap.NewNop()
   388  	dir, be := NewTestBackend(t)
   389  	defer os.RemoveAll(dir)
   390  	defer be.Close()
   391  
   392  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   393  	defer le.Stop()
   394  	l1, err1 := le.Grant(1, 10)
   395  	l2, err2 := le.Grant(2, 20)
   396  	if err1 != nil || err2 != nil {
   397  		t.Fatalf("could not grant initial leases (%v, %v)", err1, err2)
   398  	}
   399  
   400  	// Create a new lessor with the same backend
   401  	nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   402  	defer nle.Stop()
   403  	nl1 := nle.Lookup(l1.ID)
   404  	if nl1 == nil || nl1.ttl != l1.ttl {
   405  		t.Errorf("nl1 = %v, want nl1.ttl= %d", nl1.ttl, l1.ttl)
   406  	}
   407  
   408  	nl2 := nle.Lookup(l2.ID)
   409  	if nl2 == nil || nl2.ttl != l2.ttl {
   410  		t.Errorf("nl2 = %v, want nl2.ttl= %d", nl2.ttl, l2.ttl)
   411  	}
   412  }
   413  
   414  func TestLessorExpire(t *testing.T) {
   415  	lg := zap.NewNop()
   416  	dir, be := NewTestBackend(t)
   417  	defer os.RemoveAll(dir)
   418  	defer be.Close()
   419  
   420  	testMinTTL := int64(1)
   421  
   422  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
   423  	defer le.Stop()
   424  
   425  	le.Promote(1 * time.Second)
   426  	l, err := le.Grant(1, testMinTTL)
   427  	if err != nil {
   428  		t.Fatalf("failed to create lease: %v", err)
   429  	}
   430  
   431  	select {
   432  	case el := <-le.ExpiredLeasesC():
   433  		if el[0].ID != l.ID {
   434  			t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
   435  		}
   436  	case <-time.After(10 * time.Second):
   437  		t.Fatalf("failed to receive expired lease")
   438  	}
   439  
   440  	donec := make(chan struct{}, 1)
   441  	go func() {
   442  		// expired lease cannot be renewed
   443  		if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
   444  			t.Errorf("unexpected renew")
   445  		}
   446  		donec <- struct{}{}
   447  	}()
   448  
   449  	select {
   450  	case <-donec:
   451  		t.Fatalf("renew finished before lease revocation")
   452  	case <-time.After(50 * time.Millisecond):
   453  	}
   454  
   455  	// expired lease can be revoked
   456  	if err := le.Revoke(l.ID); err != nil {
   457  		t.Fatalf("failed to revoke expired lease: %v", err)
   458  	}
   459  
   460  	select {
   461  	case <-donec:
   462  	case <-time.After(10 * time.Second):
   463  		t.Fatalf("renew has not returned after lease revocation")
   464  	}
   465  }
   466  
   467  func TestLessorExpireAndDemote(t *testing.T) {
   468  	lg := zap.NewNop()
   469  	dir, be := NewTestBackend(t)
   470  	defer os.RemoveAll(dir)
   471  	defer be.Close()
   472  
   473  	testMinTTL := int64(1)
   474  
   475  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
   476  	defer le.Stop()
   477  
   478  	le.Promote(1 * time.Second)
   479  	l, err := le.Grant(1, testMinTTL)
   480  	if err != nil {
   481  		t.Fatalf("failed to create lease: %v", err)
   482  	}
   483  
   484  	select {
   485  	case el := <-le.ExpiredLeasesC():
   486  		if el[0].ID != l.ID {
   487  			t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
   488  		}
   489  	case <-time.After(10 * time.Second):
   490  		t.Fatalf("failed to receive expired lease")
   491  	}
   492  
   493  	donec := make(chan struct{}, 1)
   494  	go func() {
   495  		// expired lease cannot be renewed
   496  		if _, err := le.Renew(l.ID); err != ErrNotPrimary {
   497  			t.Errorf("unexpected renew: %v", err)
   498  		}
   499  		donec <- struct{}{}
   500  	}()
   501  
   502  	select {
   503  	case <-donec:
   504  		t.Fatalf("renew finished before demotion")
   505  	case <-time.After(50 * time.Millisecond):
   506  	}
   507  
   508  	// demote will cause the renew request to fail with ErrNotPrimary
   509  	le.Demote()
   510  
   511  	select {
   512  	case <-donec:
   513  	case <-time.After(10 * time.Second):
   514  		t.Fatalf("renew has not returned after lessor demotion")
   515  	}
   516  }
   517  
   518  func TestLessorMaxTTL(t *testing.T) {
   519  	lg := zap.NewNop()
   520  	dir, be := NewTestBackend(t)
   521  	defer os.RemoveAll(dir)
   522  	defer be.Close()
   523  
   524  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   525  	defer le.Stop()
   526  
   527  	_, err := le.Grant(1, MaxLeaseTTL+1)
   528  	if err != ErrLeaseTTLTooLarge {
   529  		t.Fatalf("grant unexpectedly succeeded")
   530  	}
   531  }
   532  
   533  func TestLessorCheckpointScheduling(t *testing.T) {
   534  	lg := zap.NewNop()
   535  
   536  	dir, be := NewTestBackend(t)
   537  	defer os.RemoveAll(dir)
   538  	defer be.Close()
   539  
   540  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
   541  	defer le.Stop()
   542  	le.minLeaseTTL = 1
   543  	checkpointedC := make(chan struct{})
   544  	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
   545  		close(checkpointedC)
   546  		if len(lc.Checkpoints) != 1 {
   547  			t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
   548  		}
   549  		c := lc.Checkpoints[0]
   550  		if c.Remaining_TTL != 1 {
   551  			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
   552  		}
   553  		return nil
   554  	})
   555  	_, err := le.Grant(1, 2)
   556  	if err != nil {
   557  		t.Fatal(err)
   558  	}
   559  	le.Promote(0)
   560  
   561  	// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
   562  	select {
   563  	case <-checkpointedC:
   564  	case <-time.After(2 * time.Second):
   565  		t.Fatal("expected checkpointer to be called, but it was not")
   566  	}
   567  }
   568  
   569  func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
   570  	lg := zap.NewNop()
   571  	dir, be := NewTestBackend(t)
   572  	defer os.RemoveAll(dir)
   573  	defer be.Close()
   574  
   575  	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   576  	defer le.Stop()
   577  	l, err := le.Grant(1, 10)
   578  	if err != nil {
   579  		t.Fatal(err)
   580  	}
   581  	le.Checkpoint(l.ID, 5)
   582  	le.Promote(0)
   583  	remaining := l.Remaining().Seconds()
   584  	if !(remaining > 4 && remaining < 5) {
   585  		t.Fatalf("expected expiry to be less than 1s in the future, but got %f seconds", remaining)
   586  	}
   587  }
   588  
   589  func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
   590  	const ttl int64 = 10
   591  	const checkpointTTL int64 = 5
   592  
   593  	tcs := []struct {
   594  		name               string
   595  		cluster            cluster
   596  		checkpointPersist  bool
   597  		expectRemainingTTL int64
   598  	}{
   599  		{
   600  			name:               "Etcd v3.6 and newer persist remainingTTL on checkpoint",
   601  			cluster:            clusterV3_6(),
   602  			expectRemainingTTL: checkpointTTL,
   603  		},
   604  		{
   605  			name:               "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
   606  			cluster:            clusterLatest(),
   607  			checkpointPersist:  true,
   608  			expectRemainingTTL: checkpointTTL,
   609  		},
   610  		{
   611  			name:               "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
   612  			cluster:            clusterNil(),
   613  			checkpointPersist:  true,
   614  			expectRemainingTTL: checkpointTTL,
   615  		},
   616  		{
   617  			name:               "Etcd v3.5 and older reset remainingTTL on checkpoint",
   618  			cluster:            clusterLatest(),
   619  			expectRemainingTTL: ttl,
   620  		},
   621  		{
   622  			name:               "Etcd with version unknown fallbacks to v3.5 behavior",
   623  			cluster:            clusterNil(),
   624  			expectRemainingTTL: ttl,
   625  		},
   626  	}
   627  	for _, tc := range tcs {
   628  		t.Run(tc.name, func(t *testing.T) {
   629  			lg := zap.NewNop()
   630  			dir, be := NewTestBackend(t)
   631  			defer os.RemoveAll(dir)
   632  			defer be.Close()
   633  
   634  			cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
   635  			cfg.CheckpointPersist = tc.checkpointPersist
   636  			le := newLessor(lg, be, tc.cluster, cfg)
   637  			l, err := le.Grant(2, ttl)
   638  			if err != nil {
   639  				t.Fatal(err)
   640  			}
   641  			if l.RemainingTTL() != ttl {
   642  				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
   643  			}
   644  			le.Checkpoint(2, checkpointTTL)
   645  			if l.RemainingTTL() != checkpointTTL {
   646  				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
   647  			}
   648  			le.Stop()
   649  			le2 := newLessor(lg, be, clusterV3_6(), cfg)
   650  			l = le2.Lookup(2)
   651  			if l.RemainingTTL() != tc.expectRemainingTTL {
   652  				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
   653  			}
   654  		})
   655  	}
   656  }
   657  
   658  func TestLeaseBackend(t *testing.T) {
   659  	tcs := []struct {
   660  		name  string
   661  		setup func(tx backend.BatchTx)
   662  		want  []*leasepb.Lease
   663  	}{
   664  		{
   665  			name:  "Empty by default",
   666  			setup: func(tx backend.BatchTx) {},
   667  			want:  []*leasepb.Lease{},
   668  		},
   669  		{
   670  			name: "Returns data put before",
   671  			setup: func(tx backend.BatchTx) {
   672  				mustUnsafePutLease(tx, &leasepb.Lease{
   673  					ID:  -1,
   674  					TTL: 2,
   675  				})
   676  			},
   677  			want: []*leasepb.Lease{
   678  				{
   679  					ID:  -1,
   680  					TTL: 2,
   681  				},
   682  			},
   683  		},
   684  		{
   685  			name: "Skips deleted",
   686  			setup: func(tx backend.BatchTx) {
   687  				mustUnsafePutLease(tx, &leasepb.Lease{
   688  					ID:  -1,
   689  					TTL: 2,
   690  				})
   691  				mustUnsafePutLease(tx, &leasepb.Lease{
   692  					ID:  math.MinInt64,
   693  					TTL: 2,
   694  				})
   695  				mustUnsafePutLease(tx, &leasepb.Lease{
   696  					ID:  math.MaxInt64,
   697  					TTL: 3,
   698  				})
   699  				tx.UnsafeDelete(buckets.Lease, int64ToBytes(-1))
   700  			},
   701  			want: []*leasepb.Lease{
   702  				{
   703  					ID:  math.MaxInt64,
   704  					TTL: 3,
   705  				},
   706  				{
   707  					ID:  math.MinInt64, // bytes bigger than MaxInt64
   708  					TTL: 2,
   709  				},
   710  			},
   711  		},
   712  	}
   713  
   714  	for _, tc := range tcs {
   715  		t.Run(tc.name, func(t *testing.T) {
   716  			be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
   717  			tx := be.BatchTx()
   718  			tx.Lock()
   719  			tx.UnsafeCreateBucket(buckets.Lease)
   720  			tc.setup(tx)
   721  			tx.Unlock()
   722  
   723  			be.ForceCommit()
   724  			be.Close()
   725  
   726  			be2 := backend.NewDefaultBackend(tmpPath)
   727  			defer be2.Close()
   728  			leases := unsafeGetAllLeases(be2.ReadTx())
   729  
   730  			assert.Equal(t, tc.want, leases)
   731  		})
   732  	}
   733  }
   734  
   735  func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
   736  	key := int64ToBytes(lpb.ID)
   737  
   738  	val, err := lpb.Marshal()
   739  	if err != nil {
   740  		panic("failed to marshal lease proto item")
   741  	}
   742  	tx.UnsafePut(buckets.Lease, key, val)
   743  }
   744  
   745  type fakeDeleter struct {
   746  	deleted []string
   747  	tx      backend.BatchTx
   748  }
   749  
   750  func newFakeDeleter(be backend.Backend) *fakeDeleter {
   751  	fd := &fakeDeleter{nil, be.BatchTx()}
   752  	fd.tx.Lock()
   753  	return fd
   754  }
   755  
   756  func (fd *fakeDeleter) End() { fd.tx.Unlock() }
   757  
   758  func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
   759  	fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
   760  	return 0, 0
   761  }
   762  
   763  func NewTestBackend(t *testing.T) (string, backend.Backend) {
   764  	tmpPath, err := ioutil.TempDir("", "lease")
   765  	if err != nil {
   766  		t.Fatalf("failed to create tmpdir (%v)", err)
   767  	}
   768  	bcfg := backend.DefaultBackendConfig()
   769  	bcfg.Path = filepath.Join(tmpPath, "be")
   770  	return tmpPath, backend.New(bcfg)
   771  }
   772  
   773  func clusterV3_6() cluster {
   774  	return fakeCluster{semver.New("3.6.0")}
   775  }
   776  
   777  func clusterLatest() cluster {
   778  	return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
   779  }
   780  
   781  func clusterNil() cluster {
   782  	return fakeCluster{}
   783  }
   784  
   785  type fakeCluster struct {
   786  	version *semver.Version
   787  }
   788  
   789  func (c fakeCluster) Version() *semver.Version {
   790  	return c.version
   791  }
   792  

View as plain text