...

Source file src/github.com/letsencrypt/boulder/ratelimits/limiter.go

Documentation: github.com/letsencrypt/boulder/ratelimits

     1  package ratelimits
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/jmhodges/clock"
    10  	"github.com/prometheus/client_golang/prometheus"
    11  )
    12  
    13  const (
    14  	// Allowed is used for rate limit metrics, it's the value of the 'decision'
    15  	// label when a request was allowed.
    16  	Allowed = "allowed"
    17  
    18  	// Denied is used for rate limit metrics, it's the value of the 'decision'
    19  	// label when a request was denied.
    20  	Denied = "denied"
    21  )
    22  
    23  // ErrInvalidCost indicates that the cost specified was <= 0.
    24  var ErrInvalidCost = fmt.Errorf("invalid cost, must be > 0")
    25  
    26  // ErrInvalidCostForCheck indicates that the check cost specified was < 0.
    27  var ErrInvalidCostForCheck = fmt.Errorf("invalid check cost, must be >= 0")
    28  
    29  // ErrInvalidCostOverLimit indicates that the cost specified was > limit.Burst.
    30  var ErrInvalidCostOverLimit = fmt.Errorf("invalid cost, must be <= limit.Burst")
    31  
    32  // errLimitDisabled indicates that the limit name specified is valid but is not
    33  // currently configured.
    34  var errLimitDisabled = errors.New("limit disabled")
    35  
    36  // disabledLimitDecision is an "allowed" *Decision that should be returned when
    37  // a checked limit is found to be disabled.
    38  var disabledLimitDecision = &Decision{true, 0, 0, 0, time.Time{}}
    39  
    40  // Limiter provides a high-level interface for rate limiting requests by
    41  // utilizing a leaky bucket-style approach.
    42  type Limiter struct {
    43  	// defaults stores default limits by 'name'.
    44  	defaults limits
    45  
    46  	// overrides stores override limits by 'name:id'.
    47  	overrides limits
    48  
    49  	// source is used to store buckets. It must be safe for concurrent use.
    50  	source source
    51  	clk    clock.Clock
    52  
    53  	spendLatency       *prometheus.HistogramVec
    54  	overrideUsageGauge *prometheus.GaugeVec
    55  }
    56  
    57  // NewLimiter returns a new *Limiter. The provided source must be safe for
    58  // concurrent use. The defaults and overrides paths are expected to be paths to
    59  // YAML files that contain the default and override limits, respectively. The
    60  // overrides file is optional, all other arguments are required.
    61  func NewLimiter(clk clock.Clock, source source, defaults, overrides string, stats prometheus.Registerer) (*Limiter, error) {
    62  	limiter := &Limiter{source: source, clk: clk}
    63  
    64  	var err error
    65  	limiter.defaults, err = loadAndParseDefaultLimits(defaults)
    66  	if err != nil {
    67  		return nil, err
    68  	}
    69  
    70  	limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
    71  		Name: "ratelimits_spend_latency",
    72  		Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
    73  		// Exponential buckets ranging from 0.0005s to 3s.
    74  		Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
    75  	}, []string{"limit", "decision"})
    76  	stats.MustRegister(limiter.spendLatency)
    77  
    78  	if overrides == "" {
    79  		// No overrides specified, initialize an empty map.
    80  		limiter.overrides = make(limits)
    81  		return limiter, nil
    82  	}
    83  
    84  	limiter.overrides, err = loadAndParseOverrideLimits(overrides)
    85  	if err != nil {
    86  		return nil, err
    87  	}
    88  
    89  	limiter.overrideUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
    90  		Name: "ratelimits_override_usage",
    91  		Help: "Proportion of override limit used, by limit name and client id.",
    92  	}, []string{"limit", "client_id"})
    93  	stats.MustRegister(limiter.overrideUsageGauge)
    94  
    95  	return limiter, nil
    96  }
    97  
    98  type Decision struct {
    99  	// Allowed is true if the bucket possessed enough capacity to allow the
   100  	// request given the cost.
   101  	Allowed bool
   102  
   103  	// Remaining is the number of requests the client is allowed to make before
   104  	// they're rate limited.
   105  	Remaining int64
   106  
   107  	// RetryIn is the duration the client MUST wait before they're allowed to
   108  	// make a request.
   109  	RetryIn time.Duration
   110  
   111  	// ResetIn is the duration the bucket will take to refill to its maximum
   112  	// capacity, assuming no further requests are made.
   113  	ResetIn time.Duration
   114  
   115  	// newTAT indicates the time at which the bucket will be full. It is the
   116  	// theoretical arrival time (TAT) of next request. It must be no more than
   117  	// (burst * (period / count)) in the future at any single point in time.
   118  	newTAT time.Time
   119  }
   120  
   121  // Check returns a *Decision that indicates whether there's enough capacity to
   122  // allow the request, given the cost, for the specified limit Name and client
   123  // id. However, it DOES NOT deduct the cost of the request from the bucket's
   124  // capacity. Hence, the returned *Decision represents the hypothetical state of
   125  // the bucket if the cost WERE to be deducted. The returned *Decision will
   126  // always include the number of remaining requests in the bucket, the required
   127  // wait time before the client can make another request, and the time until the
   128  // bucket refills to its maximum capacity (resets). If no bucket exists for the
   129  // given limit Name and client id, a new one will be created WITHOUT the
   130  // request's cost deducted from its initial capacity. If the specified limit is
   131  // disabled, ErrLimitDisabled is returned.
   132  func (l *Limiter) Check(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
   133  	if cost < 0 {
   134  		return nil, ErrInvalidCostForCheck
   135  	}
   136  
   137  	limit, err := l.getLimit(name, id)
   138  	if err != nil {
   139  		if errors.Is(err, errLimitDisabled) {
   140  			return disabledLimitDecision, nil
   141  		}
   142  		return nil, err
   143  	}
   144  
   145  	if cost > limit.Burst {
   146  		return nil, ErrInvalidCostOverLimit
   147  	}
   148  
   149  	// Remove cancellation from the request context so that transactions are not
   150  	// interrupted by a client disconnect.
   151  	ctx = context.WithoutCancel(ctx)
   152  	tat, err := l.source.Get(ctx, bucketKey(name, id))
   153  	if err != nil {
   154  		if !errors.Is(err, ErrBucketNotFound) {
   155  			return nil, err
   156  		}
   157  		// First request from this client. The cost is not deducted from the
   158  		// initial capacity because this is only a check.
   159  		d, err := l.initialize(ctx, limit, name, id, 0)
   160  		if err != nil {
   161  			return nil, err
   162  		}
   163  		return maybeSpend(l.clk, limit, d.newTAT, cost), nil
   164  	}
   165  	return maybeSpend(l.clk, limit, tat, cost), nil
   166  }
   167  
   168  // Spend returns a *Decision that indicates if enough capacity was available to
   169  // process the request, given the cost, for the specified limit Name and client
   170  // id. If capacity existed, the cost of the request HAS been deducted from the
   171  // bucket's capacity, otherwise no cost was deducted. The returned *Decision
   172  // will always include the number of remaining requests in the bucket, the
   173  // required wait time before the client can make another request, and the time
   174  // until the bucket refills to its maximum capacity (resets). If no bucket
   175  // exists for the given limit Name and client id, a new one will be created WITH
   176  // the request's cost deducted from its initial capacity. If the specified limit
   177  // is disabled, ErrLimitDisabled is returned.
   178  func (l *Limiter) Spend(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
   179  	if cost <= 0 {
   180  		return nil, ErrInvalidCost
   181  	}
   182  
   183  	limit, err := l.getLimit(name, id)
   184  	if err != nil {
   185  		if errors.Is(err, errLimitDisabled) {
   186  			return disabledLimitDecision, nil
   187  		}
   188  		return nil, err
   189  	}
   190  
   191  	if cost > limit.Burst {
   192  		return nil, ErrInvalidCostOverLimit
   193  	}
   194  
   195  	start := l.clk.Now()
   196  	status := Denied
   197  	defer func() {
   198  		l.spendLatency.WithLabelValues(name.String(), status).Observe(l.clk.Since(start).Seconds())
   199  	}()
   200  
   201  	// Remove cancellation from the request context so that transactions are not
   202  	// interrupted by a client disconnect.
   203  	ctx = context.WithoutCancel(ctx)
   204  	tat, err := l.source.Get(ctx, bucketKey(name, id))
   205  	if err != nil {
   206  		if errors.Is(err, ErrBucketNotFound) {
   207  			// First request from this client.
   208  			d, err := l.initialize(ctx, limit, name, id, cost)
   209  			if err != nil {
   210  				return nil, err
   211  			}
   212  			if d.Allowed {
   213  				status = Allowed
   214  			}
   215  			return d, nil
   216  		}
   217  		return nil, err
   218  	}
   219  
   220  	d := maybeSpend(l.clk, limit, tat, cost)
   221  
   222  	if limit.isOverride {
   223  		// Calculate the current utilization of the override limit for the
   224  		// specified client id.
   225  		utilization := float64(limit.Burst-d.Remaining) / float64(limit.Burst)
   226  		l.overrideUsageGauge.WithLabelValues(name.String(), id).Set(utilization)
   227  	}
   228  
   229  	if !d.Allowed {
   230  		return d, nil
   231  	}
   232  
   233  	err = l.source.Set(ctx, bucketKey(name, id), d.newTAT)
   234  	if err != nil {
   235  		return nil, err
   236  	}
   237  	status = Allowed
   238  	return d, nil
   239  }
   240  
   241  // Refund attempts to refund the cost to the bucket identified by limit name and
   242  // client id. The returned *Decision indicates whether the refund was successful
   243  // or not. If the refund was successful, the cost of the request was added back
   244  // to the bucket's capacity. If the refund is not possible (i.e., the bucket is
   245  // already full or the refund amount is invalid), no cost is refunded.
   246  //
   247  // Note: The amount refunded cannot cause the bucket to exceed its maximum
   248  // capacity. However, partial refunds are allowed and are considered successful.
   249  // For instance, if a bucket has a maximum capacity of 10 and currently has 5
   250  // requests remaining, a refund request of 7 will result in the bucket reaching
   251  // its maximum capacity of 10, not 12.
   252  func (l *Limiter) Refund(ctx context.Context, name Name, id string, cost int64) (*Decision, error) {
   253  	if cost <= 0 {
   254  		return nil, ErrInvalidCost
   255  	}
   256  
   257  	limit, err := l.getLimit(name, id)
   258  	if err != nil {
   259  		if errors.Is(err, errLimitDisabled) {
   260  			return disabledLimitDecision, nil
   261  		}
   262  		return nil, err
   263  	}
   264  
   265  	// Remove cancellation from the request context so that transactions are not
   266  	// interrupted by a client disconnect.
   267  	ctx = context.WithoutCancel(ctx)
   268  	tat, err := l.source.Get(ctx, bucketKey(name, id))
   269  	if err != nil {
   270  		return nil, err
   271  	}
   272  	d := maybeRefund(l.clk, limit, tat, cost)
   273  	if !d.Allowed {
   274  		// The bucket is already at maximum capacity.
   275  		return d, nil
   276  	}
   277  	return d, l.source.Set(ctx, bucketKey(name, id), d.newTAT)
   278  
   279  }
   280  
   281  // Reset resets the specified bucket.
   282  func (l *Limiter) Reset(ctx context.Context, name Name, id string) error {
   283  	// Remove cancellation from the request context so that transactions are not
   284  	// interrupted by a client disconnect.
   285  	ctx = context.WithoutCancel(ctx)
   286  	return l.source.Delete(ctx, bucketKey(name, id))
   287  }
   288  
   289  // initialize creates a new bucket, specified by limit name and id, with the
   290  // cost of the request factored into the initial state.
   291  func (l *Limiter) initialize(ctx context.Context, rl limit, name Name, id string, cost int64) (*Decision, error) {
   292  	d := maybeSpend(l.clk, rl, l.clk.Now(), cost)
   293  
   294  	// Remove cancellation from the request context so that transactions are not
   295  	// interrupted by a client disconnect.
   296  	ctx = context.WithoutCancel(ctx)
   297  	err := l.source.Set(ctx, bucketKey(name, id), d.newTAT)
   298  	if err != nil {
   299  		return nil, err
   300  	}
   301  	return d, nil
   302  
   303  }
   304  
   305  // GetLimit returns the limit for the specified by name and id, name is
   306  // required, id is optional. If id is left unspecified, the default limit for
   307  // the limit specified by name is returned. If no default limit exists for the
   308  // specified name, ErrLimitDisabled is returned.
   309  func (l *Limiter) getLimit(name Name, id string) (limit, error) {
   310  	if !name.isValid() {
   311  		// This should never happen. Callers should only be specifying the limit
   312  		// Name enums defined in this package.
   313  		return limit{}, fmt.Errorf("specified name enum %q, is invalid", name)
   314  	}
   315  	if id != "" {
   316  		// Check for override.
   317  		ol, ok := l.overrides[bucketKey(name, id)]
   318  		if ok {
   319  			return ol, nil
   320  		}
   321  	}
   322  	dl, ok := l.defaults[nameToEnumString(name)]
   323  	if ok {
   324  		return dl, nil
   325  	}
   326  	return limit{}, errLimitDisabled
   327  }
   328  

View as plain text