...

Source file src/k8s.io/kubernetes/pkg/util/async/bounded_frequency_runner.go

Documentation: k8s.io/kubernetes/pkg/util/async

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package async
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    23  
    24  	"k8s.io/client-go/util/flowcontrol"
    25  
    26  	"k8s.io/klog/v2"
    27  )
    28  
    29  // BoundedFrequencyRunner manages runs of a user-provided function.
    30  // See NewBoundedFrequencyRunner for examples.
    31  type BoundedFrequencyRunner struct {
    32  	name        string        // the name of this instance
    33  	minInterval time.Duration // the min time between runs, modulo bursts
    34  	maxInterval time.Duration // the max time between runs
    35  
    36  	run chan struct{} // try an async run
    37  
    38  	mu      sync.Mutex  // guards runs of fn and all mutations
    39  	fn      func()      // function to run
    40  	lastRun time.Time   // time of last run
    41  	timer   timer       // timer for deferred runs
    42  	limiter rateLimiter // rate limiter for on-demand runs
    43  
    44  	retry     chan struct{} // schedule a retry
    45  	retryMu   sync.Mutex    // guards retryTime
    46  	retryTime time.Time     // when to retry
    47  }
    48  
    49  // designed so that flowcontrol.RateLimiter satisfies
    50  type rateLimiter interface {
    51  	TryAccept() bool
    52  	Stop()
    53  }
    54  
    55  type nullLimiter struct{}
    56  
    57  func (nullLimiter) TryAccept() bool {
    58  	return true
    59  }
    60  
    61  func (nullLimiter) Stop() {}
    62  
    63  var _ rateLimiter = nullLimiter{}
    64  
    65  // for testing
    66  type timer interface {
    67  	// C returns the timer's selectable channel.
    68  	C() <-chan time.Time
    69  
    70  	// See time.Timer.Reset.
    71  	Reset(d time.Duration) bool
    72  
    73  	// See time.Timer.Stop.
    74  	Stop() bool
    75  
    76  	// See time.Now.
    77  	Now() time.Time
    78  
    79  	// Remaining returns the time until the timer will go off (if it is running).
    80  	Remaining() time.Duration
    81  
    82  	// See time.Since.
    83  	Since(t time.Time) time.Duration
    84  
    85  	// See time.Sleep.
    86  	Sleep(d time.Duration)
    87  }
    88  
    89  // implement our timer in terms of std time.Timer.
    90  type realTimer struct {
    91  	timer *time.Timer
    92  	next  time.Time
    93  }
    94  
    95  func (rt *realTimer) C() <-chan time.Time {
    96  	return rt.timer.C
    97  }
    98  
    99  func (rt *realTimer) Reset(d time.Duration) bool {
   100  	rt.next = time.Now().Add(d)
   101  	return rt.timer.Reset(d)
   102  }
   103  
   104  func (rt *realTimer) Stop() bool {
   105  	return rt.timer.Stop()
   106  }
   107  
   108  func (rt *realTimer) Now() time.Time {
   109  	return time.Now()
   110  }
   111  
   112  func (rt *realTimer) Remaining() time.Duration {
   113  	return rt.next.Sub(time.Now())
   114  }
   115  
   116  func (rt *realTimer) Since(t time.Time) time.Duration {
   117  	return time.Since(t)
   118  }
   119  
   120  func (rt *realTimer) Sleep(d time.Duration) {
   121  	time.Sleep(d)
   122  }
   123  
   124  var _ timer = &realTimer{}
   125  
   126  // NewBoundedFrequencyRunner creates a new BoundedFrequencyRunner instance,
   127  // which will manage runs of the specified function.
   128  //
   129  // All runs will be async to the caller of BoundedFrequencyRunner.Run, but
   130  // multiple runs are serialized. If the function needs to hold locks, it must
   131  // take them internally.
   132  //
   133  // Runs of the function will have at least minInterval between them (from
   134  // completion to next start), except that up to bursts may be allowed.  Burst
   135  // runs are "accumulated" over time, one per minInterval up to burstRuns total.
   136  // This can be used, for example, to mitigate the impact of expensive operations
   137  // being called in response to user-initiated operations. Run requests that
   138  // would violate the minInterval are coalesced and run at the next opportunity.
   139  //
   140  // The function will be run at least once per maxInterval. For example, this can
   141  // force periodic refreshes of state in the absence of anyone calling Run.
   142  //
   143  // Examples:
   144  //
   145  // NewBoundedFrequencyRunner("name", fn, time.Second, 5*time.Second, 1)
   146  // - fn will have at least 1 second between runs
   147  // - fn will have no more than 5 seconds between runs
   148  //
   149  // NewBoundedFrequencyRunner("name", fn, 3*time.Second, 10*time.Second, 3)
   150  // - fn will have at least 3 seconds between runs, with up to 3 burst runs
   151  // - fn will have no more than 10 seconds between runs
   152  //
   153  // The maxInterval must be greater than or equal to the minInterval,  If the
   154  // caller passes a maxInterval less than minInterval, this function will panic.
   155  func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
   156  	timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
   157  	<-timer.C()                                  // consume the first tick
   158  	return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
   159  }
   160  
   161  // Make an instance with dependencies injected.
   162  func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
   163  	if maxInterval < minInterval {
   164  		panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
   165  	}
   166  	if timer == nil {
   167  		panic(fmt.Sprintf("%s: timer must be non-nil", name))
   168  	}
   169  
   170  	bfr := &BoundedFrequencyRunner{
   171  		name:        name,
   172  		fn:          fn,
   173  		minInterval: minInterval,
   174  		maxInterval: maxInterval,
   175  		run:         make(chan struct{}, 1),
   176  		retry:       make(chan struct{}, 1),
   177  		timer:       timer,
   178  	}
   179  	if minInterval == 0 {
   180  		bfr.limiter = nullLimiter{}
   181  	} else {
   182  		// allow burst updates in short succession
   183  		qps := float32(time.Second) / float32(minInterval)
   184  		bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
   185  	}
   186  	return bfr
   187  }
   188  
   189  // Loop handles the periodic timer and run requests.  This is expected to be
   190  // called as a goroutine.
   191  func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
   192  	klog.V(3).Infof("%s Loop running", bfr.name)
   193  	bfr.timer.Reset(bfr.maxInterval)
   194  	for {
   195  		select {
   196  		case <-stop:
   197  			bfr.stop()
   198  			klog.V(3).Infof("%s Loop stopping", bfr.name)
   199  			return
   200  		case <-bfr.timer.C():
   201  			bfr.tryRun()
   202  		case <-bfr.run:
   203  			bfr.tryRun()
   204  		case <-bfr.retry:
   205  			bfr.doRetry()
   206  		}
   207  	}
   208  }
   209  
   210  // Run the function as soon as possible.  If this is called while Loop is not
   211  // running, the call may be deferred indefinitely.
   212  // If there is already a queued request to call the underlying function, it
   213  // may be dropped - it is just guaranteed that we will try calling the
   214  // underlying function as soon as possible starting from now.
   215  func (bfr *BoundedFrequencyRunner) Run() {
   216  	// If it takes a lot of time to run the underlying function, noone is really
   217  	// processing elements from <run> channel. So to avoid blocking here on the
   218  	// putting element to it, we simply skip it if there is already an element
   219  	// in it.
   220  	select {
   221  	case bfr.run <- struct{}{}:
   222  	default:
   223  	}
   224  }
   225  
   226  // RetryAfter ensures that the function will run again after no later than interval. This
   227  // can be called from inside a run of the BoundedFrequencyRunner's function, or
   228  // asynchronously.
   229  func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) {
   230  	// This could be called either with or without bfr.mu held, so we can't grab that
   231  	// lock, and therefore we can't update the timer directly.
   232  
   233  	// If the Loop thread is currently running fn then it may be a while before it
   234  	// processes our retry request. But we want to retry at interval from now, not at
   235  	// interval from "whenever doRetry eventually gets called". So we convert to
   236  	// absolute time.
   237  	retryTime := bfr.timer.Now().Add(interval)
   238  
   239  	// We can't just write retryTime to a channel because there could be multiple
   240  	// RetryAfter calls before Loop gets a chance to read from the channel. So we
   241  	// record the soonest requested retry time in bfr.retryTime and then only signal
   242  	// the Loop thread once, just like Run does.
   243  	bfr.retryMu.Lock()
   244  	defer bfr.retryMu.Unlock()
   245  	if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) {
   246  		return
   247  	}
   248  	bfr.retryTime = retryTime
   249  
   250  	select {
   251  	case bfr.retry <- struct{}{}:
   252  	default:
   253  	}
   254  }
   255  
   256  // assumes the lock is not held
   257  func (bfr *BoundedFrequencyRunner) stop() {
   258  	bfr.mu.Lock()
   259  	defer bfr.mu.Unlock()
   260  	bfr.limiter.Stop()
   261  	bfr.timer.Stop()
   262  }
   263  
   264  // assumes the lock is not held
   265  func (bfr *BoundedFrequencyRunner) doRetry() {
   266  	bfr.mu.Lock()
   267  	defer bfr.mu.Unlock()
   268  	bfr.retryMu.Lock()
   269  	defer bfr.retryMu.Unlock()
   270  
   271  	if bfr.retryTime.IsZero() {
   272  		return
   273  	}
   274  
   275  	// Timer wants an interval not an absolute time, so convert retryTime back now
   276  	retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
   277  	bfr.retryTime = time.Time{}
   278  	if retryInterval < bfr.timer.Remaining() {
   279  		klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
   280  		bfr.timer.Stop()
   281  		bfr.timer.Reset(retryInterval)
   282  	}
   283  }
   284  
   285  // assumes the lock is not held
   286  func (bfr *BoundedFrequencyRunner) tryRun() {
   287  	bfr.mu.Lock()
   288  	defer bfr.mu.Unlock()
   289  
   290  	if bfr.limiter.TryAccept() {
   291  		// We're allowed to run the function right now.
   292  		bfr.fn()
   293  		bfr.lastRun = bfr.timer.Now()
   294  		bfr.timer.Stop()
   295  		bfr.timer.Reset(bfr.maxInterval)
   296  		klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
   297  		return
   298  	}
   299  
   300  	// It can't run right now, figure out when it can run next.
   301  	elapsed := bfr.timer.Since(bfr.lastRun)   // how long since last run
   302  	nextPossible := bfr.minInterval - elapsed // time to next possible run
   303  	nextScheduled := bfr.timer.Remaining()    // time to next scheduled run
   304  	klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
   305  
   306  	// It's hard to avoid race conditions in the unit tests unless we always reset
   307  	// the timer here, even when it's unchanged
   308  	if nextPossible < nextScheduled {
   309  		nextScheduled = nextPossible
   310  	}
   311  	bfr.timer.Stop()
   312  	bfr.timer.Reset(nextScheduled)
   313  }
   314  

View as plain text