1 package notmain
2
3 import (
4 "context"
5 "errors"
6 "flag"
7 "fmt"
8 "math"
9 "os"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/prometheus/client_golang/prometheus"
15 "google.golang.org/protobuf/types/known/emptypb"
16
17 "github.com/letsencrypt/boulder/akamai"
18 akamaipb "github.com/letsencrypt/boulder/akamai/proto"
19 "github.com/letsencrypt/boulder/cmd"
20 "github.com/letsencrypt/boulder/config"
21 bgrpc "github.com/letsencrypt/boulder/grpc"
22 blog "github.com/letsencrypt/boulder/log"
23 )
24
// Sizing assumptions, defaults, and Akamai limits used when building and
// validating purge batches.
const (
	// akamaiBytesPerResponse is the number of bytes budgeted for each queue
	// entry when estimating the size of a single purge request.
	akamaiBytesPerResponse = 400

	// urlsPerQueueEntry is the number of URLs each queue entry is counted as
	// when estimating how many URLs are purged per second.
	urlsPerQueueEntry = 3

	// defaultEntriesPerBatch is the default for
	// Throughput.QueueEntriesPerBatch.
	defaultEntriesPerBatch = 2

	// defaultPurgeBatchInterval is the default for
	// Throughput.PurgeBatchInterval.
	defaultPurgeBatchInterval = 32 * time.Millisecond

	// defaultQueueSize is the queue capacity used when MaxQueueSize is left
	// at zero in the configuration.
	defaultQueueSize = 1_250_000

	// akamaiBytesPerReqLimit is the largest estimated request size, in bytes,
	// that a configuration may produce: 50,000 less 1, less a further 19.
	// NOTE(review): the 19 bytes are presumably overhead for the request
	// envelope; confirm against the Akamai API documentation.
	akamaiBytesPerReqLimit = 50_000 - 1 - 19

	// akamaiAPIReqPerSecondLimit is the largest number of purge requests per
	// second that a configuration may produce.
	akamaiAPIReqPerSecondLimit = 50

	// akamaiURLsPerSecondLimit is the largest number of URLs purged per second
	// that a configuration may produce.
	akamaiURLsPerSecondLimit = 200
)
62
63
64
// Throughput controls how quickly the purge queue is drained: how many queue
// entries go into each purge request and how often a request is sent.
type Throughput struct {
	// QueueEntriesPerBatch is the maximum number of queue entries combined
	// into a single purge request. It must be greater than zero to pass
	// validate.
	QueueEntriesPerBatch int

	// PurgeBatchInterval is the period of the ticker that triggers each purge
	// request. It must be non-zero to pass validate.
	PurgeBatchInterval config.Duration `validate:"-"`
}
77
78 func (t *Throughput) useOptimizedDefaults() {
79 if t.QueueEntriesPerBatch == 0 {
80 t.QueueEntriesPerBatch = defaultEntriesPerBatch
81 }
82 if t.PurgeBatchInterval.Duration == 0 {
83 t.PurgeBatchInterval.Duration = defaultPurgeBatchInterval
84 }
85 }
86
87
88
89
90
91 func (t *Throughput) validate() error {
92 if t.PurgeBatchInterval.Duration == 0 {
93 return errors.New("'purgeBatchInterval' must be > 0")
94 }
95 if t.QueueEntriesPerBatch <= 0 {
96 return errors.New("'queueEntriesPerBatch' must be > 0")
97 }
98
99
100 bytesPerRequest := (t.QueueEntriesPerBatch * akamaiBytesPerResponse)
101 if bytesPerRequest > akamaiBytesPerReqLimit {
102 return fmt.Errorf("config exceeds Akamai's bytes per request limit (%d bytes) by %d",
103 akamaiBytesPerReqLimit, bytesPerRequest-akamaiBytesPerReqLimit)
104 }
105
106
107 requestsPerSecond := int(math.Ceil(float64(time.Second) / float64(t.PurgeBatchInterval.Duration)))
108 if requestsPerSecond > akamaiAPIReqPerSecondLimit {
109 return fmt.Errorf("config exceeds Akamai's requests per second limit (%d requests) by %d",
110 akamaiAPIReqPerSecondLimit, requestsPerSecond-akamaiAPIReqPerSecondLimit)
111 }
112
113
114 urlsPurgedPerSecond := requestsPerSecond * (t.QueueEntriesPerBatch * urlsPerQueueEntry)
115 if urlsPurgedPerSecond > akamaiURLsPerSecondLimit {
116 return fmt.Errorf("config exceeds Akamai's URLs per second limit (%d URLs) by %d",
117 akamaiURLsPerSecondLimit, urlsPurgedPerSecond-akamaiURLsPerSecondLimit)
118 }
119 return nil
120 }
121
// Config is the top-level configuration for the akamai-purger command, read
// from the JSON file named by the -config flag.
type Config struct {
	AkamaiPurger struct {
		// NOTE(review): main and daemon read GRPC, DebugAddr and TLS through
		// this struct; presumably they are promoted from ServiceConfig.
		cmd.ServiceConfig

		// MaxQueueSize is the maximum number of entries held in the purge
		// queue. When left at zero, main substitutes defaultQueueSize.
		MaxQueueSize int

		// BaseURL, ClientToken, ClientSecret, AccessToken and V3Network are
		// passed through to akamai.NewCachePurgeClient. V3Network must be
		// either "staging" or "production".
		BaseURL      string `validate:"required,url"`
		ClientToken  string `validate:"required"`
		ClientSecret string `validate:"required"`
		AccessToken  string `validate:"required"`
		V3Network    string `validate:"required,oneof=staging production"`

		// Throughput controls the batch size and interval used to drain the
		// queue. When it is left entirely unset, main applies the defaults.
		Throughput Throughput

		// PurgeRetries is passed to akamai.NewCachePurgeClient.
		// NOTE(review): presumably the number of retry attempts per purge
		// request; confirm against the akamai package.
		PurgeRetries int

		// PurgeRetryBackoff is passed to akamai.NewCachePurgeClient.
		// NOTE(review): presumably the wait between retry attempts; confirm
		// against the akamai package.
		PurgeRetryBackoff config.Duration `validate:"-"`
	}
	Syslog        cmd.SyslogConfig
	OpenTelemetry cmd.OpenTelemetryConfig
}
152
153
// cachePurgeClient is the subset of cache purge client behavior that
// akamaiPurger depends on: purging a list of URLs in a single call.
type cachePurgeClient interface {
	// Purge requests that the given URLs be purged and reports any failure.
	Purge(urls []string) error
}
157
158
159
160
161
// akamaiPurger is the gRPC service implementation. It queues incoming purge
// requests in memory and hands them out in batches to be sent via client. The
// embedded mutex guards toPurge.
type akamaiPurger struct {
	sync.Mutex
	akamaipb.UnimplementedAkamaiPurgerServer

	// toPurge is used as a stack: Purge appends new entries to the end and
	// takeBatch removes entries from the end. When the stack is full, Purge
	// drops the oldest entry from the front. Each entry is the URL list from
	// one purge request.
	toPurge [][]string
	// maxStackSize is the capacity at which Purge starts dropping the oldest
	// entry to make room for a new one.
	maxStackSize int
	// entriesPerBatch is the maximum number of entries takeBatch returns.
	entriesPerBatch int
	client          cachePurgeClient
	log             blog.Logger
}
174
175 func (ap *akamaiPurger) len() int {
176 ap.Lock()
177 defer ap.Unlock()
178 return len(ap.toPurge)
179 }
180
181 func (ap *akamaiPurger) purgeBatch(batch [][]string) error {
182
183 var urls []string
184 for _, url := range batch {
185 urls = append(urls, url...)
186 }
187
188 err := ap.client.Purge(urls)
189 if err != nil {
190 ap.log.Errf("Failed to purge %d OCSP responses (%s): %s", len(batch), strings.Join(urls, ","), err)
191 return err
192 }
193 return nil
194 }
195
196 func (ap *akamaiPurger) takeBatch() [][]string {
197 ap.Lock()
198 defer ap.Unlock()
199 stackSize := len(ap.toPurge)
200
201
202 if stackSize <= 0 {
203 return nil
204 }
205
206
207
208 batchSize := ap.entriesPerBatch
209 if stackSize < batchSize {
210 batchSize = stackSize
211 }
212
213 batchBegin := stackSize - batchSize
214 batch := ap.toPurge[batchBegin:]
215 ap.toPurge = ap.toPurge[:batchBegin]
216 return batch
217 }
218
219
220
221 func (ap *akamaiPurger) Purge(ctx context.Context, req *akamaipb.PurgeRequest) (*emptypb.Empty, error) {
222 ap.Lock()
223 defer ap.Unlock()
224 stackSize := len(ap.toPurge)
225 if stackSize >= ap.maxStackSize {
226
227 ap.toPurge = ap.toPurge[1:]
228 }
229
230 ap.toPurge = append(ap.toPurge, req.Urls)
231 return &emptypb.Empty{}, nil
232 }
233
234 func main() {
235 daemonFlags := flag.NewFlagSet("daemon", flag.ContinueOnError)
236 grpcAddr := daemonFlags.String("addr", "", "gRPC listen address override")
237 debugAddr := daemonFlags.String("debug-addr", "", "Debug server address override")
238 configFile := daemonFlags.String("config", "", "File path to the configuration file for this service")
239
240 manualFlags := flag.NewFlagSet("manual", flag.ExitOnError)
241 manualConfigFile := manualFlags.String("config", "", "File path to the configuration file for this service")
242 tag := manualFlags.String("tag", "", "Single cache tag to purge")
243 tagFile := manualFlags.String("tag-file", "", "File containing cache tags to purge, one per line")
244
245 if len(os.Args) < 2 {
246 fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
247 daemonFlags.PrintDefaults()
248 fmt.Fprintln(os.Stderr, "OR:")
249 fmt.Fprintf(os.Stderr, "%s manual <flags>\n", os.Args[0])
250 manualFlags.PrintDefaults()
251 os.Exit(1)
252 }
253
254
255
256 var manualMode bool
257 if os.Args[1] == "manual" {
258 manualMode = true
259 _ = manualFlags.Parse(os.Args[2:])
260 if *configFile == "" {
261 manualFlags.Usage()
262 os.Exit(1)
263 }
264 if *tag == "" && *tagFile == "" {
265 cmd.Fail("Must specify one of --tag or --tag-file for manual purge")
266 } else if *tag != "" && *tagFile != "" {
267 cmd.Fail("Cannot specify both of --tag and --tag-file for manual purge")
268 }
269 configFile = manualConfigFile
270 } else {
271 err := daemonFlags.Parse(os.Args[1:])
272 if err != nil {
273 fmt.Fprintf(os.Stderr, "OR:\n%s manual -config conf.json [-tag Foo] [-tag-file]\n", os.Args[0])
274 os.Exit(1)
275 }
276 if *configFile == "" {
277 daemonFlags.Usage()
278 os.Exit(1)
279 }
280 }
281
282 var c Config
283 err := cmd.ReadConfigFile(*configFile, &c)
284 cmd.FailOnError(err, "Reading JSON config file into config structure")
285
286
287 apc := &c.AkamaiPurger
288
289 if *grpcAddr != "" {
290 apc.GRPC.Address = *grpcAddr
291 }
292 if *debugAddr != "" {
293 apc.DebugAddr = *debugAddr
294 }
295
296 scope, logger, oTelShutdown := cmd.StatsAndLogging(c.Syslog, c.OpenTelemetry, apc.DebugAddr)
297 defer oTelShutdown(context.Background())
298 logger.Info(cmd.VersionString())
299
300
301 if (apc.Throughput == Throughput{}) {
302 apc.Throughput.useOptimizedDefaults()
303 }
304 cmd.FailOnError(apc.Throughput.validate(), "")
305
306 if apc.MaxQueueSize == 0 {
307 apc.MaxQueueSize = defaultQueueSize
308 }
309
310 ccu, err := akamai.NewCachePurgeClient(
311 apc.BaseURL,
312 apc.ClientToken,
313 apc.ClientSecret,
314 apc.AccessToken,
315 apc.V3Network,
316 apc.PurgeRetries,
317 apc.PurgeRetryBackoff.Duration,
318 logger,
319 scope,
320 )
321 cmd.FailOnError(err, "Failed to setup Akamai CCU client")
322
323 ap := &akamaiPurger{
324 maxStackSize: apc.MaxQueueSize,
325 entriesPerBatch: apc.Throughput.QueueEntriesPerBatch,
326 client: ccu,
327 log: logger,
328 }
329
330 var gaugePurgeQueueLength = prometheus.NewGaugeFunc(
331 prometheus.GaugeOpts{
332 Name: "ccu_purge_queue_length",
333 Help: "The length of the akamai-purger queue. Captured on each prometheus scrape.",
334 },
335 func() float64 { return float64(ap.len()) },
336 )
337 scope.MustRegister(gaugePurgeQueueLength)
338
339 if manualMode {
340 manualPurge(ccu, *tag, *tagFile)
341 } else {
342 daemon(c, ap, logger, scope)
343 }
344 }
345
346
347
348
349
350 func manualPurge(purgeClient *akamai.CachePurgeClient, tag, tagFile string) {
351 var tags []string
352 if tag != "" {
353 tags = []string{tag}
354 } else {
355 contents, err := os.ReadFile(tagFile)
356 cmd.FailOnError(err, fmt.Sprintf("While reading %q", tagFile))
357 tags = strings.Split(string(contents), "\n")
358 }
359
360 err := purgeClient.PurgeTags(tags)
361 cmd.FailOnError(err, "Purging tags")
362 }
363
364
365 func daemon(c Config, ap *akamaiPurger, logger blog.Logger, scope prometheus.Registerer) {
366 clk := cmd.Clock()
367
368 tlsConfig, err := c.AkamaiPurger.TLS.Load(scope)
369 cmd.FailOnError(err, "tlsConfig config")
370
371 stop, stopped := make(chan bool, 1), make(chan bool, 1)
372 ticker := time.NewTicker(c.AkamaiPurger.Throughput.PurgeBatchInterval.Duration)
373 go func() {
374 loop:
375 for {
376 select {
377 case <-ticker.C:
378 batch := ap.takeBatch()
379 if batch == nil {
380 continue
381 }
382 _ = ap.purgeBatch(batch)
383 case <-stop:
384 break loop
385 }
386 }
387
388
389
390
391 stackLen := ap.len()
392 if stackLen > 0 {
393 logger.Infof("Shutting down; purging OCSP responses for %d certificates before exit.", stackLen)
394 batch := ap.takeBatch()
395 err := ap.purgeBatch(batch)
396 cmd.FailOnError(err, fmt.Sprintf("Shutting down; failed to purge OCSP responses for %d certificates before exit", stackLen))
397 logger.Infof("Shutting down; finished purging OCSP responses for %d certificates.", stackLen)
398 } else {
399 logger.Info("Shutting down; queue is already empty.")
400 }
401 stopped <- true
402 }()
403
404
405
406 defer func() {
407
408
409
410 ticker.Stop()
411 stop <- true
412 select {
413 case <-time.After(time.Second * 15):
414 cmd.Fail("Timed out waiting for purger to finish work")
415 case <-stopped:
416 }
417 }()
418
419 start, err := bgrpc.NewServer(c.AkamaiPurger.GRPC, logger).Add(
420 &akamaipb.AkamaiPurger_ServiceDesc, ap).Build(tlsConfig, scope, clk)
421 cmd.FailOnError(err, "Unable to setup Akamai purger gRPC server")
422
423 cmd.FailOnError(start(), "akamai-purger gRPC service failed")
424 }
425
426 func init() {
427 cmd.RegisterCommand("akamai-purger", main, &cmd.ConfigValidator{Config: &Config{}})
428 }
429
View as plain text