...

Source file src/k8s.io/client-go/util/flowcontrol/throttle.go

Documentation: k8s.io/client-go/util/flowcontrol

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package flowcontrol
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"sync"
    23  	"time"
    24  
    25  	"golang.org/x/time/rate"
    26  	"k8s.io/utils/clock"
    27  )
    28  
    29  type PassiveRateLimiter interface {
    30  	// TryAccept returns true if a token is taken immediately. Otherwise,
    31  	// it returns false.
    32  	TryAccept() bool
    33  	// Stop stops the rate limiter, subsequent calls to CanAccept will return false
    34  	Stop()
    35  	// QPS returns QPS of this rate limiter
    36  	QPS() float32
    37  }
    38  
    39  type RateLimiter interface {
    40  	PassiveRateLimiter
    41  	// Accept returns once a token becomes available.
    42  	Accept()
    43  	// Wait returns nil if a token is taken before the Context is done.
    44  	Wait(ctx context.Context) error
    45  }
    46  
    47  type tokenBucketPassiveRateLimiter struct {
    48  	limiter *rate.Limiter
    49  	qps     float32
    50  	clock   clock.PassiveClock
    51  }
    52  
    53  type tokenBucketRateLimiter struct {
    54  	tokenBucketPassiveRateLimiter
    55  	clock Clock
    56  }
    57  
    58  // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
    59  // The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
    60  // smoothed qps rate of 'qps'.
    61  // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
    62  // The maximum number of tokens in the bucket is capped at 'burst'.
    63  func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
    64  	limiter := rate.NewLimiter(rate.Limit(qps), burst)
    65  	return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
    66  }
    67  
    68  // NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns
    69  // a PassiveRateLimiter which does not have Accept() and Wait() methods.
    70  func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
    71  	limiter := rate.NewLimiter(rate.Limit(qps), burst)
    72  	return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
    73  }
    74  
    75  // An injectable, mockable clock interface.
    76  type Clock interface {
    77  	clock.PassiveClock
    78  	Sleep(time.Duration)
    79  }
    80  
    81  var _ Clock = (*clock.RealClock)(nil)
    82  
    83  // NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
    84  // but allows an injectable clock, for testing.
    85  func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
    86  	limiter := rate.NewLimiter(rate.Limit(qps), burst)
    87  	return newTokenBucketRateLimiterWithClock(limiter, c, qps)
    88  }
    89  
    90  // NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
    91  // except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
    92  // and uses a PassiveClock.
    93  func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
    94  	limiter := rate.NewLimiter(rate.Limit(qps), burst)
    95  	return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
    96  }
    97  
    98  func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
    99  	return &tokenBucketRateLimiter{
   100  		tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
   101  		clock:                         c,
   102  	}
   103  }
   104  
   105  func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
   106  	return &tokenBucketPassiveRateLimiter{
   107  		limiter: limiter,
   108  		qps:     qps,
   109  		clock:   c,
   110  	}
   111  }
   112  
   113  func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
   114  }
   115  
   116  func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
   117  	return tbprl.qps
   118  }
   119  
   120  func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
   121  	return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
   122  }
   123  
   124  // Accept will block until a token becomes available
   125  func (tbrl *tokenBucketRateLimiter) Accept() {
   126  	now := tbrl.clock.Now()
   127  	tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
   128  }
   129  
   130  func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
   131  	return tbrl.limiter.Wait(ctx)
   132  }
   133  
   134  type fakeAlwaysRateLimiter struct{}
   135  
   136  func NewFakeAlwaysRateLimiter() RateLimiter {
   137  	return &fakeAlwaysRateLimiter{}
   138  }
   139  
   140  func (t *fakeAlwaysRateLimiter) TryAccept() bool {
   141  	return true
   142  }
   143  
   144  func (t *fakeAlwaysRateLimiter) Stop() {}
   145  
   146  func (t *fakeAlwaysRateLimiter) Accept() {}
   147  
   148  func (t *fakeAlwaysRateLimiter) QPS() float32 {
   149  	return 1
   150  }
   151  
   152  func (t *fakeAlwaysRateLimiter) Wait(ctx context.Context) error {
   153  	return nil
   154  }
   155  
   156  type fakeNeverRateLimiter struct {
   157  	wg sync.WaitGroup
   158  }
   159  
   160  func NewFakeNeverRateLimiter() RateLimiter {
   161  	rl := fakeNeverRateLimiter{}
   162  	rl.wg.Add(1)
   163  	return &rl
   164  }
   165  
   166  func (t *fakeNeverRateLimiter) TryAccept() bool {
   167  	return false
   168  }
   169  
   170  func (t *fakeNeverRateLimiter) Stop() {
   171  	t.wg.Done()
   172  }
   173  
   174  func (t *fakeNeverRateLimiter) Accept() {
   175  	t.wg.Wait()
   176  }
   177  
   178  func (t *fakeNeverRateLimiter) QPS() float32 {
   179  	return 1
   180  }
   181  
   182  func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
   183  	return errors.New("can not be accept")
   184  }
   185  
   186  var (
   187  	_ RateLimiter = (*tokenBucketRateLimiter)(nil)
   188  	_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
   189  	_ RateLimiter = (*fakeNeverRateLimiter)(nil)
   190  )
   191  
   192  var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)
   193  

View as plain text