Source file src/github.com/letsencrypt/boulder/cmd/expiration-mailer/main.go

Documentation: github.com/letsencrypt/boulder/cmd/expiration-mailer

     1  package notmain
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"crypto/x509"
     7  	"encoding/json"
     8  	"errors"
     9  	"flag"
    10  	"fmt"
    11  	"math"
    12  	netmail "net/mail"
    13  	"net/url"
    14  	"os"
    15  	"sort"
    16  	"strings"
    17  	"sync"
    18  	"text/template"
    19  	"time"
    20  
    21  	"github.com/jmhodges/clock"
    22  	"google.golang.org/grpc"
    23  
    24  	"github.com/prometheus/client_golang/prometheus"
    25  
    26  	"github.com/letsencrypt/boulder/cmd"
    27  	"github.com/letsencrypt/boulder/config"
    28  	"github.com/letsencrypt/boulder/core"
    29  	corepb "github.com/letsencrypt/boulder/core/proto"
    30  	"github.com/letsencrypt/boulder/db"
    31  	"github.com/letsencrypt/boulder/features"
    32  	bgrpc "github.com/letsencrypt/boulder/grpc"
    33  	blog "github.com/letsencrypt/boulder/log"
    34  	bmail "github.com/letsencrypt/boulder/mail"
    35  	"github.com/letsencrypt/boulder/metrics"
    36  	"github.com/letsencrypt/boulder/sa"
    37  	sapb "github.com/letsencrypt/boulder/sa/proto"
    38  )
    39  
    40  const (
    41  	defaultExpirationSubject = "Let's Encrypt certificate expiration notice for domain {{.ExpirationSubject}}"
    42  )
    43  
    44  type regStore interface {
    45  	GetRegistration(ctx context.Context, req *sapb.RegistrationID, _ ...grpc.CallOption) (*corepb.Registration, error)
    46  }
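
// regStore is the narrow subset of the SA's gRPC client that this command
// needs; main() below satisfies it with sapb.NewStorageAuthorityClient.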
    47  
    48  // limiter tracks how many mails we've sent to a given address in a given day.
    49  // Note that this does not track mails across restarts of the process.
    50  // Modifications to `counts` and `currentDay` are protected by a mutex.
    51  type limiter struct {
    52  	sync.RWMutex
    53  	// currentDay is a day in UTC, truncated to 24 hours. When the current
    54  	// time is more than 24 hours past this date, all counts reset and this
    55  	// date is updated.
    56  	currentDay time.Time
    57  
    58  	// counts is a map from address to number of mails we have attempted to
    59  	// send during `currentDay`.
    60  	counts map[string]int
    61  
    62  	// limit is the number of sends after which we'll return an error from
    63  	// check()
    64  	limit int
    65  
    66  	clk clock.Clock
    67  }
    68  
    69  const oneDay = 24 * time.Hour
    70  
    71  // maybeBumpDay updates lim.currentDay if its current value is more than 24
    72  // hours old, and resets the counts map. The caller must hold the write lock.
    73  func (lim *limiter) maybeBumpDay() {
    74  	today := lim.clk.Now().Truncate(oneDay)
    75  	if today.Sub(lim.currentDay) >= oneDay ||
    76  		lim.counts == nil {
    77  		// Throw away counts so far and switch to a new day.
    78  		// This also initializes counts and currentDay on the first call to
    79  		// inc() or check().
    80  		lim.counts = make(map[string]int)
    81  		lim.currentDay = today
    82  	}
    83  }
    84  
    85  // inc increments the count for the current day, and cleans up previous days
    86  // if needed.
    87  func (lim *limiter) inc(address string) {
    88  	lim.Lock()
    89  	defer lim.Unlock()
    90  
    91  	lim.maybeBumpDay()
    92  
    93  	lim.counts[address] += 1
    94  }
    95  
    96  // check returns an error if the count for the given address has reached
    97  // the limit. It takes the write lock, since maybeBumpDay may mutate state.
    98  func (lim *limiter) check(address string) error {
    99  	lim.Lock()
   100  	defer lim.Unlock()
   101  
   102  	lim.maybeBumpDay()
   103  	if lim.counts[address] >= lim.limit {
   104  		return fmt.Errorf("daily mail limit exceeded for %q", address)
   105  	}
   106  	return nil
   107  }
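
// Illustrative sketch of the limiter across a day boundary (assumed values;
// fake clock from github.com/jmhodges/clock):
//
//	fc := clock.NewFake()
//	lim := &limiter{clk: fc, limit: 2}
//	lim.inc("a@example.com")
//	lim.inc("a@example.com")
//	err := lim.check("a@example.com") // daily mail limit exceeded for "a@example.com"
//	fc.Add(25 * time.Hour)
//	err = lim.check("a@example.com") // nil; counts were reset for the new day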
   108  
   109  type mailer struct {
   110  	log                 blog.Logger
   111  	dbMap               *db.WrappedMap
   112  	rs                  regStore
   113  	mailer              bmail.Mailer
   114  	emailTemplate       *template.Template
   115  	subjectTemplate     *template.Template
   116  	nagTimes            []time.Duration
   117  	parallelSends       uint
   118  	certificatesPerTick int
   119  	// addressLimiter limits how many mails we'll send to a single address in
   120  	// a single day.
   121  	addressLimiter *limiter
   122  	// Maximum number of rows to update in a single SQL UPDATE statement.
   123  	updateChunkSize int
   124  	clk             clock.Clock
   125  	stats           mailerStats
   126  }
   127  
   128  type certDERWithRegID struct {
   129  	DER   core.CertDER
   130  	RegID int64
   131  }
   132  
   133  type mailerStats struct {
   134  	sendDelay                         *prometheus.GaugeVec
   135  	sendDelayHistogram                *prometheus.HistogramVec
   136  	nagsAtCapacity                    *prometheus.GaugeVec
   137  	errorCount                        *prometheus.CounterVec
   138  	sendLatency                       prometheus.Histogram
   139  	processingLatency                 prometheus.Histogram
   140  	certificatesExamined              prometheus.Counter
   141  	certificatesAlreadyRenewed        prometheus.Counter
   142  	certificatesPerAccountNeedingMail prometheus.Histogram
   143  }
   144  
   145  func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Certificate) error {
   146  	if len(certs) == 0 {
   147  		return errors.New("no certs given to send nags for")
   148  	}
   149  	emails := []string{}
   150  	for _, contact := range contacts {
   151  		parsed, err := url.Parse(contact)
   152  		if err != nil {
   153  			m.log.Errf("parsing contact email %s: %s", contact, err)
   154  			continue
   155  		}
   156  		if parsed.Scheme != "mailto" {
   157  			continue
   158  		}
   159  		address := parsed.Opaque
   160  		err = m.addressLimiter.check(address)
   161  		if err != nil {
   162  			m.log.Infof("not sending mail: %s", err)
   163  			continue
   164  		}
   165  		m.addressLimiter.inc(address)
   166  		emails = append(emails, address)
   167  	}
   168  	if len(emails) == 0 {
   169  		return nil
   170  	}
   171  
   172  	expiresIn := time.Duration(math.MaxInt64)
   173  	expDate := m.clk.Now()
   174  	domains := []string{}
   175  	serials := []string{}
   176  
   177  	// Pick out the expiration date that is closest to being hit.
   178  	for _, cert := range certs {
   179  		domains = append(domains, cert.DNSNames...)
   180  		serials = append(serials, core.SerialToString(cert.SerialNumber))
   181  		possible := cert.NotAfter.Sub(m.clk.Now())
   182  		if possible < expiresIn {
   183  			expiresIn = possible
   184  			expDate = cert.NotAfter
   185  		}
   186  	}
   187  	domains = core.UniqueLowerNames(domains)
   188  	sort.Strings(domains)
   189  
   190  	const maxSerials = 100
   191  	truncatedSerials := serials
   192  	if len(truncatedSerials) > maxSerials {
   193  		truncatedSerials = serials[0:maxSerials]
   194  	}
   195  
   196  	const maxDomains = 100
   197  	truncatedDomains := domains
   198  	if len(truncatedDomains) > maxDomains {
   199  		truncatedDomains = domains[0:maxDomains]
   200  	}
   201  
   202  	// Construct the information about the expiring certificates for use in the
   203  	// subject template
   204  	expiringSubject := fmt.Sprintf("%q", domains[0])
   205  	if len(domains) > 1 {
   206  		expiringSubject += fmt.Sprintf(" (and %d more)", len(domains)-1)
   207  	}
   208  
   209  	// Execute the subjectTemplate by filling in the ExpirationSubject
   210  	subjBuf := new(bytes.Buffer)
   211  	err := m.subjectTemplate.Execute(subjBuf, struct {
   212  		ExpirationSubject string
   213  	}{
   214  		ExpirationSubject: expiringSubject,
   215  	})
   216  	if err != nil {
   217  		m.stats.errorCount.With(prometheus.Labels{"type": "SubjectTemplateFailure"}).Inc()
   218  		return err
   219  	}
   220  
   221  	email := struct {
   222  		ExpirationDate     string
   223  		DaysToExpiration   int
   224  		DNSNames           string
   225  		TruncatedDNSNames  string
   226  		NumDNSNamesOmitted int
   227  	}{
   228  		ExpirationDate:     expDate.UTC().Format(time.DateOnly),
   229  		DaysToExpiration:   int(expiresIn.Hours() / 24),
   230  		DNSNames:           strings.Join(domains, "\n"),
   231  		TruncatedDNSNames:  strings.Join(truncatedDomains, "\n"),
   232  		NumDNSNamesOmitted: len(domains) - len(truncatedDomains),
   233  	}
   234  	msgBuf := new(bytes.Buffer)
   235  	err = m.emailTemplate.Execute(msgBuf, email)
   236  	if err != nil {
   237  		m.stats.errorCount.With(prometheus.Labels{"type": "TemplateFailure"}).Inc()
   238  		return err
   239  	}
   240  
   241  	logItem := struct {
   242  		Rcpt              []string
   243  		DaysToExpiration  int
   244  		TruncatedDNSNames []string
   245  		TruncatedSerials  []string
   246  	}{
   247  		Rcpt:              emails,
   248  		DaysToExpiration:  email.DaysToExpiration,
   249  		TruncatedDNSNames: truncatedDomains,
   250  		TruncatedSerials:  truncatedSerials,
   251  	}
   252  	logStr, err := json.Marshal(logItem)
   253  	if err != nil {
   254  		m.log.Errf("logItem could not be serialized to JSON. Raw: %+v", logItem)
   255  		return err
   256  	}
   257  	m.log.Infof("attempting send JSON=%s", string(logStr))
   258  
   259  	startSending := m.clk.Now()
   260  	err = conn.SendMail(emails, subjBuf.String(), msgBuf.String())
   261  	if err != nil {
   262  		m.log.Errf("failed send JSON=%s err=%s", string(logStr), err)
   263  		return err
   264  	}
   265  	finishSending := m.clk.Now()
   266  	elapsed := finishSending.Sub(startSending)
   267  	m.stats.sendLatency.Observe(elapsed.Seconds())
   268  	return nil
   269  }
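
// For illustration (assumed values): with domains ["example.com",
// "www.example.com"], expiringSubject is `"example.com" (and 1 more)`, so
// the default subject template renders as:
//
//	Let's Encrypt certificate expiration notice for domain "example.com" (and 1 more)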
   270  
   271  // updateLastNagTimestamps updates the lastExpirationNagSent column for every cert in
   272  // the given list. Even though it can encounter errors, it only logs them and
   273  // does not return them, because we always prefer to simply continue.
   274  func (m *mailer) updateLastNagTimestamps(ctx context.Context, certs []*x509.Certificate) {
   275  	for len(certs) > 0 {
   276  		size := len(certs)
   277  		if m.updateChunkSize > 0 && size > m.updateChunkSize {
   278  			size = m.updateChunkSize
   279  		}
   280  		chunk := certs[0:size]
   281  		certs = certs[size:]
   282  		m.updateLastNagTimestampsChunk(ctx, chunk)
   283  	}
   284  }
   285  
   286  // updateLastNagTimestampsChunk processes a single chunk (at most 65,535 serials) of certificates.
   287  func (m *mailer) updateLastNagTimestampsChunk(ctx context.Context, certs []*x509.Certificate) {
   288  	params := make([]interface{}, len(certs)+1)
   289  	for i, cert := range certs {
   290  		params[i+1] = core.SerialToString(cert.SerialNumber)
   291  	}
   292  
   293  	query := fmt.Sprintf(
   294  		"UPDATE certificateStatus SET lastExpirationNagSent = ? WHERE serial IN (%s)",
   295  		db.QuestionMarks(len(certs)),
   296  	)
   297  	params[0] = m.clk.Now()
   298  
   299  	_, err := m.dbMap.ExecContext(ctx, query, params...)
   300  	if err != nil {
   301  		m.log.AuditErrf("Error updating certificate status for %d certs: %s", len(certs), err)
   302  		m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
   303  	}
   304  }
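
// Sketch of the statement built above for a chunk of three certificates,
// assuming db.QuestionMarks(3) expands to "?,?,?":
//
//	UPDATE certificateStatus SET lastExpirationNagSent = ? WHERE serial IN (?,?,?)
//
// params[0] carries the timestamp and params[1:] the serials, which is why
// the loop writes to params[i+1].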
   305  
   306  func (m *mailer) certIsRenewed(ctx context.Context, names []string, issued time.Time) (bool, error) {
   307  	namehash := core.HashNames(names)
   308  
   309  	var present bool
   310  	err := m.dbMap.SelectOne(
   311  		ctx,
   312  		&present,
   313  		`SELECT EXISTS (SELECT id FROM fqdnSets WHERE setHash = ? AND issued > ? LIMIT 1)`,
   314  		namehash,
   315  		issued,
   316  	)
   317  	return present, err
   318  }
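
// Note: "renewed" here means some fqdnSets row with the same setHash (that
// is, the exact same set of names, per core.HashNames) was issued after this
// certificate's issuance. A newer certificate covering a different set of
// names does not count as a renewal.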
   319  
   320  type work struct {
   321  	regID    int64
   322  	certDERs []core.CertDER
   323  }
   324  
   325  func (m *mailer) processCerts(
   326  	ctx context.Context,
   327  	allCerts []certDERWithRegID,
   328  	expiresIn time.Duration,
   329  ) error {
   330  	regIDToCertDERs := make(map[int64][]core.CertDER)
   331  
   332  	for _, cert := range allCerts {
   333  		cs := regIDToCertDERs[cert.RegID]
   334  		cs = append(cs, cert.DER)
   335  		regIDToCertDERs[cert.RegID] = cs
   336  	}
   337  
   338  	parallelSends := m.parallelSends
   339  	if parallelSends == 0 {
   340  		parallelSends = 1
   341  	}
   342  
   343  	var wg sync.WaitGroup
   344  	workChan := make(chan work, len(regIDToCertDERs))
   345  
   346  	// Populate the work chan on a goroutine so work is available as soon
   347  	// as one of the sender routines starts.
   348  	go func(ch chan<- work) {
   349  		for regID, certs := range regIDToCertDERs {
   350  			ch <- work{regID, certs}
   351  		}
   352  		close(ch)
   353  	}(workChan)
   354  
   355  	for senderNum := uint(0); senderNum < parallelSends; senderNum++ {
   356  		// For politeness' sake, don't open more than 1 new connection per
   357  		// second.
   358  		if senderNum > 0 {
   359  			time.Sleep(time.Second)
   360  		}
   361  
   362  		if ctx.Err() != nil {
   363  			return ctx.Err()
   364  		}
   365  
   366  		conn, err := m.mailer.Connect()
   367  		if err != nil {
   368  			m.log.AuditErrf("connecting parallel sender %d: %s", senderNum, err)
   369  			return err
   370  		}
   371  		wg.Add(1)
   372  		go func(conn bmail.Conn, ch <-chan work) {
   373  			defer wg.Done()
   374  			for w := range ch {
   375  				err := m.sendToOneRegID(ctx, conn, w.regID, w.certDERs, expiresIn)
   376  				if err != nil {
   377  					m.log.AuditErr(err.Error())
   378  				}
   379  			}
   380  			conn.Close()
   381  		}(conn, workChan)
   382  	}
   383  	wg.Wait()
   384  	return nil
   385  }
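
// Note: workChan is buffered to hold every work item, so the producer
// goroutine above never blocks and always reaches close(ch), even if the
// sender goroutines stop early.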
   386  
   387  func (m *mailer) sendToOneRegID(ctx context.Context, conn bmail.Conn, regID int64, certDERs []core.CertDER, expiresIn time.Duration) error {
   388  	if ctx.Err() != nil {
   389  		return ctx.Err()
   390  	}
   391  	if len(certDERs) == 0 {
   392  		return errors.New("shouldn't happen: empty certificate list in sendToOneRegID")
   393  	}
   394  	reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
   395  	if err != nil {
   396  		m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
   397  		return fmt.Errorf("fetching registration %d: %w", regID, err)
   398  	}
   399  
   400  	parsedCerts := []*x509.Certificate{}
   401  	for i, certDER := range certDERs {
   402  		if ctx.Err() != nil {
   403  			return ctx.Err()
   404  		}
   405  		parsedCert, err := x509.ParseCertificate(certDER)
   406  		if err != nil {
   407  			// TODO(#1420): tell registration about this error
   408  			m.log.AuditErrf("Error parsing certificate: %s. Body: %x", err, certDER)
   409  			m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
   410  			continue
   411  		}
   412  
   413  		// The histogram version of send delay reports the worst case send delay for
   414  		// a single regID in this cycle.
   415  		if i == 0 {
   416  			sendDelay := expiresIn - parsedCert.NotAfter.Sub(m.clk.Now())
   417  			m.stats.sendDelayHistogram.With(prometheus.Labels{"nag_group": expiresIn.String()}).Observe(
   418  				sendDelay.Truncate(time.Second).Seconds())
   419  		}
   420  
   421  		renewed, err := m.certIsRenewed(ctx, parsedCert.DNSNames, parsedCert.NotBefore)
   422  		if err != nil {
   423  			m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err)
   424  			// assume not renewed
   425  		} else if renewed {
   426  			m.log.Debugf("Cert %s is already renewed", core.SerialToString(parsedCert.SerialNumber))
   427  			m.stats.certificatesAlreadyRenewed.Add(1)
   428  			m.updateLastNagTimestamps(ctx, []*x509.Certificate{parsedCert})
   429  			continue
   430  		}
   431  
   432  		parsedCerts = append(parsedCerts, parsedCert)
   433  	}
   434  
   435  	m.stats.certificatesPerAccountNeedingMail.Observe(float64(len(parsedCerts)))
   436  
   437  	if len(parsedCerts) == 0 {
   438  		// all certificates are renewed
   439  		return nil
   440  	}
   441  
   442  	err = m.sendNags(conn, reg.Contact, parsedCerts)
   443  	if err != nil {
   444  		// Check to see if the error was due to the mail being undeliverable,
   445  		// in which case we don't want to try again later.
   446  		var badAddrErr *bmail.BadAddressSMTPError
   447  		if ok := errors.As(err, &badAddrErr); ok {
   448  			m.updateLastNagTimestamps(ctx, parsedCerts)
   449  		}
   450  
   451  		m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
   452  		return fmt.Errorf("sending nag emails: %s", err)
   453  	}
   454  
   455  	m.updateLastNagTimestamps(ctx, parsedCerts)
   456  	return nil
   457  }
   458  
   459  // findExpiringCertificates finds certificates that might need an expiration mail, filters them,
   460  // groups by account, sends mail, and updates their status in the DB so we don't examine them again.
   461  //
   462  // Invariant: findExpiringCertificates should examine each certificate at most N times, where
   463  // N is the number of reminders. For every certificate examined (barring errors), this function
   464  // should update the lastExpirationNagSent field of certificateStatus, so it does not need to
   465  // examine the same certificate again on the next go-round. This ensures we make forward progress
   466  // and don't clog up the window of certificates to be examined.
   467  func (m *mailer) findExpiringCertificates(ctx context.Context) error {
   468  	now := m.clk.Now()
   469  	// E.g. m.nagTimes = [2, 4, 8, 15] days from expiration
   470  	for i, expiresIn := range m.nagTimes {
   471  		left := now
   472  		if i > 0 {
   473  			left = left.Add(m.nagTimes[i-1])
   474  		}
   475  		right := now.Add(expiresIn)
   476  
   477  		m.log.Infof("expiration-mailer: Searching for certificates that expire between %s and %s and had last nag >%s before expiry",
   478  			left.UTC(), right.UTC(), expiresIn)
   479  
   480  		var certs []certDERWithRegID
   481  		var err error
   482  		if features.Enabled(features.ExpirationMailerUsesJoin) {
   483  			certs, err = m.getCertsWithJoin(ctx, left, right, expiresIn)
   484  		} else {
   485  			certs, err = m.getCerts(ctx, left, right, expiresIn)
   486  		}
   487  		if err != nil {
   488  			return err
   489  		}
   490  
   491  		m.stats.certificatesExamined.Add(float64(len(certs)))
   492  
   493  		// If the number of rows was exactly `m.certificatesPerTick`, we set a
   494  		// gauge indicating that this nag group is at capacity. If this condition
   495  		// continually occurs across mailer runs then we will not catch up,
   496  		// resulting in under-sending expiration mails. The effects of this
   497  		// were initially described in issue #2002[0].
   498  		//
   499  		// 0: https://github.com/letsencrypt/boulder/issues/2002
   500  		atCapacity := float64(0)
   501  		if len(certs) == m.certificatesPerTick {
   502  			m.log.Infof("nag group %s expiring certificates at configured capacity (select limit %d)",
   503  				expiresIn.String(), m.certificatesPerTick)
   504  			atCapacity = float64(1)
   505  		}
   506  		m.stats.nagsAtCapacity.With(prometheus.Labels{"nag_group": expiresIn.String()}).Set(atCapacity)
   507  
   508  		m.log.Infof("Found %d certificates expiring between %s and %s", len(certs),
   509  			left.Format(time.DateTime), right.Format(time.DateTime))
   510  
   511  		if len(certs) == 0 {
   512  			continue // nothing to do
   513  		}
   514  
   515  		processingStarted := m.clk.Now()
   516  		err = m.processCerts(ctx, certs, expiresIn)
   517  		if err != nil {
   518  			m.log.AuditErr(err.Error())
   519  		}
   520  		processingEnded := m.clk.Now()
   521  		elapsed := processingEnded.Sub(processingStarted)
   522  		m.stats.processingLatency.Observe(elapsed.Seconds())
   523  	}
   524  
   525  	return nil
   526  }
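
// Worked example (illustrative values): with nagTimes = [48h, 96h] and
// now = t, the loop scans two half-open windows of notAfter:
//
//	i=0: (t, t+48h]       certificates within two days of expiry
//	i=1: (t+48h, t+96h]   certificates within four days of expiry
//
// so each certificate lands in at most one nag group per run.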
   527  
   528  func (m *mailer) getCertsWithJoin(ctx context.Context, left, right time.Time, expiresIn time.Duration) ([]certDERWithRegID, error) {
   529  	// Unlike getCerts below, this does a single query that JOINs
   530  	// certificateStatus with certificates, fetching the DER and registration
   531  	// ID for each certificate nearing expiry in one round trip instead of
   532  	// serial-by-serial.
   533  	var certs []certDERWithRegID
   534  	_, err := m.dbMap.Select(
   535  		ctx,
   536  		&certs,
   537  		`SELECT
   538  				cert.der as der, cert.registrationID as regID
   539  				FROM certificateStatus AS cs
   540  				JOIN certificates as cert
   541  				ON cs.serial = cert.serial
   542  				AND cs.notAfter > :cutoffA
   543  				AND cs.notAfter <= :cutoffB
   544  				AND cs.status != "revoked"
   545  				AND COALESCE(TIMESTAMPDIFF(SECOND, cs.lastExpirationNagSent, cs.notAfter) > :nagCutoff, 1)
   546  				ORDER BY cs.notAfter ASC
   547  				LIMIT :certificatesPerTick`,
   548  		map[string]interface{}{
   549  			"cutoffA":             left,
   550  			"cutoffB":             right,
   551  			"nagCutoff":           expiresIn.Seconds(),
   552  			"certificatesPerTick": m.certificatesPerTick,
   553  		},
   554  	)
   555  	if err != nil {
   556  		m.log.AuditErrf("expiration-mailer: Error loading certificate serials: %s", err)
   557  		return nil, err
   558  	}
   559  	m.log.Debugf("found %d certificates", len(certs))
   560  	return certs, nil
   561  }
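
// Reading the nag filter in the query above: TIMESTAMPDIFF(SECOND,
// lastExpirationNagSent, notAfter) is the gap between the last nag and
// expiry; requiring it to exceed :nagCutoff means the previous nag was sent
// for an earlier, larger window. COALESCE(..., 1) makes rows that have never
// been nagged (where TIMESTAMPDIFF is NULL) qualify unconditionally.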
   562  
   563  func (m *mailer) getCerts(ctx context.Context, left, right time.Time, expiresIn time.Duration) ([]certDERWithRegID, error) {
   564  	// First we do a query on the certificateStatus table to find certificates
   565  	// nearing expiry meeting our criteria for email notification. We later
   566  	// sequentially fetch the certificate details. This avoids an expensive
   567  	// JOIN.
   568  	var serials []string
   569  	_, err := m.dbMap.Select(
   570  		ctx,
   571  		&serials,
   572  		`SELECT
   573  				cs.serial
   574  				FROM certificateStatus AS cs
   575  				WHERE cs.notAfter > :cutoffA
   576  				AND cs.notAfter <= :cutoffB
   577  				AND cs.status != "revoked"
   578  				AND COALESCE(TIMESTAMPDIFF(SECOND, cs.lastExpirationNagSent, cs.notAfter) > :nagCutoff, 1)
   579  				ORDER BY cs.notAfter ASC
   580  				LIMIT :certificatesPerTick`,
   581  		map[string]interface{}{
   582  			"cutoffA":             left,
   583  			"cutoffB":             right,
   584  			"nagCutoff":           expiresIn.Seconds(),
   585  			"certificatesPerTick": m.certificatesPerTick,
   586  		},
   587  	)
   588  	if err != nil {
   589  		m.log.AuditErrf("expiration-mailer: Error loading certificate serials: %s", err)
   590  		return nil, err
   591  	}
   592  	m.log.Debugf("found %d certificates", len(serials))
   593  
   594  	// Now we can sequentially retrieve the certificate details for each of the
   595  	// certificate status rows
   596  	var certs []certDERWithRegID
   597  	for i, serial := range serials {
   598  		if ctx.Err() != nil {
   599  			return nil, ctx.Err()
   600  		}
   601  		var cert core.Certificate
   602  		cert, err := sa.SelectCertificate(ctx, m.dbMap, serial)
   603  		if err != nil {
   604  			// We can get a NoRows error when processing a serial number corresponding
   605  			// to a precertificate with no final certificate. Since this certificate
   606  			// is not being used by a subscriber, we don't send an expiration email
   607  			// about it.
   608  			if db.IsNoRows(err) {
   609  				m.log.Infof("no rows for serial %q", serial)
   610  				continue
   611  			}
   612  			m.log.AuditErrf("expiration-mailer: Error loading cert %q: %s", serial, err)
   613  			continue
   614  		}
   615  		certs = append(certs, certDERWithRegID{
   616  			DER:   cert.DER,
   617  			RegID: cert.RegistrationID,
   618  		})
   619  		if i == 0 {
   620  			// Report the send delay metric. Note: this is the worst-case send delay
   621  			// of any certificate in this batch because it's based on the first (oldest).
   622  			sendDelay := expiresIn - cert.Expires.Sub(m.clk.Now())
   623  			m.stats.sendDelay.With(prometheus.Labels{"nag_group": expiresIn.String()}).Set(
   624  				sendDelay.Truncate(time.Second).Seconds())
   625  		}
   626  	}
   627  
   628  	return certs, nil
   629  }
   630  
   631  type durationSlice []time.Duration
   632  
   633  func (ds durationSlice) Len() int {
   634  	return len(ds)
   635  }
   636  
   637  func (ds durationSlice) Less(a, b int) bool {
   638  	return ds[a] < ds[b]
   639  }
   640  
   641  func (ds durationSlice) Swap(a, b int) {
   642  	ds[a], ds[b] = ds[b], ds[a]
   643  }
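
// durationSlice exists only to satisfy sort.Interface, so main() can
// sort.Sort the configured nag times into increasing order.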
   644  
   645  type Config struct {
   646  	Mailer struct {
   647  		DebugAddr string `validate:"required,hostname_port"`
   648  		DB        cmd.DBConfig
   649  		cmd.SMTPConfig
   650  
   651  		// From is an RFC 5322 formatted "From" address for reminder messages,
   652  		// e.g. "Example <example@test.org>"
   653  		From string `validate:"required"`
   654  
   655  		// Subject is the Subject line of reminder messages. This is a Go
   656  		// template with a single variable: ExpirationSubject, which contains
   657  		// a list of affected hostnames, possibly truncated.
   658  		Subject string
   659  
   660  		// CertLimit is the maximum number of certificates to investigate in a
   661  		// single batch. Defaults to 100.
   662  		CertLimit int `validate:"min=0"`
   663  
   664  		// MailsPerAddressPerDay is the maximum number of emails we'll send to
   665  		// a single address in a single day. When zero or unset, it defaults
   666  		// to math.MaxInt (effectively unlimited). Note that this does not
   667  		// track sends across restarts of the process, so we may send more
   668  		// than this after expiration-mailer restarts; it is best-effort.
   669  		MailsPerAddressPerDay int `validate:"min=0"`
   670  
   671  		// UpdateChunkSize is the maximum number of rows to update in a single
   672  		// SQL UPDATE statement.
   673  		UpdateChunkSize int `validate:"min=0,max=65535"`
   674  
   675  		NagTimes []string `validate:"min=1,dive,required"`
   676  
   677  		// Path to a text/template email template with a .gotmpl or .txt file
   678  		// extension.
   679  		EmailTemplate string `validate:"required"`
   680  
   681  		// How often to process a batch of certificates
   682  		Frequency config.Duration
   683  
   684  		// ParallelSends is the number of parallel goroutines used to process
   685  		// each batch of emails. Defaults to 1.
   686  		ParallelSends uint
   687  
   688  		TLS       cmd.TLSConfig
   689  		SAService *cmd.GRPCClientConfig
   690  
   691  		// Path to a file containing a list of trusted root certificates for use
   692  		// during the SMTP connection (as opposed to the gRPC connections).
   693  		SMTPTrustedRootFile string
   694  
   695  		Features map[string]bool
   696  	}
   697  
   698  	Syslog        cmd.SyslogConfig
   699  	OpenTelemetry cmd.OpenTelemetryConfig
   700  }
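
// A minimal illustrative config (values are made up; encoding/json matches
// the field names case-insensitively, and most fields are omitted here):
//
//	{
//	  "mailer": {
//	    "debugAddr": ":8003",
//	    "from": "Expiry Bot <noreply@example.org>",
//	    "nagTimes": ["480h", "240h", "48h"],
//	    "emailTemplate": "/etc/boulder/expiry.gotmpl",
//	    "frequency": "1h"
//	  }
//	}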
   701  
   702  func initStats(stats prometheus.Registerer) mailerStats {
   703  	sendDelay := prometheus.NewGaugeVec(
   704  		prometheus.GaugeOpts{
   705  			Name: "send_delay",
   706  			Help: "For the last batch of certificates, difference between the idealized send time and actual send time. Will always be nonzero; bigger numbers are worse",
   707  		},
   708  		[]string{"nag_group"})
   709  	stats.MustRegister(sendDelay)
   710  
   711  	sendDelayHistogram := prometheus.NewHistogramVec(
   712  		prometheus.HistogramOpts{
   713  			Name:    "send_delay_histogram",
   714  			Help:    "For each mail sent, difference between the idealized send time and actual send time. Will always be nonzero; bigger numbers are worse",
   715  			Buckets: prometheus.LinearBuckets(86400, 86400, 10),
   716  		},
   717  		[]string{"nag_group"})
   718  	stats.MustRegister(sendDelayHistogram)
   719  
   720  	nagsAtCapacity := prometheus.NewGaugeVec(
   721  		prometheus.GaugeOpts{
   722  			Name: "nags_at_capacity",
   723  			Help: "Count of nag groups at capacity",
   724  		},
   725  		[]string{"nag_group"})
   726  	stats.MustRegister(nagsAtCapacity)
   727  
   728  	errorCount := prometheus.NewCounterVec(
   729  		prometheus.CounterOpts{
   730  			Name: "errors",
   731  			Help: "Number of errors",
   732  		},
   733  		[]string{"type"})
   734  	stats.MustRegister(errorCount)
   735  
   736  	sendLatency := prometheus.NewHistogram(
   737  		prometheus.HistogramOpts{
   738  			Name:    "send_latency",
   739  			Help:    "Time the mailer takes sending messages in seconds",
   740  			Buckets: metrics.InternetFacingBuckets,
   741  		})
   742  	stats.MustRegister(sendLatency)
   743  
   744  	processingLatency := prometheus.NewHistogram(
   745  		prometheus.HistogramOpts{
   746  			Name:    "processing_latency",
   747  			Help:    "Time the mailer takes processing certificates in seconds",
   748  			Buckets: []float64{30, 60, 75, 90, 120, 600, 3600},
   749  		})
   750  	stats.MustRegister(processingLatency)
   751  
   752  	certificatesExamined := prometheus.NewCounter(
   753  		prometheus.CounterOpts{
   754  			Name: "certificates_examined",
   755  			Help: "Number of certificates looked at that are potentially due for an expiration mail",
   756  		})
   757  	stats.MustRegister(certificatesExamined)
   758  
   759  	certificatesAlreadyRenewed := prometheus.NewCounter(
   760  		prometheus.CounterOpts{
   761  			Name: "certificates_already_renewed",
   762  			Help: "Number of certificates from certificates_examined that were ignored because they were already renewed",
   763  		})
   764  	stats.MustRegister(certificatesAlreadyRenewed)
   765  
   766  	accountsNeedingMail := prometheus.NewHistogram(
   767  		prometheus.HistogramOpts{
   768  			Name:    "certificates_per_account_needing_mail",
   769  			Help:    "After ignoring certificates_already_renewed and grouping the remaining certificates by account, how many accounts needed to get an email; grouped by how many certificates each account needed",
   770  			Buckets: []float64{0, 1, 2, 100, 1000, 10000, 100000},
   771  		})
   772  	stats.MustRegister(accountsNeedingMail)
   773  
   774  	return mailerStats{
   775  		sendDelay:                         sendDelay,
   776  		sendDelayHistogram:                sendDelayHistogram,
   777  		nagsAtCapacity:                    nagsAtCapacity,
   778  		errorCount:                        errorCount,
   779  		sendLatency:                       sendLatency,
   780  		processingLatency:                 processingLatency,
   781  		certificatesExamined:              certificatesExamined,
   782  		certificatesAlreadyRenewed:        certificatesAlreadyRenewed,
   783  		certificatesPerAccountNeedingMail: accountsNeedingMail,
   784  	}
   785  }
   786  
   787  func main() {
   788  	configFile := flag.String("config", "", "File path to the configuration file for this service")
   789  	certLimit := flag.Int("cert_limit", 0, "Count of certificates to process per expiration period")
   790  	reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts")
   791  	reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff")
   792  	daemon := flag.Bool("daemon", false, "Run in daemon mode")
   793  	flag.Parse()
   794  
   795  	if *configFile == "" {
   796  		flag.Usage()
   797  		os.Exit(1)
   798  	}
   799  
   800  	var c Config
   801  	err := cmd.ReadConfigFile(*configFile, &c)
   802  	cmd.FailOnError(err, "Reading JSON config file into config structure")
   803  	err = features.Set(c.Mailer.Features)
   804  	cmd.FailOnError(err, "Failed to set feature flags")
   805  
   806  	scope, logger, oTelShutdown := cmd.StatsAndLogging(c.Syslog, c.OpenTelemetry, c.Mailer.DebugAddr)
   807  	defer oTelShutdown(context.Background())
   808  	logger.Info(cmd.VersionString())
   809  
   810  	if *daemon && c.Mailer.Frequency.Duration == 0 {
   811  		fmt.Fprintln(os.Stderr, "mailer.frequency is not set in the JSON config")
   812  		os.Exit(1)
   813  	}
   814  
   815  	if *certLimit > 0 {
   816  		c.Mailer.CertLimit = *certLimit
   817  	}
   818  	// Default to 100 if no certLimit is set
   819  	if c.Mailer.CertLimit == 0 {
   820  		c.Mailer.CertLimit = 100
   821  	}
   822  
   823  	if c.Mailer.MailsPerAddressPerDay == 0 {
   824  		c.Mailer.MailsPerAddressPerDay = math.MaxInt
   825  	}
   826  
   827  	dbMap, err := sa.InitWrappedDb(c.Mailer.DB, scope, logger)
   828  	cmd.FailOnError(err, "While initializing dbMap")
   829  
   830  	tlsConfig, err := c.Mailer.TLS.Load(scope)
   831  	cmd.FailOnError(err, "TLS config")
   832  
   833  	clk := cmd.Clock()
   834  
   835  	conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tlsConfig, scope, clk)
   836  	cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
   837  	sac := sapb.NewStorageAuthorityClient(conn)
   838  
   839  	var smtpRoots *x509.CertPool
   840  	if c.Mailer.SMTPTrustedRootFile != "" {
   841  		pem, err := os.ReadFile(c.Mailer.SMTPTrustedRootFile)
   842  		cmd.FailOnError(err, "Loading trusted roots file")
   843  		smtpRoots = x509.NewCertPool()
   844  		if !smtpRoots.AppendCertsFromPEM(pem) {
   845  			cmd.Fail("Failed to parse root certs PEM")
   846  		}
   847  	}
   848  
   849  	// Load email template
   850  	emailTmpl, err := os.ReadFile(c.Mailer.EmailTemplate)
   851  	cmd.FailOnError(err, fmt.Sprintf("Could not read email template file [%s]", c.Mailer.EmailTemplate))
   852  	tmpl, err := template.New("expiry-email").Parse(string(emailTmpl))
   853  	cmd.FailOnError(err, "Could not parse email template")
   854  
   855  	// If there is no configured subject template, use a default
   856  	if c.Mailer.Subject == "" {
   857  		c.Mailer.Subject = defaultExpirationSubject
   858  	}
   859  	// Load subject template
   860  	subjTmpl, err := template.New("expiry-email-subject").Parse(c.Mailer.Subject)
   861  	cmd.FailOnError(err, "Could not parse email subject template")
   862  
   863  	fromAddress, err := netmail.ParseAddress(c.Mailer.From)
   864  	cmd.FailOnError(err, fmt.Sprintf("Could not parse from address: %s", c.Mailer.From))
   865  
   866  	smtpPassword, err := c.Mailer.PasswordConfig.Pass()
   867  	cmd.FailOnError(err, "Failed to load SMTP password")
   868  	mailClient := bmail.New(
   869  		c.Mailer.Server,
   870  		c.Mailer.Port,
   871  		c.Mailer.Username,
   872  		smtpPassword,
   873  		smtpRoots,
   874  		*fromAddress,
   875  		logger,
   876  		scope,
   877  		*reconnBase,
   878  		*reconnMax)
   879  
   880  	var nags durationSlice
   881  	for _, nagDuration := range c.Mailer.NagTimes {
   882  		dur, err := time.ParseDuration(nagDuration)
   883  		if err != nil {
   884  			logger.AuditErrf("Failed to parse nag duration string [%s]: %s", nagDuration, err)
   885  			return
   886  		}
   887  		// Add some padding to the nag times so we send _before_ the configured
   888  		// time rather than after. See https://github.com/letsencrypt/boulder/pull/1029
   889  		adjustedInterval := dur + c.Mailer.Frequency.Duration
   890  		nags = append(nags, adjustedInterval)
   891  	}
   892  	// Make sure durations are sorted in increasing order
   893  	sort.Sort(nags)
   894  
   895  	if c.Mailer.UpdateChunkSize > 65535 {
   896  		// MariaDB limits the number of placeholders parameters to max_uint16:
   897  		// https://github.com/MariaDB/server/blob/10.5/sql/sql_prepare.cc#L2629-L2635
   898  		cmd.Fail(fmt.Sprintf("UpdateChunkSize of %d is too big", c.Mailer.UpdateChunkSize))
   899  	}
   900  
   901  	m := mailer{
   902  		log:                 logger,
   903  		dbMap:               dbMap,
   904  		rs:                  sac,
   905  		mailer:              mailClient,
   906  		subjectTemplate:     subjTmpl,
   907  		emailTemplate:       tmpl,
   908  		nagTimes:            nags,
   909  		certificatesPerTick: c.Mailer.CertLimit,
   910  		addressLimiter:      &limiter{clk: cmd.Clock(), limit: c.Mailer.MailsPerAddressPerDay},
   911  		updateChunkSize:     c.Mailer.UpdateChunkSize,
   912  		parallelSends:       c.Mailer.ParallelSends,
   913  		clk:                 clk,
   914  		stats:               initStats(scope),
   915  	}
   916  
   917  	// Prefill this labelled stat with the possible label values, so each value is
   918  	// set to 0 on startup, rather than being missing from stats collection until
   919  	// the first mail run.
   920  	for _, expiresIn := range nags {
   921  		m.stats.nagsAtCapacity.With(prometheus.Labels{"nag_group": expiresIn.String()}).Set(0)
   922  	}
   923  
   924  	ctx, cancel := context.WithCancel(context.Background())
   925  	go cmd.CatchSignals(cancel)
   926  
   927  	if *daemon {
   928  		t := time.NewTicker(c.Mailer.Frequency.Duration)
   929  		for {
   930  			select {
   931  			case <-t.C:
   932  				err = m.findExpiringCertificates(ctx)
   933  				if err != nil && !errors.Is(err, context.Canceled) {
   934  					cmd.FailOnError(err, "expiration-mailer has failed")
   935  				}
   936  			case <-ctx.Done():
   937  				return
   938  			}
   939  		}
   940  	} else {
   941  		err = m.findExpiringCertificates(ctx)
   942  		if err != nil && !errors.Is(err, context.Canceled) {
   943  			cmd.FailOnError(err, "expiration-mailer has failed")
   944  		}
   945  	}
   946  }
   947  
   948  func init() {
   949  	cmd.RegisterCommand("expiration-mailer", main, &cmd.ConfigValidator{Config: &Config{}})
   950  }
   951  
