...

Source file src/go.etcd.io/etcd/server/v3/lease/lessor.go

Documentation: go.etcd.io/etcd/server/v3/lease

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package lease
    16  
    17  import (
    18  	"container/heap"
    19  	"context"
    20  	"encoding/binary"
    21  	"errors"
    22  	"fmt"
    23  	"math"
    24  	"sort"
    25  	"sync"
    26  	"time"
    27  
    28  	"github.com/coreos/go-semver/semver"
    29  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    30  	"go.etcd.io/etcd/server/v3/lease/leasepb"
    31  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    32  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    33  	"go.uber.org/zap"
    34  )
    35  
// NoLease is a special LeaseID representing the absence of a lease.
const NoLease = LeaseID(0)

// MaxLeaseTTL is the maximum lease TTL value. Grant rejects any larger TTL
// with ErrLeaseTTLTooLarge.
const MaxLeaseTTL = 9000000000

// v3_6 is the cluster version at or above which lease checkpoints are always
// persisted to the backend (see shouldPersistCheckpoints).
var v3_6 = semver.Version{Major: 3, Minor: 6}
    43  
var (
	// forever is the zero time.Time. A lease whose expiry equals it never
	// expires (see Lease.Remaining).
	forever = time.Time{}

	// maximum number of leases to revoke per second; configurable for tests
	leaseRevokeRate = 1000

	// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
	leaseCheckpointRate = 1000

	// the default interval of lease checkpoint
	defaultLeaseCheckpointInterval = 5 * time.Minute

	// maximum number of lease checkpoints to batch into a single consensus log entry
	maxLeaseCheckpointBatchSize = 1000

	// the default interval to check if the expired lease is revoked
	defaultExpiredleaseRetryInterval = 3 * time.Second

	// ErrNotPrimary is returned by Renew when this lessor is not the primary
	// (or stops being the primary while the call is waiting).
	// ErrLeaseNotFound is returned when no lease exists for the given ID.
	// ErrLeaseExists is returned by Grant when the ID is already in use.
	// ErrLeaseTTLTooLarge is returned by Grant when the TTL exceeds MaxLeaseTTL.
	ErrNotPrimary       = errors.New("not a primary lessor")
	ErrLeaseNotFound    = errors.New("lease not found")
	ErrLeaseExists      = errors.New("lease already exists")
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)
    67  
// TxnDelete is a TxnWrite that only permits deletes. Defined here
// to avoid circular dependency with mvcc.
type TxnDelete interface {
	// DeleteRange deletes the given key or key range.
	DeleteRange(key, end []byte) (n, rev int64)
	// End finishes the transaction.
	End()
}

// RangeDeleter is a TxnDelete constructor.
type RangeDeleter func() TxnDelete

// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error

// LeaseID is the identifier of a lease. The zero value (NoLease) means
// "no lease".
type LeaseID int64
    83  
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
	// Lessor deletes the items in the revoked or expired lease by creating
	// new TxnDeletes.
	SetRangeDeleter(rd RangeDeleter)

	// SetCheckpointer sets the Checkpointer the lessor uses to record lease
	// remaining TTLs in the consensus log.
	SetCheckpointer(cp Checkpointer)

	// Grant grants a lease that expires at least after TTL seconds.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes a lease with given ID. The items attached to the
	// given lease will be removed. If the ID does not exist, an error
	// will be returned.
	Revoke(id LeaseID) error

	// Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
	// the expiry of leases to less than the full TTL when possible.
	Checkpoint(id LeaseID, remainingTTL int64) error

	// Attach attaches given leaseItem to the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns LeaseID for given item.
	// If no lease found, NoLease value will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches given leaseItem from the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
	// the expiration and renewal of leases.
	// A newly promoted lessor resets the expiry of every lease to
	// extend + the lease's remaining TTL from now.
	Promote(extend time.Duration)

	// Demote demotes the lessor from being the primary lessor.
	Demote()

	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
	// an error will be returned.
	Renew(id LeaseID) (int64, error)

	// Lookup gives the lease at a given lease id, if any
	Lookup(id LeaseID) *Lease

	// Leases lists all leases.
	Leases() []*Lease

	// ExpiredLeasesC returns a chan that is used to receive expired leases.
	ExpiredLeasesC() <-chan []*Lease

	// Recover recovers the lessor state from the given backend and RangeDeleter.
	Recover(b backend.Backend, rd RangeDeleter)

	// Stop stops the lessor for managing leases. The behavior of calling Stop multiple
	// times is undefined.
	Stop()
}
   144  
// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
	// mu is held by the methods below while they read or mutate lessor state.
	mu sync.RWMutex

	// demotec is set when the lessor is the primary.
	// demotec will be closed if the lessor is demoted.
	demotec chan struct{}

	// leaseMap holds every known lease, keyed by its ID.
	leaseMap map[LeaseID]*Lease
	// leaseExpiredNotifier is the queue of lease expiry times consulted when
	// looking for expired leases.
	leaseExpiredNotifier *LeaseExpiredNotifier
	// leaseCheckpointHeap orders pending checkpoints by their due time.
	leaseCheckpointHeap LeaseQueue
	// itemMap maps an attached item to the ID of the lease it is attached to.
	itemMap map[LeaseItem]LeaseID

	// When a lease expires, the lessor will delete the
	// leased range (or key) by the RangeDeleter.
	rd RangeDeleter

	// When a lease's deadline should be persisted to preserve the remaining TTL across leader
	// elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
	cp Checkpointer

	// backend to persist leases. We only persist lease ID and expiry for now.
	// The leased items can be recovered by iterating all the keys in kv.
	b backend.Backend

	// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
	// requests for shorter TTLs are extended to the minimum TTL.
	minLeaseTTL int64

	// expiredC carries batches of expired leases to whoever revokes them.
	expiredC chan []*Lease
	// stopC is a channel whose closure indicates that the lessor should be stopped.
	stopC chan struct{}
	// doneC is a channel whose closure indicates that the lessor is stopped.
	doneC chan struct{}

	// lg is the logger; it may be nil, in which case logging is skipped.
	lg *zap.Logger

	// Wait duration between lease checkpoints.
	checkpointInterval time.Duration
	// the interval to check if the expired lease is revoked
	expiredLeaseRetryInterval time.Duration
	// whether lessor should always persist remaining TTL (always enabled in v3.6).
	checkpointPersist bool
	// cluster is used to adapt lessor logic based on cluster version
	cluster cluster
}
   192  
// cluster exposes the cluster version the lessor needs in order to decide
// whether lease checkpoints must be persisted.
type cluster interface {
	// Version is the cluster-wide minimum major.minor version.
	Version() *semver.Version
}
   197  
// LessorConfig carries the tunables used to construct a lessor.
type LessorConfig struct {
	// MinLeaseTTL is the minimum TTL granted; shorter requests are raised to it.
	MinLeaseTTL int64
	// CheckpointInterval is the wait between lease checkpoints; zero selects
	// the default interval.
	CheckpointInterval time.Duration
	// ExpiredLeasesRetryInterval is how long to wait before re-checking an
	// expired lease that has not been revoked yet; zero selects the default.
	ExpiredLeasesRetryInterval time.Duration
	// CheckpointPersist forces checkpointed remaining TTLs to be persisted to
	// the backend regardless of cluster version.
	CheckpointPersist bool
}
   204  
// NewLessor creates a Lessor backed by b and configured by cfg. The returned
// lessor has already recovered persisted leases from the backend and started
// its background loop; call Stop to terminate it.
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
	return newLessor(lg, b, cluster, cfg)
}
   208  
   209  func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
   210  	checkpointInterval := cfg.CheckpointInterval
   211  	expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
   212  	if checkpointInterval == 0 {
   213  		checkpointInterval = defaultLeaseCheckpointInterval
   214  	}
   215  	if expiredLeaseRetryInterval == 0 {
   216  		expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
   217  	}
   218  	l := &lessor{
   219  		leaseMap:                  make(map[LeaseID]*Lease),
   220  		itemMap:                   make(map[LeaseItem]LeaseID),
   221  		leaseExpiredNotifier:      newLeaseExpiredNotifier(),
   222  		leaseCheckpointHeap:       make(LeaseQueue, 0),
   223  		b:                         b,
   224  		minLeaseTTL:               cfg.MinLeaseTTL,
   225  		checkpointInterval:        checkpointInterval,
   226  		expiredLeaseRetryInterval: expiredLeaseRetryInterval,
   227  		checkpointPersist:         cfg.CheckpointPersist,
   228  		// expiredC is a small buffered chan to avoid unnecessary blocking.
   229  		expiredC: make(chan []*Lease, 16),
   230  		stopC:    make(chan struct{}),
   231  		doneC:    make(chan struct{}),
   232  		lg:       lg,
   233  		cluster:  cluster,
   234  	}
   235  	l.initAndRecover()
   236  
   237  	go l.runLoop()
   238  
   239  	return l
   240  }
   241  
// isPrimary indicates if this lessor is the primary lessor. The primary
// lessor manages lease expiration and renewal.
//
// In etcd, the raft leader is the primary. Thus there might be two primary
// lessors at the same time (raft allows concurrent leaders, but with different
// terms) for at most a leader election timeout.
// The old primary cannot affect correctness since its proposals have a
// smaller term and will not be committed.
//
// TODO: raft followers do not forward lease management proposals. There might be a
// very small window (normally within a second, depending on go scheduling) in which
// a raft follower is the primary between the raft leader demotion and lessor demotion.
// Usually this should not be a problem. Lease should not be that sensitive to timing.
func (le *lessor) isPrimary() bool {
	return le.demotec != nil
}
   258  
   259  func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
   260  	le.mu.Lock()
   261  	defer le.mu.Unlock()
   262  
   263  	le.rd = rd
   264  }
   265  
   266  func (le *lessor) SetCheckpointer(cp Checkpointer) {
   267  	le.mu.Lock()
   268  	defer le.mu.Unlock()
   269  
   270  	le.cp = cp
   271  }
   272  
   273  func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
   274  	if id == NoLease {
   275  		return nil, ErrLeaseNotFound
   276  	}
   277  
   278  	if ttl > MaxLeaseTTL {
   279  		return nil, ErrLeaseTTLTooLarge
   280  	}
   281  
   282  	// TODO: when lessor is under high load, it should give out lease
   283  	// with longer TTL to reduce renew load.
   284  	l := NewLease(id, ttl)
   285  
   286  	le.mu.Lock()
   287  	defer le.mu.Unlock()
   288  
   289  	if _, ok := le.leaseMap[id]; ok {
   290  		return nil, ErrLeaseExists
   291  	}
   292  
   293  	if l.ttl < le.minLeaseTTL {
   294  		l.ttl = le.minLeaseTTL
   295  	}
   296  
   297  	if le.isPrimary() {
   298  		l.refresh(0)
   299  	} else {
   300  		l.forever()
   301  	}
   302  
   303  	le.leaseMap[id] = l
   304  	l.persistTo(le.b)
   305  
   306  	leaseTotalTTLs.Observe(float64(l.ttl))
   307  	leaseGranted.Inc()
   308  
   309  	if le.isPrimary() {
   310  		item := &LeaseWithTime{id: l.ID, time: l.expiry}
   311  		le.leaseExpiredNotifier.RegisterOrUpdate(item)
   312  		le.scheduleCheckpointIfNeeded(l)
   313  	}
   314  
   315  	return l, nil
   316  }
   317  
   318  func (le *lessor) Revoke(id LeaseID) error {
   319  	le.mu.Lock()
   320  
   321  	l := le.leaseMap[id]
   322  	if l == nil {
   323  		le.mu.Unlock()
   324  		return ErrLeaseNotFound
   325  	}
   326  	defer close(l.revokec)
   327  	// unlock before doing external work
   328  	le.mu.Unlock()
   329  
   330  	if le.rd == nil {
   331  		return nil
   332  	}
   333  
   334  	txn := le.rd()
   335  
   336  	// sort keys so deletes are in same order among all members,
   337  	// otherwise the backend hashes will be different
   338  	keys := l.Keys()
   339  	sort.StringSlice(keys).Sort()
   340  	for _, key := range keys {
   341  		txn.DeleteRange([]byte(key), nil)
   342  	}
   343  
   344  	le.mu.Lock()
   345  	defer le.mu.Unlock()
   346  	delete(le.leaseMap, l.ID)
   347  	// lease deletion needs to be in the same backend transaction with the
   348  	// kv deletion. Or we might end up with not executing the revoke or not
   349  	// deleting the keys if etcdserver fails in between.
   350  	le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
   351  
   352  	txn.End()
   353  
   354  	leaseRevoked.Inc()
   355  	return nil
   356  }
   357  
   358  func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
   359  	le.mu.Lock()
   360  	defer le.mu.Unlock()
   361  
   362  	if l, ok := le.leaseMap[id]; ok {
   363  		// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
   364  		l.remainingTTL = remainingTTL
   365  		if le.shouldPersistCheckpoints() {
   366  			l.persistTo(le.b)
   367  		}
   368  		if le.isPrimary() {
   369  			// schedule the next checkpoint as needed
   370  			le.scheduleCheckpointIfNeeded(l)
   371  		}
   372  	}
   373  	return nil
   374  }
   375  
   376  func (le *lessor) shouldPersistCheckpoints() bool {
   377  	cv := le.cluster.Version()
   378  	return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
   379  }
   380  
// greaterOrEqual reports whether first >= second, i.e. first is not less
// than second.
func greaterOrEqual(first, second semver.Version) bool {
	return !first.LessThan(second)
}
   384  
   385  // Renew renews an existing lease. If the given lease does not exist or
   386  // has expired, an error will be returned.
   387  func (le *lessor) Renew(id LeaseID) (int64, error) {
   388  	le.mu.RLock()
   389  	if !le.isPrimary() {
   390  		// forward renew request to primary instead of returning error.
   391  		le.mu.RUnlock()
   392  		return -1, ErrNotPrimary
   393  	}
   394  
   395  	demotec := le.demotec
   396  
   397  	l := le.leaseMap[id]
   398  	if l == nil {
   399  		le.mu.RUnlock()
   400  		return -1, ErrLeaseNotFound
   401  	}
   402  	// Clear remaining TTL when we renew if it is set
   403  	clearRemainingTTL := le.cp != nil && l.remainingTTL > 0
   404  
   405  	le.mu.RUnlock()
   406  	if l.expired() {
   407  		select {
   408  		// A expired lease might be pending for revoking or going through
   409  		// quorum to be revoked. To be accurate, renew request must wait for the
   410  		// deletion to complete.
   411  		case <-l.revokec:
   412  			return -1, ErrLeaseNotFound
   413  		// The expired lease might fail to be revoked if the primary changes.
   414  		// The caller will retry on ErrNotPrimary.
   415  		case <-demotec:
   416  			return -1, ErrNotPrimary
   417  		case <-le.stopC:
   418  			return -1, ErrNotPrimary
   419  		}
   420  	}
   421  
   422  	// Clear remaining TTL when we renew if it is set
   423  	// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
   424  	// of RAFT entries written per lease to a max of 2 per checkpoint interval.
   425  	if clearRemainingTTL {
   426  		if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
   427  			return -1, err
   428  		}
   429  	}
   430  
   431  	le.mu.Lock()
   432  	l.refresh(0)
   433  	item := &LeaseWithTime{id: l.ID, time: l.expiry}
   434  	le.leaseExpiredNotifier.RegisterOrUpdate(item)
   435  	le.mu.Unlock()
   436  
   437  	leaseRenewed.Inc()
   438  	return l.ttl, nil
   439  }
   440  
   441  func (le *lessor) Lookup(id LeaseID) *Lease {
   442  	le.mu.RLock()
   443  	defer le.mu.RUnlock()
   444  	return le.leaseMap[id]
   445  }
   446  
   447  func (le *lessor) unsafeLeases() []*Lease {
   448  	leases := make([]*Lease, 0, len(le.leaseMap))
   449  	for _, l := range le.leaseMap {
   450  		leases = append(leases, l)
   451  	}
   452  	return leases
   453  }
   454  
   455  func (le *lessor) Leases() []*Lease {
   456  	le.mu.RLock()
   457  	ls := le.unsafeLeases()
   458  	le.mu.RUnlock()
   459  	sort.Sort(leasesByExpiry(ls))
   460  	return ls
   461  }
   462  
// Promote makes this lessor the primary. It installs a fresh demotec channel
// and resets the expiry of every lease to now + extend + the lease's
// remaining TTL, registering each lease with the expiry notifier and
// scheduling a checkpoint when needed.
//
// When the number of leases reaches leaseRevokeRate, leases whose expiries
// fall within the same window are pushed further out so that roughly no more
// than three quarters of leaseRevokeRate leases expire per second.
func (le *lessor) Promote(extend time.Duration) {
	le.mu.Lock()
	defer le.mu.Unlock()

	// A non-nil demotec is what marks the lessor as primary (see isPrimary).
	le.demotec = make(chan struct{})

	// refresh the expiries of all leases.
	for _, l := range le.leaseMap {
		l.refresh(extend)
		item := &LeaseWithTime{id: l.ID, time: l.expiry}
		le.leaseExpiredNotifier.RegisterOrUpdate(item)
		le.scheduleCheckpointIfNeeded(l)
	}

	if len(le.leaseMap) < leaseRevokeRate {
		// no possibility of lease pile-up
		return
	}

	// adjust expiries in case of overlap
	leases := le.unsafeLeases()
	sort.Sort(leasesByExpiry(leases))

	// baseWindow is the remaining time of the first lease of the current
	// window; nextWindow is where that window ends. expires counts the
	// leases seen so far in the current window.
	baseWindow := leases[0].Remaining()
	nextWindow := baseWindow + time.Second
	expires := 0
	// have fewer expires than the total revoke rate so piled up leases
	// don't consume the entire revoke limit
	targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
	for _, l := range leases {
		remaining := l.Remaining()
		if remaining > nextWindow {
			// This lease starts a new window.
			baseWindow = remaining
			nextWindow = baseWindow + time.Second
			expires = 1
			continue
		}
		expires++
		if expires <= targetExpiresPerSecond {
			// Still within the per-window budget; leave the expiry alone.
			continue
		}
		// Over budget: delay this lease in proportion to how far the window
		// has exceeded the target.
		rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
		// If leases are extended by n seconds, leases n seconds ahead of the
		// base window should be extended by only one second.
		rateDelay -= float64(remaining - baseWindow)
		delay := time.Duration(rateDelay)
		nextWindow = baseWindow + delay
		l.refresh(delay + extend)
		item := &LeaseWithTime{id: l.ID, time: l.expiry}
		le.leaseExpiredNotifier.RegisterOrUpdate(item)
		le.scheduleCheckpointIfNeeded(l)
	}
}
   516  
// leasesByExpiry implements sort.Interface, ordering leases by ascending
// remaining time. Remaining is re-evaluated on every comparison.
type leasesByExpiry []*Lease

func (le leasesByExpiry) Len() int           { return len(le) }
func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
func (le leasesByExpiry) Swap(i, j int)      { le[i], le[j] = le[j], le[i] }
   522  
   523  func (le *lessor) Demote() {
   524  	le.mu.Lock()
   525  	defer le.mu.Unlock()
   526  
   527  	// set the expiries of all leases to forever
   528  	for _, l := range le.leaseMap {
   529  		l.forever()
   530  	}
   531  
   532  	le.clearScheduledLeasesCheckpoints()
   533  	le.clearLeaseExpiredNotifier()
   534  
   535  	if le.demotec != nil {
   536  		close(le.demotec)
   537  		le.demotec = nil
   538  	}
   539  }
   540  
   541  // Attach attaches items to the lease with given ID. When the lease
   542  // expires, the attached items will be automatically removed.
   543  // If the given lease does not exist, an error will be returned.
   544  func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
   545  	le.mu.Lock()
   546  	defer le.mu.Unlock()
   547  
   548  	l := le.leaseMap[id]
   549  	if l == nil {
   550  		return ErrLeaseNotFound
   551  	}
   552  
   553  	l.mu.Lock()
   554  	for _, it := range items {
   555  		l.itemSet[it] = struct{}{}
   556  		le.itemMap[it] = id
   557  	}
   558  	l.mu.Unlock()
   559  	return nil
   560  }
   561  
   562  func (le *lessor) GetLease(item LeaseItem) LeaseID {
   563  	le.mu.RLock()
   564  	id := le.itemMap[item]
   565  	le.mu.RUnlock()
   566  	return id
   567  }
   568  
   569  // Detach detaches items from the lease with given ID.
   570  // If the given lease does not exist, an error will be returned.
   571  func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
   572  	le.mu.Lock()
   573  	defer le.mu.Unlock()
   574  
   575  	l := le.leaseMap[id]
   576  	if l == nil {
   577  		return ErrLeaseNotFound
   578  	}
   579  
   580  	l.mu.Lock()
   581  	for _, it := range items {
   582  		delete(l.itemSet, it)
   583  		delete(le.itemMap, it)
   584  	}
   585  	l.mu.Unlock()
   586  	return nil
   587  }
   588  
   589  func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
   590  	le.mu.Lock()
   591  	defer le.mu.Unlock()
   592  
   593  	le.b = b
   594  	le.rd = rd
   595  	le.leaseMap = make(map[LeaseID]*Lease)
   596  	le.itemMap = make(map[LeaseItem]LeaseID)
   597  	le.initAndRecover()
   598  }
   599  
// ExpiredLeasesC returns the receive side of the channel on which the
// background loop publishes batches of expired leases to be revoked.
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
	return le.expiredC
}
   603  
// Stop signals the background loop to exit by closing stopC and blocks until
// the loop has closed doneC. Calling Stop a second time closes stopC again,
// which panics.
func (le *lessor) Stop() {
	close(le.stopC)
	<-le.doneC
}
   608  
   609  func (le *lessor) runLoop() {
   610  	defer close(le.doneC)
   611  
   612  	for {
   613  		le.revokeExpiredLeases()
   614  		le.checkpointScheduledLeases()
   615  
   616  		select {
   617  		case <-time.After(500 * time.Millisecond):
   618  		case <-le.stopC:
   619  			return
   620  		}
   621  	}
   622  }
   623  
   624  // revokeExpiredLeases finds all leases past their expiry and sends them to expired channel for
   625  // to be revoked.
   626  func (le *lessor) revokeExpiredLeases() {
   627  	var ls []*Lease
   628  
   629  	// rate limit
   630  	revokeLimit := leaseRevokeRate / 2
   631  
   632  	le.mu.RLock()
   633  	if le.isPrimary() {
   634  		ls = le.findExpiredLeases(revokeLimit)
   635  	}
   636  	le.mu.RUnlock()
   637  
   638  	if len(ls) != 0 {
   639  		select {
   640  		case <-le.stopC:
   641  			return
   642  		case le.expiredC <- ls:
   643  		default:
   644  			// the receiver of expiredC is probably busy handling
   645  			// other stuff
   646  			// let's try this next time after 500ms
   647  		}
   648  	}
   649  }
   650  
   651  // checkpointScheduledLeases finds all scheduled lease checkpoints that are due and
   652  // submits them to the checkpointer to persist them to the consensus log.
   653  func (le *lessor) checkpointScheduledLeases() {
   654  	// rate limit
   655  	for i := 0; i < leaseCheckpointRate/2; i++ {
   656  		var cps []*pb.LeaseCheckpoint
   657  		le.mu.Lock()
   658  		if le.isPrimary() {
   659  			cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
   660  		}
   661  		le.mu.Unlock()
   662  
   663  		if len(cps) != 0 {
   664  			if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
   665  				return
   666  			}
   667  		}
   668  		if len(cps) < maxLeaseCheckpointBatchSize {
   669  			return
   670  		}
   671  	}
   672  }
   673  
// clearScheduledLeasesCheckpoints drops every scheduled checkpoint by
// replacing the checkpoint heap with an empty one.
func (le *lessor) clearScheduledLeasesCheckpoints() {
	le.leaseCheckpointHeap = make(LeaseQueue, 0)
}
   677  
// clearLeaseExpiredNotifier drops every registered expiry by replacing the
// notifier with a fresh one.
func (le *lessor) clearLeaseExpiredNotifier() {
	le.leaseExpiredNotifier = newLeaseExpiredNotifier()
}
   681  
   682  // expireExists returns true if expiry items exist.
   683  // It pops only when expiry item exists.
   684  // "next" is true, to indicate that it may exist in next attempt.
   685  func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
   686  	if le.leaseExpiredNotifier.Len() == 0 {
   687  		return nil, false, false
   688  	}
   689  
   690  	item := le.leaseExpiredNotifier.Poll()
   691  	l = le.leaseMap[item.id]
   692  	if l == nil {
   693  		// lease has expired or been revoked
   694  		// no need to revoke (nothing is expiry)
   695  		le.leaseExpiredNotifier.Unregister() // O(log N)
   696  		return nil, false, true
   697  	}
   698  	now := time.Now()
   699  	if now.Before(item.time) /* item.time: expiration time */ {
   700  		// Candidate expirations are caught up, reinsert this item
   701  		// and no need to revoke (nothing is expiry)
   702  		return l, false, false
   703  	}
   704  
   705  	// recheck if revoke is complete after retry interval
   706  	item.time = now.Add(le.expiredLeaseRetryInterval)
   707  	le.leaseExpiredNotifier.RegisterOrUpdate(item)
   708  	return l, true, false
   709  }
   710  
   711  // findExpiredLeases loops leases in the leaseMap until reaching expired limit
   712  // and returns the expired leases that needed to be revoked.
   713  func (le *lessor) findExpiredLeases(limit int) []*Lease {
   714  	leases := make([]*Lease, 0, 16)
   715  
   716  	for {
   717  		l, ok, next := le.expireExists()
   718  		if !ok && !next {
   719  			break
   720  		}
   721  		if !ok {
   722  			continue
   723  		}
   724  		if next {
   725  			continue
   726  		}
   727  
   728  		if l.expired() {
   729  			leases = append(leases, l)
   730  
   731  			// reach expired limit
   732  			if len(leases) == limit {
   733  				break
   734  			}
   735  		}
   736  	}
   737  
   738  	return leases
   739  }
   740  
   741  func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
   742  	if le.cp == nil {
   743  		return
   744  	}
   745  
   746  	if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) {
   747  		if le.lg != nil {
   748  			le.lg.Debug("Scheduling lease checkpoint",
   749  				zap.Int64("leaseID", int64(lease.ID)),
   750  				zap.Duration("intervalSeconds", le.checkpointInterval),
   751  			)
   752  		}
   753  		heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
   754  			id:   lease.ID,
   755  			time: time.Now().Add(le.checkpointInterval),
   756  		})
   757  	}
   758  }
   759  
   760  func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCheckpoint {
   761  	if le.cp == nil {
   762  		return nil
   763  	}
   764  
   765  	now := time.Now()
   766  	cps := []*pb.LeaseCheckpoint{}
   767  	for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit {
   768  		lt := le.leaseCheckpointHeap[0]
   769  		if lt.time.After(now) /* lt.time: next checkpoint time */ {
   770  			return cps
   771  		}
   772  		heap.Pop(&le.leaseCheckpointHeap)
   773  		var l *Lease
   774  		var ok bool
   775  		if l, ok = le.leaseMap[lt.id]; !ok {
   776  			continue
   777  		}
   778  		if !now.Before(l.expiry) {
   779  			continue
   780  		}
   781  		remainingTTL := int64(math.Ceil(l.expiry.Sub(now).Seconds()))
   782  		if remainingTTL >= l.ttl {
   783  			continue
   784  		}
   785  		if le.lg != nil {
   786  			le.lg.Debug("Checkpointing lease",
   787  				zap.Int64("leaseID", int64(lt.id)),
   788  				zap.Int64("remainingTTL", remainingTTL),
   789  			)
   790  		}
   791  		cps = append(cps, &pb.LeaseCheckpoint{ID: int64(lt.id), Remaining_TTL: remainingTTL})
   792  	}
   793  	return cps
   794  }
   795  
   796  func (le *lessor) initAndRecover() {
   797  	tx := le.b.BatchTx()
   798  	tx.LockOutsideApply()
   799  
   800  	tx.UnsafeCreateBucket(buckets.Lease)
   801  	lpbs := unsafeGetAllLeases(tx)
   802  	tx.Unlock()
   803  	for _, lpb := range lpbs {
   804  		ID := LeaseID(lpb.ID)
   805  		if lpb.TTL < le.minLeaseTTL {
   806  			lpb.TTL = le.minLeaseTTL
   807  		}
   808  		le.leaseMap[ID] = &Lease{
   809  			ID:  ID,
   810  			ttl: lpb.TTL,
   811  			// itemSet will be filled in when recover key-value pairs
   812  			// set expiry to forever, refresh when promoted
   813  			itemSet:      make(map[LeaseItem]struct{}),
   814  			expiry:       forever,
   815  			revokec:      make(chan struct{}),
   816  			remainingTTL: lpb.RemainingTTL,
   817  		}
   818  	}
   819  	le.leaseExpiredNotifier.Init()
   820  	heap.Init(&le.leaseCheckpointHeap)
   821  
   822  	le.b.ForceCommit()
   823  }
   824  
// Lease is a single lease tracked by the lessor: its ID, its TTL, its
// current expiry and the set of items attached to it.
type Lease struct {
	ID           LeaseID
	ttl          int64 // time to live of the lease in seconds
	remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu      sync.RWMutex
	itemSet map[LeaseItem]struct{}
	// revokec is closed when the lessor's Revoke returns for this lease;
	// Renew waits on it for leases that have already expired.
	revokec chan struct{}
}
   839  
// NewLease returns a lease with the given id and ttl (in seconds), an empty
// item set and an open revoke channel. The expiry is left at its zero value,
// so the lease does not expire until it is refreshed.
func NewLease(id LeaseID, ttl int64) *Lease {
	return &Lease{
		ID:      id,
		ttl:     ttl,
		itemSet: make(map[LeaseItem]struct{}),
		revokec: make(chan struct{}),
	}
}
   848  
// expired reports whether the lease has reached or passed its expiry, i.e.
// no time remains.
func (l *Lease) expired() bool {
	return l.Remaining() <= 0
}
   852  
   853  func (l *Lease) persistTo(b backend.Backend) {
   854  	key := int64ToBytes(int64(l.ID))
   855  
   856  	lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
   857  	val, err := lpb.Marshal()
   858  	if err != nil {
   859  		panic("failed to marshal lease proto item")
   860  	}
   861  
   862  	b.BatchTx().LockInsideApply()
   863  	b.BatchTx().UnsafePut(buckets.Lease, key, val)
   864  	b.BatchTx().Unlock()
   865  }
   866  
// TTL returns the TTL of the Lease, in seconds.
func (l *Lease) TTL() int64 {
	return l.ttl
}
   871  
// SetLeaseItem adds the given item to the lease's item set. It is safe for
// concurrent use; the item set is guarded by the lease's mutex.
func (l *Lease) SetLeaseItem(item LeaseItem) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.itemSet[item] = struct{}{}
}
   878  
   879  // RemainingTTL returns the last checkpointed remaining TTL of the lease.
   880  // TODO(jpbetz): do not expose this utility method
   881  func (l *Lease) RemainingTTL() int64 {
   882  	if l.remainingTTL > 0 {
   883  		return l.remainingTTL
   884  	}
   885  	return l.ttl
   886  }
   887  
   888  // refresh refreshes the expiry of the lease.
   889  func (l *Lease) refresh(extend time.Duration) {
   890  	newExpiry := time.Now().Add(extend + time.Duration(l.RemainingTTL())*time.Second)
   891  	l.expiryMu.Lock()
   892  	defer l.expiryMu.Unlock()
   893  	l.expiry = newExpiry
   894  }
   895  
// forever sets the expiry of lease to be forever, i.e. to the zero time,
// which Remaining treats as "never expires".
func (l *Lease) forever() {
	l.expiryMu.Lock()
	defer l.expiryMu.Unlock()
	l.expiry = forever
}
   902  
   903  // Keys returns all the keys attached to the lease.
   904  func (l *Lease) Keys() []string {
   905  	l.mu.RLock()
   906  	keys := make([]string, 0, len(l.itemSet))
   907  	for k := range l.itemSet {
   908  		keys = append(keys, k.Key)
   909  	}
   910  	l.mu.RUnlock()
   911  	return keys
   912  }
   913  
   914  // Remaining returns the remaining time of the lease.
   915  func (l *Lease) Remaining() time.Duration {
   916  	l.expiryMu.RLock()
   917  	defer l.expiryMu.RUnlock()
   918  	if l.expiry.IsZero() {
   919  		return time.Duration(math.MaxInt64)
   920  	}
   921  	return time.Until(l.expiry)
   922  }
   923  
// LeaseItem identifies something attached to a lease by its key. When the
// lease is revoked, the key of every attached item is deleted.
type LeaseItem struct {
	Key string
}
   927  
   928  func int64ToBytes(n int64) []byte {
   929  	bytes := make([]byte, 8)
   930  	binary.BigEndian.PutUint64(bytes, uint64(n))
   931  	return bytes
   932  }
   933  
   934  func bytesToLeaseID(bytes []byte) int64 {
   935  	if len(bytes) != 8 {
   936  		panic(fmt.Errorf("lease ID must be 8-byte"))
   937  	}
   938  	return int64(binary.BigEndian.Uint64(bytes))
   939  }
   940  
   941  func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
   942  	ls := make([]*leasepb.Lease, 0)
   943  	err := tx.UnsafeForEach(buckets.Lease, func(k, v []byte) error {
   944  		var lpb leasepb.Lease
   945  		err := lpb.Unmarshal(v)
   946  		if err != nil {
   947  			return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
   948  		}
   949  		ls = append(ls, &lpb)
   950  		return nil
   951  	})
   952  	if err != nil {
   953  		panic(err)
   954  	}
   955  	return ls
   956  }
   957  
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only.
//
// Every method is a no-op that returns zero values, except Renew, which
// always reports a TTL of 10 with no error.
type FakeLessor struct{}

// SetRangeDeleter does nothing.
func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}

// SetCheckpointer does nothing.
func (fl *FakeLessor) SetCheckpointer(cp Checkpointer) {}

// Grant returns a nil lease and a nil error.
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }

// Revoke always succeeds.
func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }

// Checkpoint always succeeds.
func (fl *FakeLessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil }

// Attach always succeeds.
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }

// GetLease always returns 0 (NoLease); Detach always succeeds.
func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID            { return 0 }
func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }

// Promote does nothing.
func (fl *FakeLessor) Promote(extend time.Duration) {}

// Demote does nothing.
func (fl *FakeLessor) Demote() {}

// Renew always reports a TTL of 10.
func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }

// Lookup always returns nil.
func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }

// Leases always returns nil.
func (fl *FakeLessor) Leases() []*Lease { return nil }

// ExpiredLeasesC returns a nil channel; receiving from it blocks forever.
func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

// Recover does nothing.
func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}

// Stop does nothing.
func (fl *FakeLessor) Stop() {}
   992  
// FakeTxnDelete is a TxnDelete whose DeleteRange does nothing and whose End
// unlocks the embedded BatchTx.
type FakeTxnDelete struct {
	backend.BatchTx
}

// DeleteRange deletes nothing and reports zero deletions at revision zero.
// End calls Unlock on the embedded BatchTx.
func (ftd *FakeTxnDelete) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }
func (ftd *FakeTxnDelete) End()                                       { ftd.Unlock() }
   999  

View as plain text