...

Source file src/golang.org/x/time/rate/rate.go

Documentation: golang.org/x/time/rate

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package rate provides a rate limiter.
     6  package rate
     7  
     8  import (
     9  	"context"
    10  	"fmt"
    11  	"math"
    12  	"sync"
    13  	"time"
    14  )
    15  
    16  // Limit defines the maximum frequency of some events.
    17  // Limit is represented as number of events per second.
    18  // A zero Limit allows no events.
    19  type Limit float64
    20  
    21  // Inf is the infinite rate limit; it allows all events (even if burst is zero).
    22  const Inf = Limit(math.MaxFloat64)
    23  
    24  // Every converts a minimum time interval between events to a Limit.
    25  func Every(interval time.Duration) Limit {
    26  	if interval <= 0 {
    27  		return Inf
    28  	}
    29  	return 1 / Limit(interval.Seconds())
    30  }
    31  
    32  // A Limiter controls how frequently events are allowed to happen.
    33  // It implements a "token bucket" of size b, initially full and refilled
    34  // at rate r tokens per second.
    35  // Informally, in any large enough time interval, the Limiter limits the
    36  // rate to r tokens per second, with a maximum burst size of b events.
    37  // As a special case, if r == Inf (the infinite rate), b is ignored.
    38  // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
    39  //
    40  // The zero value is a valid Limiter, but it will reject all events.
    41  // Use NewLimiter to create non-zero Limiters.
    42  //
    43  // Limiter has three main methods, Allow, Reserve, and Wait.
    44  // Most callers should use Wait.
    45  //
    46  // Each of the three methods consumes a single token.
    47  // They differ in their behavior when no token is available.
    48  // If no token is available, Allow returns false.
    49  // If no token is available, Reserve returns a reservation for a future token
    50  // and the amount of time the caller must wait before using it.
    51  // If no token is available, Wait blocks until one can be obtained
    52  // or its associated context.Context is canceled.
    53  //
    54  // The methods AllowN, ReserveN, and WaitN consume n tokens.
    55  //
    56  // Limiter is safe for simultaneous use by multiple goroutines.
    57  type Limiter struct {
    58  	mu     sync.Mutex
    59  	limit  Limit
    60  	burst  int
    61  	tokens float64
    62  	// last is the last time the limiter's tokens field was updated
    63  	last time.Time
    64  	// lastEvent is the latest time of a rate-limited event (past or future)
    65  	lastEvent time.Time
    66  }
    67  
    68  // Limit returns the maximum overall event rate.
    69  func (lim *Limiter) Limit() Limit {
    70  	lim.mu.Lock()
    71  	defer lim.mu.Unlock()
    72  	return lim.limit
    73  }
    74  
    75  // Burst returns the maximum burst size. Burst is the maximum number of tokens
    76  // that can be consumed in a single call to Allow, Reserve, or Wait, so higher
    77  // Burst values allow more events to happen at once.
    78  // A zero Burst allows no events, unless limit == Inf.
    79  func (lim *Limiter) Burst() int {
    80  	lim.mu.Lock()
    81  	defer lim.mu.Unlock()
    82  	return lim.burst
    83  }
    84  
    85  // TokensAt returns the number of tokens available at time t.
    86  func (lim *Limiter) TokensAt(t time.Time) float64 {
    87  	lim.mu.Lock()
    88  	_, tokens := lim.advance(t) // does not mutate lim
    89  	lim.mu.Unlock()
    90  	return tokens
    91  }
    92  
    93  // Tokens returns the number of tokens available now.
    94  func (lim *Limiter) Tokens() float64 {
    95  	return lim.TokensAt(time.Now())
    96  }
    97  
    98  // NewLimiter returns a new Limiter that allows events up to rate r and permits
    99  // bursts of at most b tokens.
   100  func NewLimiter(r Limit, b int) *Limiter {
   101  	return &Limiter{
   102  		limit: r,
   103  		burst: b,
   104  	}
   105  }
   106  
   107  // Allow reports whether an event may happen now.
   108  func (lim *Limiter) Allow() bool {
   109  	return lim.AllowN(time.Now(), 1)
   110  }
   111  
   112  // AllowN reports whether n events may happen at time t.
   113  // Use this method if you intend to drop / skip events that exceed the rate limit.
   114  // Otherwise use Reserve or Wait.
   115  func (lim *Limiter) AllowN(t time.Time, n int) bool {
   116  	return lim.reserveN(t, n, 0).ok
   117  }
   118  
   119  // A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
   120  // A Reservation may be canceled, which may enable the Limiter to permit additional events.
   121  type Reservation struct {
   122  	ok        bool
   123  	lim       *Limiter
   124  	tokens    int
   125  	timeToAct time.Time
   126  	// This is the Limit at reservation time, it can change later.
   127  	limit Limit
   128  }
   129  
   130  // OK returns whether the limiter can provide the requested number of tokens
   131  // within the maximum wait time.  If OK is false, Delay returns InfDuration, and
   132  // Cancel does nothing.
   133  func (r *Reservation) OK() bool {
   134  	return r.ok
   135  }
   136  
   137  // Delay is shorthand for DelayFrom(time.Now()).
   138  func (r *Reservation) Delay() time.Duration {
   139  	return r.DelayFrom(time.Now())
   140  }
   141  
   142  // InfDuration is the duration returned by Delay when a Reservation is not OK.
   143  const InfDuration = time.Duration(math.MaxInt64)
   144  
   145  // DelayFrom returns the duration for which the reservation holder must wait
   146  // before taking the reserved action.  Zero duration means act immediately.
   147  // InfDuration means the limiter cannot grant the tokens requested in this
   148  // Reservation within the maximum wait time.
   149  func (r *Reservation) DelayFrom(t time.Time) time.Duration {
   150  	if !r.ok {
   151  		return InfDuration
   152  	}
   153  	delay := r.timeToAct.Sub(t)
   154  	if delay < 0 {
   155  		return 0
   156  	}
   157  	return delay
   158  }
   159  
   160  // Cancel is shorthand for CancelAt(time.Now()).
   161  func (r *Reservation) Cancel() {
   162  	r.CancelAt(time.Now())
   163  }
   164  
   165  // CancelAt indicates that the reservation holder will not perform the reserved action
   166  // and reverses the effects of this Reservation on the rate limit as much as possible,
   167  // considering that other reservations may have already been made.
   168  func (r *Reservation) CancelAt(t time.Time) {
   169  	if !r.ok {
   170  		return
   171  	}
   172  
   173  	r.lim.mu.Lock()
   174  	defer r.lim.mu.Unlock()
   175  
   176  	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
   177  		return
   178  	}
   179  
   180  	// calculate tokens to restore
   181  	// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
   182  	// after r was obtained. These tokens should not be restored.
   183  	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
   184  	if restoreTokens <= 0 {
   185  		return
   186  	}
   187  	// advance time to now
   188  	t, tokens := r.lim.advance(t)
   189  	// calculate new number of tokens
   190  	tokens += restoreTokens
   191  	if burst := float64(r.lim.burst); tokens > burst {
   192  		tokens = burst
   193  	}
   194  	// update state
   195  	r.lim.last = t
   196  	r.lim.tokens = tokens
   197  	if r.timeToAct == r.lim.lastEvent {
   198  		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
   199  		if !prevEvent.Before(t) {
   200  			r.lim.lastEvent = prevEvent
   201  		}
   202  	}
   203  }
   204  
   205  // Reserve is shorthand for ReserveN(time.Now(), 1).
   206  func (lim *Limiter) Reserve() *Reservation {
   207  	return lim.ReserveN(time.Now(), 1)
   208  }
   209  
   210  // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
   211  // The Limiter takes this Reservation into account when allowing future events.
   212  // The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size.
   213  // Usage example:
   214  //
   215  //	r := lim.ReserveN(time.Now(), 1)
   216  //	if !r.OK() {
   217  //	  // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
   218  //	  return
   219  //	}
   220  //	time.Sleep(r.Delay())
   221  //	Act()
   222  //
   223  // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
   224  // If you need to respect a deadline or cancel the delay, use Wait instead.
   225  // To drop or skip events exceeding rate limit, use Allow instead.
   226  func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
   227  	r := lim.reserveN(t, n, InfDuration)
   228  	return &r
   229  }
   230  
   231  // Wait is shorthand for WaitN(ctx, 1).
   232  func (lim *Limiter) Wait(ctx context.Context) (err error) {
   233  	return lim.WaitN(ctx, 1)
   234  }
   235  
   236  // WaitN blocks until lim permits n events to happen.
   237  // It returns an error if n exceeds the Limiter's burst size, the Context is
   238  // canceled, or the expected wait time exceeds the Context's Deadline.
   239  // The burst limit is ignored if the rate limit is Inf.
   240  func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
   241  	// The test code calls lim.wait with a fake timer generator.
   242  	// This is the real timer generator.
   243  	newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
   244  		timer := time.NewTimer(d)
   245  		return timer.C, timer.Stop, func() {}
   246  	}
   247  
   248  	return lim.wait(ctx, n, time.Now(), newTimer)
   249  }
   250  
   251  // wait is the internal implementation of WaitN.
   252  func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
   253  	lim.mu.Lock()
   254  	burst := lim.burst
   255  	limit := lim.limit
   256  	lim.mu.Unlock()
   257  
   258  	if n > burst && limit != Inf {
   259  		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
   260  	}
   261  	// Check if ctx is already cancelled
   262  	select {
   263  	case <-ctx.Done():
   264  		return ctx.Err()
   265  	default:
   266  	}
   267  	// Determine wait limit
   268  	waitLimit := InfDuration
   269  	if deadline, ok := ctx.Deadline(); ok {
   270  		waitLimit = deadline.Sub(t)
   271  	}
   272  	// Reserve
   273  	r := lim.reserveN(t, n, waitLimit)
   274  	if !r.ok {
   275  		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
   276  	}
   277  	// Wait if necessary
   278  	delay := r.DelayFrom(t)
   279  	if delay == 0 {
   280  		return nil
   281  	}
   282  	ch, stop, advance := newTimer(delay)
   283  	defer stop()
   284  	advance() // only has an effect when testing
   285  	select {
   286  	case <-ch:
   287  		// We can proceed.
   288  		return nil
   289  	case <-ctx.Done():
   290  		// Context was canceled before we could proceed.  Cancel the
   291  		// reservation, which may permit other events to proceed sooner.
   292  		r.Cancel()
   293  		return ctx.Err()
   294  	}
   295  }
   296  
   297  // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
   298  func (lim *Limiter) SetLimit(newLimit Limit) {
   299  	lim.SetLimitAt(time.Now(), newLimit)
   300  }
   301  
   302  // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
   303  // or underutilized by those which reserved (using Reserve or Wait) but did not yet act
   304  // before SetLimitAt was called.
   305  func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
   306  	lim.mu.Lock()
   307  	defer lim.mu.Unlock()
   308  
   309  	t, tokens := lim.advance(t)
   310  
   311  	lim.last = t
   312  	lim.tokens = tokens
   313  	lim.limit = newLimit
   314  }
   315  
   316  // SetBurst is shorthand for SetBurstAt(time.Now(), newBurst).
   317  func (lim *Limiter) SetBurst(newBurst int) {
   318  	lim.SetBurstAt(time.Now(), newBurst)
   319  }
   320  
   321  // SetBurstAt sets a new burst size for the limiter.
   322  func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) {
   323  	lim.mu.Lock()
   324  	defer lim.mu.Unlock()
   325  
   326  	t, tokens := lim.advance(t)
   327  
   328  	lim.last = t
   329  	lim.tokens = tokens
   330  	lim.burst = newBurst
   331  }
   332  
   333  // reserveN is a helper method for AllowN, ReserveN, and WaitN.
   334  // maxFutureReserve specifies the maximum reservation wait duration allowed.
   335  // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
   336  func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
   337  	lim.mu.Lock()
   338  	defer lim.mu.Unlock()
   339  
   340  	if lim.limit == Inf {
   341  		return Reservation{
   342  			ok:        true,
   343  			lim:       lim,
   344  			tokens:    n,
   345  			timeToAct: t,
   346  		}
   347  	} else if lim.limit == 0 {
   348  		var ok bool
   349  		if lim.burst >= n {
   350  			ok = true
   351  			lim.burst -= n
   352  		}
   353  		return Reservation{
   354  			ok:        ok,
   355  			lim:       lim,
   356  			tokens:    lim.burst,
   357  			timeToAct: t,
   358  		}
   359  	}
   360  
   361  	t, tokens := lim.advance(t)
   362  
   363  	// Calculate the remaining number of tokens resulting from the request.
   364  	tokens -= float64(n)
   365  
   366  	// Calculate the wait duration
   367  	var waitDuration time.Duration
   368  	if tokens < 0 {
   369  		waitDuration = lim.limit.durationFromTokens(-tokens)
   370  	}
   371  
   372  	// Decide result
   373  	ok := n <= lim.burst && waitDuration <= maxFutureReserve
   374  
   375  	// Prepare reservation
   376  	r := Reservation{
   377  		ok:    ok,
   378  		lim:   lim,
   379  		limit: lim.limit,
   380  	}
   381  	if ok {
   382  		r.tokens = n
   383  		r.timeToAct = t.Add(waitDuration)
   384  
   385  		// Update state
   386  		lim.last = t
   387  		lim.tokens = tokens
   388  		lim.lastEvent = r.timeToAct
   389  	}
   390  
   391  	return r
   392  }
   393  
   394  // advance calculates and returns an updated state for lim resulting from the passage of time.
   395  // lim is not changed.
   396  // advance requires that lim.mu is held.
   397  func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
   398  	last := lim.last
   399  	if t.Before(last) {
   400  		last = t
   401  	}
   402  
   403  	// Calculate the new number of tokens, due to time that passed.
   404  	elapsed := t.Sub(last)
   405  	delta := lim.limit.tokensFromDuration(elapsed)
   406  	tokens := lim.tokens + delta
   407  	if burst := float64(lim.burst); tokens > burst {
   408  		tokens = burst
   409  	}
   410  	return t, tokens
   411  }
   412  
   413  // durationFromTokens is a unit conversion function from the number of tokens to the duration
   414  // of time it takes to accumulate them at a rate of limit tokens per second.
   415  func (limit Limit) durationFromTokens(tokens float64) time.Duration {
   416  	if limit <= 0 {
   417  		return InfDuration
   418  	}
   419  	seconds := tokens / float64(limit)
   420  	return time.Duration(float64(time.Second) * seconds)
   421  }
   422  
   423  // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
   424  // which could be accumulated during that duration at a rate of limit tokens per second.
   425  func (limit Limit) tokensFromDuration(d time.Duration) float64 {
   426  	if limit <= 0 {
   427  		return 0
   428  	}
   429  	return d.Seconds() * float64(limit)
   430  }
   431  

View as plain text