...

Source file src/k8s.io/client-go/util/workqueue/default_rate_limiters.go

Documentation: k8s.io/client-go/util/workqueue

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package workqueue
    18  
    19  import (
    20  	"math"
    21  	"sync"
    22  	"time"
    23  
    24  	"golang.org/x/time/rate"
    25  )
    26  
    27  type RateLimiter interface {
    28  	// When gets an item and gets to decide how long that item should wait
    29  	When(item interface{}) time.Duration
    30  	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
    31  	// or for success, we'll stop tracking it
    32  	Forget(item interface{})
    33  	// NumRequeues returns back how many failures the item has had
    34  	NumRequeues(item interface{}) int
    35  }
    36  
    37  // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
    38  // both overall and per-item rate limiting.  The overall is a token bucket and the per-item is exponential
    39  func DefaultControllerRateLimiter() RateLimiter {
    40  	return NewMaxOfRateLimiter(
    41  		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    42  		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
    43  		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    44  	)
    45  }
    46  
    47  // BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
    48  type BucketRateLimiter struct {
    49  	*rate.Limiter
    50  }
    51  
    52  var _ RateLimiter = &BucketRateLimiter{}
    53  
    54  func (r *BucketRateLimiter) When(item interface{}) time.Duration {
    55  	return r.Limiter.Reserve().Delay()
    56  }
    57  
    58  func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
    59  	return 0
    60  }
    61  
    62  func (r *BucketRateLimiter) Forget(item interface{}) {
    63  }
    64  
    65  // ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
    66  // dealing with max failures and expiration are up to the caller
    67  type ItemExponentialFailureRateLimiter struct {
    68  	failuresLock sync.Mutex
    69  	failures     map[interface{}]int
    70  
    71  	baseDelay time.Duration
    72  	maxDelay  time.Duration
    73  }
    74  
    75  var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
    76  
    77  func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
    78  	return &ItemExponentialFailureRateLimiter{
    79  		failures:  map[interface{}]int{},
    80  		baseDelay: baseDelay,
    81  		maxDelay:  maxDelay,
    82  	}
    83  }
    84  
    85  func DefaultItemBasedRateLimiter() RateLimiter {
    86  	return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
    87  }
    88  
    89  func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    90  	r.failuresLock.Lock()
    91  	defer r.failuresLock.Unlock()
    92  
    93  	exp := r.failures[item]
    94  	r.failures[item] = r.failures[item] + 1
    95  
    96  	// The backoff is capped such that 'calculated' value never overflows.
    97  	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    98  	if backoff > math.MaxInt64 {
    99  		return r.maxDelay
   100  	}
   101  
   102  	calculated := time.Duration(backoff)
   103  	if calculated > r.maxDelay {
   104  		return r.maxDelay
   105  	}
   106  
   107  	return calculated
   108  }
   109  
   110  func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
   111  	r.failuresLock.Lock()
   112  	defer r.failuresLock.Unlock()
   113  
   114  	return r.failures[item]
   115  }
   116  
   117  func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
   118  	r.failuresLock.Lock()
   119  	defer r.failuresLock.Unlock()
   120  
   121  	delete(r.failures, item)
   122  }
   123  
   124  // ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
   125  type ItemFastSlowRateLimiter struct {
   126  	failuresLock sync.Mutex
   127  	failures     map[interface{}]int
   128  
   129  	maxFastAttempts int
   130  	fastDelay       time.Duration
   131  	slowDelay       time.Duration
   132  }
   133  
   134  var _ RateLimiter = &ItemFastSlowRateLimiter{}
   135  
   136  func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
   137  	return &ItemFastSlowRateLimiter{
   138  		failures:        map[interface{}]int{},
   139  		fastDelay:       fastDelay,
   140  		slowDelay:       slowDelay,
   141  		maxFastAttempts: maxFastAttempts,
   142  	}
   143  }
   144  
   145  func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
   146  	r.failuresLock.Lock()
   147  	defer r.failuresLock.Unlock()
   148  
   149  	r.failures[item] = r.failures[item] + 1
   150  
   151  	if r.failures[item] <= r.maxFastAttempts {
   152  		return r.fastDelay
   153  	}
   154  
   155  	return r.slowDelay
   156  }
   157  
   158  func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
   159  	r.failuresLock.Lock()
   160  	defer r.failuresLock.Unlock()
   161  
   162  	return r.failures[item]
   163  }
   164  
   165  func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
   166  	r.failuresLock.Lock()
   167  	defer r.failuresLock.Unlock()
   168  
   169  	delete(r.failures, item)
   170  }
   171  
   172  // MaxOfRateLimiter calls every RateLimiter and returns the worst case response
   173  // When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
   174  // were separately delayed a longer time.
   175  type MaxOfRateLimiter struct {
   176  	limiters []RateLimiter
   177  }
   178  
   179  func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
   180  	ret := time.Duration(0)
   181  	for _, limiter := range r.limiters {
   182  		curr := limiter.When(item)
   183  		if curr > ret {
   184  			ret = curr
   185  		}
   186  	}
   187  
   188  	return ret
   189  }
   190  
   191  func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
   192  	return &MaxOfRateLimiter{limiters: limiters}
   193  }
   194  
   195  func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
   196  	ret := 0
   197  	for _, limiter := range r.limiters {
   198  		curr := limiter.NumRequeues(item)
   199  		if curr > ret {
   200  			ret = curr
   201  		}
   202  	}
   203  
   204  	return ret
   205  }
   206  
   207  func (r *MaxOfRateLimiter) Forget(item interface{}) {
   208  	for _, limiter := range r.limiters {
   209  		limiter.Forget(item)
   210  	}
   211  }
   212  
   213  // WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
   214  type WithMaxWaitRateLimiter struct {
   215  	limiter  RateLimiter
   216  	maxDelay time.Duration
   217  }
   218  
   219  func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
   220  	return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
   221  }
   222  
   223  func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
   224  	delay := w.limiter.When(item)
   225  	if delay > w.maxDelay {
   226  		return w.maxDelay
   227  	}
   228  
   229  	return delay
   230  }
   231  
   232  func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
   233  	w.limiter.Forget(item)
   234  }
   235  
   236  func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
   237  	return w.limiter.NumRequeues(item)
   238  }
   239  

View as plain text