...

Source file src/github.com/letsencrypt/boulder/cmd/akamai-purger/main.go

Documentation: github.com/letsencrypt/boulder/cmd/akamai-purger

     1  package notmain
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"flag"
     7  	"fmt"
     8  	"math"
     9  	"os"
    10  	"strings"
    11  	"sync"
    12  	"time"
    13  
    14  	"github.com/prometheus/client_golang/prometheus"
    15  	"google.golang.org/protobuf/types/known/emptypb"
    16  
    17  	"github.com/letsencrypt/boulder/akamai"
    18  	akamaipb "github.com/letsencrypt/boulder/akamai/proto"
    19  	"github.com/letsencrypt/boulder/cmd"
    20  	"github.com/letsencrypt/boulder/config"
    21  	bgrpc "github.com/letsencrypt/boulder/grpc"
    22  	blog "github.com/letsencrypt/boulder/log"
    23  )
    24  
    25  const (
    26  	// akamaiBytesPerResponse is the total bytes of all 3 URLs associated with a
    27  	// single OCSP response cached by Akamai. Each response is composed of 3
    28  	// URLs; the POST Cache Key URL is 61 bytes and the encoded and unencoded
    29  	// GET URLs are 163 bytes and 151 bytes respectively. This totals 375 bytes,
    30  	// which we round up to 400.
    31  	akamaiBytesPerResponse = 400
    32  
    33  	// urlsPerQueueEntry is the number of URLs associated with a single cached
    34  	// OCSP response.
    35  	urlsPerQueueEntry = 3
    36  
    37  	// defaultEntriesPerBatch is the default value for 'queueEntriesPerBatch'.
    38  	defaultEntriesPerBatch = 2
    39  
    40  	// defaultPurgeBatchInterval is the default value for 'purgeBatchInterval'.
    41  	defaultPurgeBatchInterval = time.Millisecond * 32
    42  
    43  	// defaultQueueSize is the default value for 'maxQueueSize'. A queue size of
    44  	// 1.25M cached OCSP responses, assuming 3 URLs per response, is about 6
    45  	// hours of work using the default settings detailed above.
    46  	defaultQueueSize = 1250000
    47  
    48  	// akamaiBytesPerReqLimit is the limit of bytes allowed in a single request
    49  	// to the Fast-Purge API. The limit is 50,000 bytes per request; we subtract
    50  	// 1 byte to stay safely under it, and subtract an additional 19 bytes for
    51  	// the overhead of the 'objects' key and array.
    52  	akamaiBytesPerReqLimit = 50000 - 1 - 19
    53  
    54  	// akamaiAPIReqPerSecondLimit is the limit of requests, per second, that
    55  	// we're allowed to make to the Fast-Purge API.
    56  	akamaiAPIReqPerSecondLimit = 50
    57  
    58  	// akamaiURLsPerSecondLimit is the limit of URLs, per second, that we're
    59  	// allowed to send to the Fast-Purge API.
    60  	akamaiURLsPerSecondLimit = 200
    61  )
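
As a rough check on how these defaults combine (a minimal sketch, not part of the Boulder source): 2 entries per batch is 2 × 400 = 800 bytes per request, well under the 49,980-byte limit; a 32ms interval is at most ceil(1s / 32ms) = 32 requests per second, under the 50-request limit; and 32 × 2 × 3 = 192 URLs per second, under the 200-URL limit. Draining a full default queue of 1,250,000 entries at 2 entries every 32ms takes about 20,000 seconds, the "about 6 hours" noted above.

	package main

	import (
		"fmt"
		"math"
		"time"
	)

	func main() {
		// The same figures as the constants above, reproduced locally so this
		// sketch is self-contained.
		const (
			bytesPerResponse = 400
			urlsPerEntry     = 3
			entriesPerBatch  = 2
			batchInterval    = 32 * time.Millisecond
			queueSize        = 1250000
		)
		bytesPerRequest := entriesPerBatch * bytesPerResponse                              // 800 < 49,980
		requestsPerSecond := int(math.Ceil(float64(time.Second) / float64(batchInterval))) // 32 <= 50
		urlsPerSecond := requestsPerSecond * entriesPerBatch * urlsPerEntry                // 192 <= 200
		hoursToDrain := float64(queueSize) * batchInterval.Seconds() / entriesPerBatch / 3600
		fmt.Println(bytesPerRequest, requestsPerSecond, urlsPerSecond, hoursToDrain) // 800 32 192 ~5.6
	}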
    62  
    63  // Throughput is a container for all throughput related akamai-purger
    64  // configuration settings.
    65  type Throughput struct {
    66  	// QueueEntriesPerBatch is the number of cached OCSP responses to include in
    67  	// each purge request. One cached OCSP response is composed of 3 URLs totaling
    68  	// < 400 bytes. If this value isn't provided it will default to
    69  	// 'defaultEntriesPerBatch'.
    70  	QueueEntriesPerBatch int
    71  
    72  	// PurgeBatchInterval is the duration waited between dispatching an Akamai
    73  	// purge request containing 'QueueEntriesPerBatch' * 3 URLs. If this value
    74  	// isn't provided it will default to 'defaultPurgeBatchInterval'.
    75  	PurgeBatchInterval config.Duration `validate:"-"`
    76  }
    77  
    78  func (t *Throughput) useOptimizedDefaults() {
    79  	if t.QueueEntriesPerBatch == 0 {
    80  		t.QueueEntriesPerBatch = defaultEntriesPerBatch
    81  	}
    82  	if t.PurgeBatchInterval.Duration == 0 {
    83  		t.PurgeBatchInterval.Duration = defaultPurgeBatchInterval
    84  	}
    85  }
    86  
    87  // validate ensures that the provided throughput configuration will not violate
    88  // the Akamai Fast-Purge API limits. For more information see the official
    89  // documentation:
    90  // https://techdocs.akamai.com/purge-cache/reference/rate-limiting
    91  func (t *Throughput) validate() error {
    92  	if t.PurgeBatchInterval.Duration == 0 {
    93  		return errors.New("'purgeBatchInterval' must be > 0")
    94  	}
    95  	if t.QueueEntriesPerBatch <= 0 {
    96  		return errors.New("'queueEntriesPerBatch' must be > 0")
    97  	}
    98  
    99  	// Send no more than the 50,000 bytes of objects we’re allotted per request.
   100  	bytesPerRequest := (t.QueueEntriesPerBatch * akamaiBytesPerResponse)
   101  	if bytesPerRequest > akamaiBytesPerReqLimit {
   102  		return fmt.Errorf("config exceeds Akamai's bytes per request limit (%d bytes) by %d",
   103  			akamaiBytesPerReqLimit, bytesPerRequest-akamaiBytesPerReqLimit)
   104  	}
   105  
   106  	// Send no more than the 50 API requests we’re allotted each second.
   107  	requestsPerSecond := int(math.Ceil(float64(time.Second) / float64(t.PurgeBatchInterval.Duration)))
   108  	if requestsPerSecond > akamaiAPIReqPerSecondLimit {
   109  		return fmt.Errorf("config exceeds Akamai's requests per second limit (%d requests) by %d",
   110  			akamaiAPIReqPerSecondLimit, requestsPerSecond-akamaiAPIReqPerSecondLimit)
   111  	}
   112  
   113  	// Purge no more than the 200 URLs we’re allotted each second.
   114  	urlsPurgedPerSecond := requestsPerSecond * (t.QueueEntriesPerBatch * urlsPerQueueEntry)
   115  	if urlsPurgedPerSecond > akamaiURLsPerSecondLimit {
   116  		return fmt.Errorf("config exceeds Akamai's URLs per second limit (%d URLs) by %d",
   117  			akamaiURLsPerSecondLimit, urlsPurgedPerSecond-akamaiURLsPerSecondLimit)
   118  	}
   119  	return nil
   120  }
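
For illustration, the arithmetic in validate can be exercised with a small in-package sketch (validate is unexported, so it would have to live alongside this file, e.g. as a test; this is not part of the Boulder test suite). Keeping 2 entries per batch but shrinking the interval to 10ms gives ceil(1s / 10ms) = 100 requests per second, over the 50-request limit, so an error is expected; the optimized defaults should pass.

	package notmain

	import (
		"testing"
		"time"

		"github.com/letsencrypt/boulder/config"
	)

	// TestThroughputValidateSketch is illustrative only.
	func TestThroughputValidateSketch(t *testing.T) {
		tooFast := Throughput{
			QueueEntriesPerBatch: 2,
			PurgeBatchInterval:   config.Duration{Duration: 10 * time.Millisecond},
		}
		if err := tooFast.validate(); err == nil {
			t.Error("expected an error: 100 requests per second exceeds the 50 allowed")
		}

		var defaults Throughput
		defaults.useOptimizedDefaults()
		if err := defaults.validate(); err != nil {
			t.Errorf("expected the optimized defaults to validate, got: %s", err)
		}
	}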
   121  
   122  type Config struct {
   123  	AkamaiPurger struct {
   124  		cmd.ServiceConfig
   125  
   126  		// MaxQueueSize is the maximum size of the purger stack. If this value
   127  		// isn't provided it will default to `defaultQueueSize`.
   128  		MaxQueueSize int
   129  
   130  		BaseURL      string `validate:"required,url"`
   131  		ClientToken  string `validate:"required"`
   132  		ClientSecret string `validate:"required"`
   133  		AccessToken  string `validate:"required"`
   134  		V3Network    string `validate:"required,oneof=staging production"`
   135  
   136  		// Throughput is a container for all throughput related akamai-purger
   137  		// settings.
   138  		Throughput Throughput
   139  
   140  		// PurgeRetries is the maximum number of attempts that will be made to purge a
   141  		// batch of URLs before the batch is added back to the stack.
   142  		PurgeRetries int
   143  
   144  		// PurgeRetryBackoff is the base duration that will be waited before
   145  		// attempting to purge a batch of URLs which previously failed to be
   146  		// purged.
   147  		PurgeRetryBackoff config.Duration `validate:"-"`
   148  	}
   149  	Syslog        cmd.SyslogConfig
   150  	OpenTelemetry cmd.OpenTelemetryConfig
   151  }
   152  
   153  // cachePurgeClient is a testing interface.
   154  type cachePurgeClient interface {
   155  	Purge(urls []string) error
   156  }
   157  
   158  // akamaiPurger is a mutex-protected container for a gRPC server which receives
   159  // requests containing a slice of URLs associated with an OCSP response cached
   160  // by Akamai. This slice of URLs is stored on a stack, and dispatched in batches
   161  // to Akamai's Fast Purge API at regular intervals.
   162  type akamaiPurger struct {
   163  	sync.Mutex
   164  	akamaipb.UnimplementedAkamaiPurgerServer
   165  
   166  	// toPurge functions as a stack where each entry contains the three OCSP
   167  	// response URLs associated with a given certificate.
   168  	toPurge         [][]string
   169  	maxStackSize    int
   170  	entriesPerBatch int
   171  	client          cachePurgeClient
   172  	log             blog.Logger
   173  }
   174  
   175  func (ap *akamaiPurger) len() int {
   176  	ap.Lock()
   177  	defer ap.Unlock()
   178  	return len(ap.toPurge)
   179  }
   180  
   181  func (ap *akamaiPurger) purgeBatch(batch [][]string) error {
   182  	// Flatten the batch of stack entries into a single slice of URLs.
   183  	var urls []string
   184  	for _, url := range batch {
   185  		urls = append(urls, url...)
   186  	}
   187  
   188  	err := ap.client.Purge(urls)
   189  	if err != nil {
   190  		ap.log.Errf("Failed to purge %d OCSP responses (%s): %s", len(batch), strings.Join(urls, ","), err)
   191  		return err
   192  	}
   193  	return nil
   194  }
   195  
   196  func (ap *akamaiPurger) takeBatch() [][]string {
   197  	ap.Lock()
   198  	defer ap.Unlock()
   199  	stackSize := len(ap.toPurge)
   200  
   201  	// If the stack is empty, return immediately.
   202  	if stackSize <= 0 {
   203  		return nil
   204  	}
   205  
   206  	// If the stack contains less than a full batch, set the batch size to the
   207  	// current stack size.
   208  	batchSize := ap.entriesPerBatch
   209  	if stackSize < batchSize {
   210  		batchSize = stackSize
   211  	}
   212  
   213  	batchBegin := stackSize - batchSize
   214  	batch := ap.toPurge[batchBegin:]
   215  	ap.toPurge = ap.toPurge[:batchBegin]
   216  	return batch
   217  }
   218  
   219  // Purge is an exported gRPC method which receives purge requests containing
   220  // URLs and pushes them onto the top of the purger stack.
   221  func (ap *akamaiPurger) Purge(ctx context.Context, req *akamaipb.PurgeRequest) (*emptypb.Empty, error) {
   222  	ap.Lock()
   223  	defer ap.Unlock()
   224  	stackSize := len(ap.toPurge)
   225  	if stackSize >= ap.maxStackSize {
   226  		// Drop the oldest entry from the bottom of the stack to make room.
   227  		ap.toPurge = ap.toPurge[1:]
   228  	}
   229  	// Add the entry from the new request to the top of the stack.
   230  	ap.toPurge = append(ap.toPurge, req.Urls)
   231  	return &emptypb.Empty{}, nil
   232  }
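
Taken together, Purge and takeBatch give the purger stack LIFO behavior with overflow at the bottom: new URL sets are pushed onto the top, takeBatch pops the most recently added entries, and when the stack is full the oldest entry is silently dropped. The standalone sketch below (plain slices and placeholder strings instead of the mutex-protected akamaiPurger type, whose entries each hold three real OCSP URLs) walks through those slice mechanics.

	package main

	import "fmt"

	func main() {
		const maxStackSize, entriesPerBatch = 3, 2
		var toPurge [][]string

		// push mirrors akamaiPurger.Purge: drop the oldest entry from the
		// bottom when the stack is full, then add the new entry to the top.
		push := func(entry []string) {
			if len(toPurge) >= maxStackSize {
				toPurge = toPurge[1:]
			}
			toPurge = append(toPurge, entry)
		}

		// takeBatch mirrors akamaiPurger.takeBatch: pop up to entriesPerBatch
		// entries off the top of the stack.
		takeBatch := func() [][]string {
			if len(toPurge) == 0 {
				return nil
			}
			batchSize := entriesPerBatch
			if len(toPurge) < batchSize {
				batchSize = len(toPurge)
			}
			begin := len(toPurge) - batchSize
			batch := toPurge[begin:]
			toPurge = toPurge[:begin]
			return batch
		}

		push([]string{"cert-1-urls"})
		push([]string{"cert-2-urls"})
		push([]string{"cert-3-urls"})
		push([]string{"cert-4-urls"}) // stack is full, so cert-1-urls is dropped

		fmt.Println(takeBatch()) // [[cert-3-urls] [cert-4-urls]]
		fmt.Println(takeBatch()) // [[cert-2-urls]]
		fmt.Println(takeBatch()) // [] (nil: the stack is empty)
	}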
   233  
   234  func main() {
   235  	daemonFlags := flag.NewFlagSet("daemon", flag.ContinueOnError)
   236  	grpcAddr := daemonFlags.String("addr", "", "gRPC listen address override")
   237  	debugAddr := daemonFlags.String("debug-addr", "", "Debug server address override")
   238  	configFile := daemonFlags.String("config", "", "File path to the configuration file for this service")
   239  
   240  	manualFlags := flag.NewFlagSet("manual", flag.ExitOnError)
   241  	manualConfigFile := manualFlags.String("config", "", "File path to the configuration file for this service")
   242  	tag := manualFlags.String("tag", "", "Single cache tag to purge")
   243  	tagFile := manualFlags.String("tag-file", "", "File containing cache tags to purge, one per line")
   244  
   245  	if len(os.Args) < 2 {
   246  		fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
   247  		daemonFlags.PrintDefaults()
   248  		fmt.Fprintln(os.Stderr, "OR:")
   249  		fmt.Fprintf(os.Stderr, "%s manual <flags>\n", os.Args[0])
   250  		manualFlags.PrintDefaults()
   251  		os.Exit(1)
   252  	}
   253  
   254  	// Check if the purger is being started in daemon (URL purging gRPC service)
   255  	// or manual (ad-hoc tag purging) mode.
   256  	var manualMode bool
   257  	if os.Args[1] == "manual" {
   258  		manualMode = true
   259  		_ = manualFlags.Parse(os.Args[2:])
   260  		if *manualConfigFile == "" {
   261  			manualFlags.Usage()
   262  			os.Exit(1)
   263  		}
   264  		if *tag == "" && *tagFile == "" {
   265  			cmd.Fail("Must specify one of --tag or --tag-file for manual purge")
   266  		} else if *tag != "" && *tagFile != "" {
   267  			cmd.Fail("Cannot specify both of --tag and --tag-file for manual purge")
   268  		}
   269  		configFile = manualConfigFile
   270  	} else {
   271  		err := daemonFlags.Parse(os.Args[1:])
   272  		if err != nil {
   273  			fmt.Fprintf(os.Stderr, "OR:\n%s manual -config conf.json [-tag Foo] [-tag-file]\n", os.Args[0])
   274  			os.Exit(1)
   275  		}
   276  		if *configFile == "" {
   277  			daemonFlags.Usage()
   278  			os.Exit(1)
   279  		}
   280  	}
   281  
   282  	var c Config
   283  	err := cmd.ReadConfigFile(*configFile, &c)
   284  	cmd.FailOnError(err, "Reading JSON config file into config structure")
   285  
   286  	// Make references to the service config cleaner.
   287  	apc := &c.AkamaiPurger
   288  
   289  	if *grpcAddr != "" {
   290  		apc.GRPC.Address = *grpcAddr
   291  	}
   292  	if *debugAddr != "" {
   293  		apc.DebugAddr = *debugAddr
   294  	}
   295  
   296  	scope, logger, oTelShutdown := cmd.StatsAndLogging(c.Syslog, c.OpenTelemetry, apc.DebugAddr)
   297  	defer oTelShutdown(context.Background())
   298  	logger.Info(cmd.VersionString())
   299  
   300  	// Unless otherwise specified, use optimized throughput settings.
   301  	if (apc.Throughput == Throughput{}) {
   302  		apc.Throughput.useOptimizedDefaults()
   303  	}
   304  	cmd.FailOnError(apc.Throughput.validate(), "")
   305  
   306  	if apc.MaxQueueSize == 0 {
   307  		apc.MaxQueueSize = defaultQueueSize
   308  	}
   309  
   310  	ccu, err := akamai.NewCachePurgeClient(
   311  		apc.BaseURL,
   312  		apc.ClientToken,
   313  		apc.ClientSecret,
   314  		apc.AccessToken,
   315  		apc.V3Network,
   316  		apc.PurgeRetries,
   317  		apc.PurgeRetryBackoff.Duration,
   318  		logger,
   319  		scope,
   320  	)
   321  	cmd.FailOnError(err, "Failed to setup Akamai CCU client")
   322  
   323  	ap := &akamaiPurger{
   324  		maxStackSize:    apc.MaxQueueSize,
   325  		entriesPerBatch: apc.Throughput.QueueEntriesPerBatch,
   326  		client:          ccu,
   327  		log:             logger,
   328  	}
   329  
   330  	var gaugePurgeQueueLength = prometheus.NewGaugeFunc(
   331  		prometheus.GaugeOpts{
   332  			Name: "ccu_purge_queue_length",
   333  			Help: "The length of the akamai-purger queue. Captured on each prometheus scrape.",
   334  		},
   335  		func() float64 { return float64(ap.len()) },
   336  	)
   337  	scope.MustRegister(gaugePurgeQueueLength)
   338  
   339  	if manualMode {
   340  		manualPurge(ccu, *tag, *tagFile)
   341  	} else {
   342  		daemon(c, ap, logger, scope)
   343  	}
   344  }
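
Putting the flag handling above together, the two modes are invoked roughly as follows. The binary name, file names, addresses, and tag value here are placeholders for illustration; within Boulder this runs as the registered "akamai-purger" subcommand (see the init function at the bottom of this file).

	akamai-purger -config purger.json -addr :9099 -debug-addr :8009
	akamai-purger manual -config purger.json -tag ExampleTag
	akamai-purger manual -config purger.json -tag-file tags.txt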
   345  
   346  // manualPurge is called ad-hoc to purge either a single tag, or a batch of tags,
   347  // passed on the CLI. All tags will be added to a single request; please ensure
   348  // that you don't violate the Fast-Purge API limits for tags detailed here:
   349  // https://techdocs.akamai.com/purge-cache/reference/rate-limiting
   350  func manualPurge(purgeClient *akamai.CachePurgeClient, tag, tagFile string) {
   351  	var tags []string
   352  	if tag != "" {
   353  		tags = []string{tag}
   354  	} else {
   355  		contents, err := os.ReadFile(tagFile)
   356  		cmd.FailOnError(err, fmt.Sprintf("While reading %q", tagFile))
   357  		tags = strings.Split(string(contents), "\n")
   358  	}
   359  
   360  	err := purgeClient.PurgeTags(tags)
   361  	cmd.FailOnError(err, "Purging tags")
   362  }
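
One detail of the tag-file handling worth illustrating: strings.Split keeps empty elements, so a file ending in a newline (as most editors write) produces a trailing empty string in the tags slice. Whether that matters depends on how PurgeTags treats empty tags, which is outside this file. A minimal sketch with made-up contents:

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// Hypothetical tag-file contents: one cache tag per line, plus the
		// trailing newline most editors add.
		contents := "ExampleTagOne\nExampleTagTwo\n"
		tags := strings.Split(contents, "\n")
		fmt.Printf("%q\n", tags) // ["ExampleTagOne" "ExampleTagTwo" ""]
	}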
   363  
   364  // daemon initializes the akamai-purger gRPC service.
   365  func daemon(c Config, ap *akamaiPurger, logger blog.Logger, scope prometheus.Registerer) {
   366  	clk := cmd.Clock()
   367  
   368  	tlsConfig, err := c.AkamaiPurger.TLS.Load(scope)
   369  	cmd.FailOnError(err, "tlsConfig config")
   370  
   371  	stop, stopped := make(chan bool, 1), make(chan bool, 1)
   372  	ticker := time.NewTicker(c.AkamaiPurger.Throughput.PurgeBatchInterval.Duration)
   373  	go func() {
   374  	loop:
   375  		for {
   376  			select {
   377  			case <-ticker.C:
   378  				batch := ap.takeBatch()
   379  				if batch == nil {
   380  					continue
   381  				}
   382  				_ = ap.purgeBatch(batch)
   383  			case <-stop:
   384  				break loop
   385  			}
   386  		}
   387  
   388  		// As we may have missed a tick between ticker.Stop() being called and
   389  		// the write to the stop channel, call ap.purgeBatch one last time just
   390  		// in case there is anything that still needs to be purged.
   391  		stackLen := ap.len()
   392  		if stackLen > 0 {
   393  			logger.Infof("Shutting down; purging OCSP responses for %d certificates before exit.", stackLen)
   394  			batch := ap.takeBatch()
   395  			err := ap.purgeBatch(batch)
   396  			cmd.FailOnError(err, fmt.Sprintf("Shutting down; failed to purge OCSP responses for %d certificates before exit", stackLen))
   397  			logger.Infof("Shutting down; finished purging OCSP responses for %d certificates.", stackLen)
   398  		} else {
   399  			logger.Info("Shutting down; queue is already empty.")
   400  		}
   401  		stopped <- true
   402  	}()
   403  
   404  	// When the gRPC server finally exits, run a clean-up routine that stops the
   405  	// ticker and waits for the goroutine above to finish purging the stack.
   406  	defer func() {
   407  		// Stop the ticker and signal that we want to shut down by writing to the
   408  		// stop channel. We wait 15 seconds for any remaining URLs to be emptied
   409  		// from the current stack; if we pass that deadline, we exit early.
   410  		ticker.Stop()
   411  		stop <- true
   412  		select {
   413  		case <-time.After(time.Second * 15):
   414  			cmd.Fail("Timed out waiting for purger to finish work")
   415  		case <-stopped:
   416  		}
   417  	}()
   418  
   419  	start, err := bgrpc.NewServer(c.AkamaiPurger.GRPC, logger).Add(
   420  		&akamaipb.AkamaiPurger_ServiceDesc, ap).Build(tlsConfig, scope, clk)
   421  	cmd.FailOnError(err, "Unable to setup Akamai purger gRPC server")
   422  
   423  	cmd.FailOnError(start(), "akamai-purger gRPC service failed")
   424  }
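
daemon registers the AkamaiPurger service generated from akamaipb, so other Boulder components reach it through the generated gRPC client. The sketch below shows the shape of such a call; the address and URLs are placeholders, and the insecure transport credentials stand in for Boulder's real TLS-based gRPC setup (bgrpc), so treat it as illustrative only.

	package main

	import (
		"context"
		"log"
		"time"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"

		akamaipb "github.com/letsencrypt/boulder/akamai/proto"
	)

	func main() {
		conn, err := grpc.NewClient("localhost:9099",
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		// One request carries the URLs for a single cached OCSP response; the
		// purger pushes them onto its stack and batches them toward Akamai.
		client := akamaipb.NewAkamaiPurgerClient(conn)
		_, err = client.Purge(ctx, &akamaipb.PurgeRequest{
			Urls: []string{
				"http://ocsp.example.test/placeholder-post-cache-key-url",
				"http://ocsp.example.test/placeholder-get-url-encoded",
				"http://ocsp.example.test/placeholder-get-url",
			},
		})
		if err != nil {
			log.Fatal(err)
		}
	}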
   425  
   426  func init() {
   427  	cmd.RegisterCommand("akamai-purger", main, &cmd.ConfigValidator{Config: &Config{}})
   428  }
   429  
