...

Source file src/github.com/letsencrypt/boulder/ratelimits/source_redis.go

Documentation: github.com/letsencrypt/boulder/ratelimits

     1  package ratelimits
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"net"
     7  	"time"
     8  
     9  	"github.com/jmhodges/clock"
    10  	"github.com/prometheus/client_golang/prometheus"
    11  	"github.com/redis/go-redis/v9"
    12  )
    13  
// Compile-time assertion that *RedisSource satisfies the source interface;
// the build fails here if a required method is missing or has the wrong
// signature.
var _ source = (*RedisSource)(nil)
    16  
    17  // RedisSource is a ratelimits source backed by sharded Redis.
    18  type RedisSource struct {
    19  	client  *redis.Ring
    20  	clk     clock.Clock
    21  	latency *prometheus.HistogramVec
    22  }
    23  
    24  // NewRedisSource returns a new Redis backed source using the provided
    25  // *redis.Ring client.
    26  func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Registerer) *RedisSource {
    27  	latency := prometheus.NewHistogramVec(
    28  		prometheus.HistogramOpts{
    29  			Name: "ratelimits_latency",
    30  			Help: "Histogram of Redis call latencies labeled by call=[set|get|delete|ping] and result=[success|error]",
    31  			// Exponential buckets ranging from 0.0005s to 3s.
    32  			Buckets: prometheus.ExponentialBucketsRange(0.0005, 3, 8),
    33  		},
    34  		[]string{"call", "result"},
    35  	)
    36  	stats.MustRegister(latency)
    37  
    38  	return &RedisSource{
    39  		client:  client,
    40  		clk:     clk,
    41  		latency: latency,
    42  	}
    43  }
    44  
    45  // resultForError returns a string representing the result of the operation
    46  // based on the provided error.
    47  func resultForError(err error) string {
    48  	if errors.Is(redis.Nil, err) {
    49  		// Bucket key does not exist.
    50  		return "notFound"
    51  	} else if errors.Is(err, context.DeadlineExceeded) {
    52  		// Client read or write deadline exceeded.
    53  		return "deadlineExceeded"
    54  	} else if errors.Is(err, context.Canceled) {
    55  		// Caller canceled the operation.
    56  		return "canceled"
    57  	}
    58  	var netErr net.Error
    59  	if errors.As(err, &netErr) && netErr.Timeout() {
    60  		// Dialer timed out connecting to Redis.
    61  		return "timeout"
    62  	}
    63  	var redisErr redis.Error
    64  	if errors.Is(err, redisErr) {
    65  		// An internal error was returned by the Redis server.
    66  		return "redisError"
    67  	}
    68  	return "failed"
    69  }
    70  
    71  // Set stores the TAT at the specified bucketKey ('name:id'). It returns an
    72  // error if the operation failed and nil otherwise. If the bucketKey does not
    73  // exist, it will be created.
    74  func (r *RedisSource) Set(ctx context.Context, bucketKey string, tat time.Time) error {
    75  	start := r.clk.Now()
    76  
    77  	err := r.client.Set(ctx, bucketKey, tat.UnixNano(), 0).Err()
    78  	if err != nil {
    79  		r.latency.With(prometheus.Labels{"call": "set", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
    80  		return err
    81  	}
    82  
    83  	r.latency.With(prometheus.Labels{"call": "set", "result": "success"}).Observe(time.Since(start).Seconds())
    84  	return nil
    85  }
    86  
    87  // Get retrieves the TAT at the specified bucketKey ('name:id'). It returns the
    88  // TAT and nil if the operation succeeded, or an error if the operation failed.
    89  // If the bucketKey does not exist, it returns ErrBucketNotFound.
    90  func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, error) {
    91  	start := r.clk.Now()
    92  
    93  	tatNano, err := r.client.Get(ctx, bucketKey).Int64()
    94  	if err != nil {
    95  		if errors.Is(err, redis.Nil) {
    96  			// Bucket key does not exist.
    97  			r.latency.With(prometheus.Labels{"call": "get", "result": "notFound"}).Observe(time.Since(start).Seconds())
    98  			return time.Time{}, ErrBucketNotFound
    99  		}
   100  		r.latency.With(prometheus.Labels{"call": "get", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
   101  		return time.Time{}, err
   102  	}
   103  
   104  	r.latency.With(prometheus.Labels{"call": "get", "result": "success"}).Observe(time.Since(start).Seconds())
   105  	return time.Unix(0, tatNano).UTC(), nil
   106  }
   107  
   108  // Delete deletes the TAT at the specified bucketKey ('name:id'). It returns an
   109  // error if the operation failed and nil otherwise. A nil return value does not
   110  // indicate that the bucketKey existed.
   111  func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {
   112  	start := r.clk.Now()
   113  
   114  	err := r.client.Del(ctx, bucketKey).Err()
   115  	if err != nil {
   116  		r.latency.With(prometheus.Labels{"call": "delete", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
   117  		return err
   118  	}
   119  
   120  	r.latency.With(prometheus.Labels{"call": "delete", "result": "success"}).Observe(time.Since(start).Seconds())
   121  	return nil
   122  }
   123  
   124  // Ping checks that each shard of the *redis.Ring is reachable using the PING
   125  // command. It returns an error if any shard is unreachable and nil otherwise.
   126  func (r *RedisSource) Ping(ctx context.Context) error {
   127  	start := r.clk.Now()
   128  
   129  	err := r.client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
   130  		return shard.Ping(ctx).Err()
   131  	})
   132  	if err != nil {
   133  		r.latency.With(prometheus.Labels{"call": "ping", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
   134  		return err
   135  	}
   136  	r.latency.With(prometheus.Labels{"call": "ping", "result": "success"}).Observe(time.Since(start).Seconds())
   137  	return nil
   138  }
   139  

View as plain text