...

Source file src/k8s.io/client-go/tools/leaderelection/leaderelection.go

Documentation: k8s.io/client-go/tools/leaderelection

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package leaderelection implements leader election of a set of endpoints.
    18  // It uses an annotation in the endpoints object to store the record of the
    19  // election state. This implementation does not guarantee that only one
    20  // client is acting as a leader (a.k.a. fencing).
    21  //
    22  // A client only acts on timestamps captured locally to infer the state of the
    23  // leader election. The client does not consider timestamps in the leader
    24  // election record to be accurate because these timestamps may not have been
    25  // produced by a local clock. The implementation does not depend on their
    26  // accuracy and only uses their change to indicate that another client has
    27  // renewed the leader lease. Thus the implementation is tolerant to arbitrary
    28  // clock skew, but is not tolerant to arbitrary clock skew rate.
    29  //
    30  // However, the level of tolerance to skew rate can be configured by setting
    31  // RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a
    32  // maximum tolerated ratio of time passed on the fastest node to time passed on
    33  // the slowest node can be approximately achieved with a configuration that sets
    34  // the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted
    35  // to tolerate some nodes progressing forward in time twice as fast as other nodes,
    36  // the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds.
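        // By the same reasoning, the core client defaults of a 15 second LeaseDuration
        // and a 10 second RenewDeadline tolerate a skew rate ratio of roughly 1.5.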
    37  //
    38  // While not required, some method of clock synchronization between nodes in the
    39  // cluster is highly recommended. It's important to keep in mind when configuring
    40  // this client that the tolerance to skew rate varies inversely to master
    41  // availability.
    42  //
    43  // Larger clusters often have a more lenient SLA for API latency. This should be
    44  // taken into account when configuring the client. The rate of leader transitions
    45  // should be monitored and RetryPeriod and LeaseDuration should be increased
    46  // until the rate is stable and acceptably low. It's important to keep in mind
    47  // when configuring this client that the tolerance to API latency varies inversely
    48  // to master availability.
    49  //
    50  // DISCLAIMER: this is an alpha API. This library will likely change significantly
    51  // or even be removed entirely in subsequent releases. Depend on this API at
    52  // your own risk.
    53  package leaderelection
    54  
    55  import (
    56  	"bytes"
    57  	"context"
    58  	"fmt"
    59  	"sync"
    60  	"time"
    61  
    62  	"k8s.io/apimachinery/pkg/api/errors"
    63  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    64  	"k8s.io/apimachinery/pkg/util/runtime"
    65  	"k8s.io/apimachinery/pkg/util/wait"
    66  	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
    67  	"k8s.io/klog/v2"
    68  	"k8s.io/utils/clock"
    69  )
    70  
    71  const (
    72  	JitterFactor = 1.2
    73  )
    74  
    75  // NewLeaderElector creates a LeaderElector from a LeaderElectionConfig
    76  func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
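        	// Taken together, the checks below require
        	// LeaseDuration > RenewDeadline > JitterFactor*RetryPeriod, with all three
        	// durations positive. For example, the core client defaults of 15s, 10s and
        	// 2s satisfy this: 15s > 10s > 1.2*2s = 2.4s.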
    77  	if lec.LeaseDuration <= lec.RenewDeadline {
    78  		return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
    79  	}
    80  	if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
    81  		return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
    82  	}
    83  	if lec.LeaseDuration < 1 {
    84  		return nil, fmt.Errorf("leaseDuration must be greater than zero")
    85  	}
    86  	if lec.RenewDeadline < 1 {
    87  		return nil, fmt.Errorf("renewDeadline must be greater than zero")
    88  	}
    89  	if lec.RetryPeriod < 1 {
    90  		return nil, fmt.Errorf("retryPeriod must be greater than zero")
    91  	}
    92  	if lec.Callbacks.OnStartedLeading == nil {
    93  		return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
    94  	}
    95  	if lec.Callbacks.OnStoppedLeading == nil {
    96  		return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
    97  	}
    98  
    99  	if lec.Lock == nil {
   100  		return nil, fmt.Errorf("Lock must not be nil.")
   101  	}
   102  	id := lec.Lock.Identity()
   103  	if id == "" {
   104  		return nil, fmt.Errorf("Lock identity is empty")
   105  	}
   106  
   107  	le := LeaderElector{
   108  		config:  lec,
   109  		clock:   clock.RealClock{},
   110  		metrics: globalMetricsFactory.newLeaderMetrics(),
   111  	}
   112  	le.metrics.leaderOff(le.config.Name)
   113  	return &le, nil
   114  }
   115  
   116  type LeaderElectionConfig struct {
   117  	// Lock is the resource that will be used for locking
   118  	Lock rl.Interface
   119  
   120  	// LeaseDuration is the duration that non-leader candidates will
   121  	// wait to force acquire leadership. This is measured against time of
   122  	// last observed ack.
   123  	//
   124  	// A client needs to wait a full LeaseDuration without observing a change to
   125  	// the record before it can attempt to take over. When all clients are
   126  	// shut down and a new set of clients are started with different names against
   127  	// the same leader record, they must wait the full LeaseDuration before
   128  	// attempting to acquire the lease. Thus LeaseDuration should be as short as
   129  	// possible (within your tolerance for clock skew rate) to avoid possible
   130  	// long waits in this scenario.
   131  	//
   132  	// Core clients default this value to 15 seconds.
   133  	LeaseDuration time.Duration
   134  	// RenewDeadline is the duration that the acting master will retry
   135  	// refreshing leadership before giving up.
   136  	//
   137  	// Core clients default this value to 10 seconds.
   138  	RenewDeadline time.Duration
   139  	// RetryPeriod is the duration the LeaderElector clients should wait
   140  	// between tries of actions.
   141  	//
   142  	// Core clients default this value to 2 seconds.
   143  	RetryPeriod time.Duration
   144  
   145  	// Callbacks are callbacks that are triggered during certain lifecycle
   146  	// events of the LeaderElector
   147  	Callbacks LeaderCallbacks
   148  
   149  	// WatchDog is the associated health checker
   150  	// WatchDog may be nil if it's not needed/configured.
   151  	WatchDog *HealthzAdaptor
   152  
   153  	// ReleaseOnCancel should be set true if the lock should be released
   154  	// when the run context is cancelled. If you set this to true, you must
   155  	// ensure all code guarded by this lease has successfully completed
   156  	// prior to cancelling the context, or you may have two processes
   157  	// simultaneously acting on the critical path (see the sketch after this struct).
   158  	ReleaseOnCancel bool
   159  
   160  	// Name is the name of the resource lock for debugging
   161  	Name string
   162  }
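        // A hedged sketch of the shutdown ordering that ReleaseOnCancel implies
        // (stopWork is a hypothetical stand-in for whatever stops the code guarded
        // by the lease):
        //
        //	ctx, cancel := context.WithCancel(context.Background())
        //	go leaderElector.Run(ctx)
        //	// ...
        //	stopWork() // finish all leader-only work first
        //	cancel()   // then cancel; renew() releases the lock on the way out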
   163  
   164  // LeaderCallbacks are callbacks that are triggered during certain
   165  // lifecycle events of the LeaderElector. These are invoked asynchronously.
   166  //
   167  // possible future callbacks:
   168  //   - OnChallenge()
   169  type LeaderCallbacks struct {
   170  	// OnStartedLeading is called when a LeaderElector client starts leading
   171  	OnStartedLeading func(context.Context)
   172  	// OnStoppedLeading is called when a LeaderElector client stops leading
   173  	OnStoppedLeading func()
   174  	// OnNewLeader is called when the client observes a leader that is
   175  	// not the previously observed leader. This includes the first observed
   176  	// leader when the client starts.
   177  	OnNewLeader func(identity string)
   178  }
   179  
   180  // LeaderElector is a leader election client.
   181  type LeaderElector struct {
   182  	config LeaderElectionConfig
   183  	// internal bookkeeping
   184  	observedRecord    rl.LeaderElectionRecord
   185  	observedRawRecord []byte
   186  	observedTime      time.Time
   187  	// used to implement OnNewLeader(); may lag slightly behind
   188  	// observedRecord.HolderIdentity if the transition has
   189  	// not yet been reported.
   190  	reportedLeader string
   191  
   192  	// clock is a wrapper around time to allow for less flaky testing
   193  	clock clock.Clock
   194  
   195  	// used to lock the observedRecord
   196  	observedRecordLock sync.Mutex
   197  
   198  	metrics leaderMetricsAdapter
   199  }
   200  
   201  // Run starts the leader election loop. Run will not return
   202  // before the leader election loop is stopped by ctx or it has
   203  // stopped holding the leader lease.
   204  func (le *LeaderElector) Run(ctx context.Context) {
   205  	defer runtime.HandleCrash()
   206  	defer le.config.Callbacks.OnStoppedLeading()
   207  
   208  	if !le.acquire(ctx) {
   209  		return // ctx signalled done
   210  	}
   211  	ctx, cancel := context.WithCancel(ctx)
   212  	defer cancel()
   213  	go le.config.Callbacks.OnStartedLeading(ctx)
   214  	le.renew(ctx)
   215  }
   216  
   217  // RunOrDie starts a client with the provided config or panics if the config
   218  // fails to validate. RunOrDie blocks until the leader election loop is
   219  // stopped by ctx or it has stopped holding the leader lease.
   220  func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
   221  	le, err := NewLeaderElector(lec)
   222  	if err != nil {
   223  		panic(err)
   224  	}
   225  	if lec.WatchDog != nil {
   226  		lec.WatchDog.SetLeaderElection(le)
   227  	}
   228  	le.Run(ctx)
   229  }
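        // A minimal usage sketch from a client program (illustrative only: the Lease
        // name "example-lock", the "default" namespace, and the restConfig and id
        // variables are assumptions, not part of this package):
        //
        //	lock := &resourcelock.LeaseLock{
        //		LeaseMeta:  metav1.ObjectMeta{Name: "example-lock", Namespace: "default"},
        //		Client:     kubernetes.NewForConfigOrDie(restConfig).CoordinationV1(),
        //		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
        //	}
        //	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        //		Lock:            lock,
        //		ReleaseOnCancel: true,
        //		LeaseDuration:   15 * time.Second,
        //		RenewDeadline:   10 * time.Second,
        //		RetryPeriod:     2 * time.Second,
        //		Callbacks: leaderelection.LeaderCallbacks{
        //			OnStartedLeading: func(ctx context.Context) { /* do leader-only work */ },
        //			OnStoppedLeading: func() { /* step down cleanly */ },
        //		},
        //	})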
   230  
   231  // GetLeader returns the identity of the last observed leader or returns the empty string if
   232  // no leader has yet been observed.
   233  // This function is for informational purposes. (e.g. monitoring, logs, etc.)
   234  func (le *LeaderElector) GetLeader() string {
   235  	return le.getObservedRecord().HolderIdentity
   236  }
   237  
   238  // IsLeader returns true if the last observed leader was this client else returns false.
   239  func (le *LeaderElector) IsLeader() bool {
   240  	return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
   241  }
   242  
   243  // acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
   244  // Returns false if ctx signals done.
   245  func (le *LeaderElector) acquire(ctx context.Context) bool {
   246  	ctx, cancel := context.WithCancel(ctx)
   247  	defer cancel()
   248  	succeeded := false
   249  	desc := le.config.Lock.Describe()
   250  	klog.Infof("attempting to acquire leader lease %v...", desc)
   251  	wait.JitterUntil(func() {
   252  		succeeded = le.tryAcquireOrRenew(ctx)
   253  		le.maybeReportTransition()
   254  		if !succeeded {
   255  			klog.V(4).Infof("failed to acquire lease %v", desc)
   256  			return
   257  		}
   258  		le.config.Lock.RecordEvent("became leader")
   259  		le.metrics.leaderOn(le.config.Name)
   260  		klog.Infof("successfully acquired lease %v", desc)
   261  		cancel()
   262  	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
   263  	return succeeded
   264  }
   265  
   266  // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
   267  func (le *LeaderElector) renew(ctx context.Context) {
   268  	defer le.config.Lock.RecordEvent("stopped leading")
   269  	ctx, cancel := context.WithCancel(ctx)
   270  	defer cancel()
   271  	wait.Until(func() {
   272  		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
   273  		defer timeoutCancel()
   274  		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
   275  			return le.tryAcquireOrRenew(timeoutCtx), nil
   276  		}, timeoutCtx.Done())
   277  
   278  		le.maybeReportTransition()
   279  		desc := le.config.Lock.Describe()
   280  		if err == nil {
   281  			klog.V(5).Infof("successfully renewed lease %v", desc)
   282  			return
   283  		}
   284  		le.metrics.leaderOff(le.config.Name)
   285  		klog.Infof("failed to renew lease %v: %v", desc, err)
   286  		cancel()
   287  	}, le.config.RetryPeriod, ctx.Done())
   288  
   289  	// if we hold the lease, give it up
   290  	if le.config.ReleaseOnCancel {
   291  		le.release()
   292  	}
   293  }
   294  
   295  // release attempts to release the leader lease if we have acquired it.
   296  func (le *LeaderElector) release() bool {
   297  	if !le.IsLeader() {
   298  		return true
   299  	}
   300  	now := metav1.NewTime(le.clock.Now())
   301  	leaderElectionRecord := rl.LeaderElectionRecord{
   302  		LeaderTransitions:    le.observedRecord.LeaderTransitions,
   303  		LeaseDurationSeconds: 1,
   304  		RenewTime:            now,
   305  		AcquireTime:          now,
   306  	}
   307  	if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
   308  		klog.Errorf("Failed to release lock: %v", err)
   309  		return false
   310  	}
   311  
   312  	le.setObservedRecord(&leaderElectionRecord)
   313  	return true
   314  }
   315  
   316  // tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
   317  // else it tries to renew the lease if it has already been acquired. Returns true
   318  // on success else returns false.
   319  func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
   320  	now := metav1.NewTime(le.clock.Now())
   321  	leaderElectionRecord := rl.LeaderElectionRecord{
   322  		HolderIdentity:       le.config.Lock.Identity(),
   323  		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
   324  		RenewTime:            now,
   325  		AcquireTime:          now,
   326  	}
   327  
   328  	// 1. fast path for the leader to update optimistically assuming that the record observed
   329  	// last time is the current version.
   330  	if le.IsLeader() && le.isLeaseValid(now.Time) {
   331  		oldObservedRecord := le.getObservedRecord()
   332  		leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
   333  		leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
   334  
   335  		err := le.config.Lock.Update(ctx, leaderElectionRecord)
   336  		if err == nil {
   337  			le.setObservedRecord(&leaderElectionRecord)
   338  			return true
   339  		}
   340  		klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
   341  	}
   342  
   343  	// 2. obtain or create the ElectionRecord
   344  	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
   345  	if err != nil {
   346  		if !errors.IsNotFound(err) {
   347  			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
   348  			return false
   349  		}
   350  		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
   351  			klog.Errorf("error initially creating leader election record: %v", err)
   352  			return false
   353  		}
   354  
   355  		le.setObservedRecord(&leaderElectionRecord)
   356  
   357  		return true
   358  	}
   359  
   360  	// 3. Record obtained, check the Identity & Time
   361  	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
   362  		le.setObservedRecord(oldLeaderElectionRecord)
   363  
   364  		le.observedRawRecord = oldLeaderElectionRawRecord
   365  	}
   366  	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
   367  		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
   368  		return false
   369  	}
   370  
   371  	// 4. We're going to try to update. The leaderElectionRecord is set to its default
   372  	// here. Let's correct it before updating.
   373  	if le.IsLeader() {
   374  		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
   375  		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
   376  		le.metrics.slowpathExercised(le.config.Name)
   377  	} else {
   378  		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
   379  	}
   380  
   381  	// update the lock itself
   382  	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
   383  		klog.Errorf("Failed to update lock: %v", err)
   384  		return false
   385  	}
   386  
   387  	le.setObservedRecord(&leaderElectionRecord)
   388  	return true
   389  }
   390  
   391  func (le *LeaderElector) maybeReportTransition() {
   392  	if le.observedRecord.HolderIdentity == le.reportedLeader {
   393  		return
   394  	}
   395  	le.reportedLeader = le.observedRecord.HolderIdentity
   396  	if le.config.Callbacks.OnNewLeader != nil {
   397  		go le.config.Callbacks.OnNewLeader(le.reportedLeader)
   398  	}
   399  }
   400  
   401  // Check will determine if the current lease is expired by more than maxTolerableExpiredLease.
   402  func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
   403  	if !le.IsLeader() {
   404  		// Currently not concerned with the case that we are hot standby
   405  		return nil
   406  	}
   407  	// If more than maxTolerableExpiredLease has passed since the lease duration
   408  	// expired, it is time to start reporting ourselves as unhealthy. We should have
   409  	// died but conditions like deadlock can prevent this. (See #70819)
   410  	if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
   411  		return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
   412  	}
   413  
   414  	return nil
   415  }
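        // An illustrative health probe built on Check (the 20 second tolerance is an
        // arbitrary example value, not a recommendation):
        //
        //	if err := le.Check(20 * time.Second); err != nil {
        //		// The lease expired more than 20s ago and we still believe we are
        //		// the leader; report this process as unhealthy.
        //	}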
   416  
   417  func (le *LeaderElector) isLeaseValid(now time.Time) bool {
   418  	return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
   419  }
   420  
   421  // setObservedRecord will set a new observedRecord and update observedTime to the current time.
   422  // The critical section is protected by observedRecordLock.
   423  func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
   424  	le.observedRecordLock.Lock()
   425  	defer le.observedRecordLock.Unlock()
   426  
   427  	le.observedRecord = *observedRecord
   428  	le.observedTime = le.clock.Now()
   429  }
   430  
   431  // getObservedRecord returns the observedRecord.
   432  // The critical section is protected by observedRecordLock.
   433  func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
   434  	le.observedRecordLock.Lock()
   435  	defer le.observedRecordLock.Unlock()
   436  
   437  	return le.observedRecord
   438  }
   439  
