...

Source file src/github.com/letsencrypt/boulder/cmd/notify-mailer/main.go

Documentation: github.com/letsencrypt/boulder/cmd/notify-mailer

     1  package notmain
     2  
     3  import (
     4  	"context"
     5  	"encoding/csv"
     6  	"encoding/json"
     7  	"errors"
     8  	"flag"
     9  	"fmt"
    10  	"io"
    11  	"net/mail"
    12  	"os"
    13  	"sort"
    14  	"strconv"
    15  	"strings"
    16  	"sync"
    17  	"text/template"
    18  	"time"
    19  
    20  	"github.com/jmhodges/clock"
    21  	"github.com/letsencrypt/boulder/cmd"
    22  	"github.com/letsencrypt/boulder/db"
    23  	blog "github.com/letsencrypt/boulder/log"
    24  	bmail "github.com/letsencrypt/boulder/mail"
    25  	"github.com/letsencrypt/boulder/metrics"
    26  	"github.com/letsencrypt/boulder/policy"
    27  	"github.com/letsencrypt/boulder/sa"
    28  )
    29  
// mailer holds the collaborators and settings used by a single notify-mailer
// run.
type mailer struct {
	// clk supplies the current time and performs the sleeps between sends.
	clk           clock.Clock
	// log receives progress, skip, and error messages.
	log           blog.Logger
	// dbMap is used to look up the contacts of each recipient's registration.
	dbMap         dbSelector
	// mailer opens the connections that messages are sent over.
	mailer        bmail.Mailer
	// subject is the subject line used for every message.
	subject       string
	// emailTemplate is executed once per address to produce the message body.
	emailTemplate *template.Template
	// recipients is the list of records read from the recipient list file.
	recipients    []recipient
	// targetRange limits sending to addresses inside the interval.
	targetRange   interval
	// sleepInterval is how long a sender sleeps after each send attempt. It
	// must not be negative.
	sleepInterval time.Duration
	// parallelSends is the number of concurrent senders; run treats values
	// below 1 as 1.
	parallelSends uint
}
    42  
// interval defines a half-open range [start, end) of email addresses to send
// to, in alphabetical order. Bounds are compared as plain strings, byte by
// byte. The `start` field is inclusive and the `end` field is exclusive. To
// include everything, set `end` to \xFF.
type interval struct {
	// start is the inclusive lower bound.
	start string
	// end is the exclusive upper bound.
	end   string
}
    50  
// contactQueryResult is a receiver for queries to the `registrations` table.
type contactQueryResult struct {
	// ID is exported to receive the value of `id`.
	ID int64

	// Contact is exported to receive the raw value of `contact`.
	// getAddressForID decodes it as a JSON array of strings.
	Contact []byte
}
    59  
    60  func (i *interval) ok() error {
    61  	if i.start > i.end {
    62  		return fmt.Errorf("interval start value (%s) is greater than end value (%s)",
    63  			i.start, i.end)
    64  	}
    65  	return nil
    66  }
    67  
    68  func (i *interval) includes(s string) bool {
    69  	return s >= i.start && s < i.end
    70  }
    71  
    72  // ok ensures that both the `targetRange` and `sleepInterval` are valid.
    73  func (m *mailer) ok() error {
    74  	err := m.targetRange.ok()
    75  	if err != nil {
    76  		return err
    77  	}
    78  
    79  	if m.sleepInterval < 0 {
    80  		return fmt.Errorf(
    81  			"sleep interval (%d) is < 0", m.sleepInterval)
    82  	}
    83  	return nil
    84  }
    85  
    86  func (m *mailer) logStatus(to string, current, total int, start time.Time) {
    87  	// Should never happen.
    88  	if total <= 0 || current < 1 || current > total {
    89  		m.log.AuditErrf("Invalid current (%d) or total (%d)", current, total)
    90  	}
    91  	completion := (float32(current) / float32(total)) * 100
    92  	now := m.clk.Now()
    93  	elapsed := now.Sub(start)
    94  	m.log.Infof("Sending message (%d) of (%d) to address (%s) [%.2f%%] time elapsed (%s)",
    95  		current, total, to, completion, elapsed)
    96  }
    97  
    98  func sortAddresses(input addressToRecipientMap) []string {
    99  	var addresses []string
   100  	for address := range input {
   101  		addresses = append(addresses, address)
   102  	}
   103  	sort.Strings(addresses)
   104  	return addresses
   105  }
   106  
   107  // makeMessageBody is a helper for mailer.run() that's split out for the
   108  // purposes of testing.
   109  func (m *mailer) makeMessageBody(recipients []recipient) (string, error) {
   110  	var messageBody strings.Builder
   111  
   112  	err := m.emailTemplate.Execute(&messageBody, recipients)
   113  	if err != nil {
   114  		return "", err
   115  	}
   116  
   117  	if messageBody.Len() == 0 {
   118  		return "", errors.New("templating resulted in an empty message body")
   119  	}
   120  	return messageBody.String(), nil
   121  }
   122  
   123  func (m *mailer) run(ctx context.Context) error {
   124  	err := m.ok()
   125  	if err != nil {
   126  		return err
   127  	}
   128  
   129  	totalRecipients := len(m.recipients)
   130  	m.log.Infof("Resolving addresses for (%d) recipients", totalRecipients)
   131  
   132  	addressToRecipient, err := m.resolveAddresses(ctx)
   133  	if err != nil {
   134  		return err
   135  	}
   136  
   137  	totalAddresses := len(addressToRecipient)
   138  	if totalAddresses == 0 {
   139  		return errors.New("0 recipients remained after resolving addresses")
   140  	}
   141  
   142  	m.log.Infof("%d recipients were resolved to %d addresses", totalRecipients, totalAddresses)
   143  
   144  	var mostRecipients string
   145  	var mostRecipientsLen int
   146  	for k, v := range addressToRecipient {
   147  		if len(v) > mostRecipientsLen {
   148  			mostRecipientsLen = len(v)
   149  			mostRecipients = k
   150  		}
   151  	}
   152  
   153  	m.log.Infof("Address %q was associated with the most recipients (%d)",
   154  		mostRecipients, mostRecipientsLen)
   155  
   156  	type work struct {
   157  		index   int
   158  		address string
   159  	}
   160  
   161  	var wg sync.WaitGroup
   162  	workChan := make(chan work, totalAddresses)
   163  
   164  	startTime := m.clk.Now()
   165  	sortedAddresses := sortAddresses(addressToRecipient)
   166  
   167  	if (m.targetRange.start != "" && m.targetRange.start > sortedAddresses[totalAddresses-1]) ||
   168  		(m.targetRange.end != "" && m.targetRange.end < sortedAddresses[0]) {
   169  		return errors.New("Zero found addresses fall inside target range")
   170  	}
   171  
   172  	go func(ch chan<- work) {
   173  		for i, address := range sortedAddresses {
   174  			ch <- work{i, address}
   175  		}
   176  		close(workChan)
   177  	}(workChan)
   178  
   179  	if m.parallelSends < 1 {
   180  		m.parallelSends = 1
   181  	}
   182  
   183  	for senderNum := uint(0); senderNum < m.parallelSends; senderNum++ {
   184  		// For politeness' sake, don't open more than 1 new connection per
   185  		// second.
   186  		if senderNum > 0 {
   187  			m.clk.Sleep(time.Second)
   188  		}
   189  
   190  		conn, err := m.mailer.Connect()
   191  		if err != nil {
   192  			return fmt.Errorf("connecting parallel sender %d: %w", senderNum, err)
   193  		}
   194  
   195  		wg.Add(1)
   196  		go func(conn bmail.Conn, ch <-chan work) {
   197  			defer wg.Done()
   198  			for w := range ch {
   199  				if !m.targetRange.includes(w.address) {
   200  					m.log.Debugf("Address %q is outside of target range, skipping", w.address)
   201  					continue
   202  				}
   203  
   204  				err := policy.ValidEmail(w.address)
   205  				if err != nil {
   206  					m.log.Infof("Skipping %q due to policy violation: %s", w.address, err)
   207  					continue
   208  				}
   209  
   210  				recipients := addressToRecipient[w.address]
   211  				m.logStatus(w.address, w.index+1, totalAddresses, startTime)
   212  
   213  				messageBody, err := m.makeMessageBody(recipients)
   214  				if err != nil {
   215  					m.log.Errf("Skipping %q due to templating error: %s", w.address, err)
   216  					continue
   217  				}
   218  
   219  				err = conn.SendMail([]string{w.address}, m.subject, messageBody)
   220  				if err != nil {
   221  					var badAddrErr bmail.BadAddressSMTPError
   222  					if errors.As(err, &badAddrErr) {
   223  						m.log.Errf("address %q was rejected by server: %s", w.address, err)
   224  						continue
   225  					}
   226  					m.log.AuditErrf("while sending mail (%d) of (%d) to address %q: %s",
   227  						w.index, len(sortedAddresses), w.address, err)
   228  				}
   229  
   230  				m.clk.Sleep(m.sleepInterval)
   231  			}
   232  			conn.Close()
   233  		}(conn, workChan)
   234  	}
   235  	wg.Wait()
   236  
   237  	return nil
   238  }
   239  
   240  // resolveAddresses creates a mapping of email addresses to (a list of)
   241  // `recipient`s that resolve to that email address.
   242  func (m *mailer) resolveAddresses(ctx context.Context) (addressToRecipientMap, error) {
   243  	result := make(addressToRecipientMap, len(m.recipients))
   244  	for _, recipient := range m.recipients {
   245  		addresses, err := getAddressForID(ctx, recipient.id, m.dbMap)
   246  		if err != nil {
   247  			return nil, err
   248  		}
   249  
   250  		for _, address := range addresses {
   251  			parsed, err := mail.ParseAddress(address)
   252  			if err != nil {
   253  				m.log.Errf("Unparsable address %q, skipping ID (%d)", address, recipient.id)
   254  				continue
   255  			}
   256  			result[parsed.Address] = append(result[parsed.Address], recipient)
   257  		}
   258  	}
   259  	return result, nil
   260  }
   261  
// dbSelector abstracts over a subset of methods from `borp.DbMap` objects to
// facilitate mocking in unit tests.
type dbSelector interface {
	// SelectOne is called by getAddressForID with a pointer to a result
	// struct, a query, and a map of named query arguments.
	// NOTE(review): presumably it stores a single matching row in holder;
	// confirm against the `borp` documentation.
	SelectOne(ctx context.Context, holder interface{}, query string, args ...interface{}) error
}
   267  
   268  // getAddressForID queries the database for the email address associated with
   269  // the provided registration ID.
   270  func getAddressForID(ctx context.Context, id int64, dbMap dbSelector) ([]string, error) {
   271  	var result contactQueryResult
   272  	err := dbMap.SelectOne(ctx, &result,
   273  		`SELECT id,
   274  			contact
   275  		FROM registrations
   276  		WHERE contact NOT IN ('[]', 'null')
   277  			AND id = :id;`,
   278  		map[string]interface{}{"id": id})
   279  	if err != nil {
   280  		if db.IsNoRows(err) {
   281  			return []string{}, nil
   282  		}
   283  		return nil, err
   284  	}
   285  
   286  	var contacts []string
   287  	err = json.Unmarshal(result.Contact, &contacts)
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  
   292  	var addresses []string
   293  	for _, contact := range contacts {
   294  		if strings.HasPrefix(contact, "mailto:") {
   295  			addresses = append(addresses, strings.TrimPrefix(contact, "mailto:"))
   296  		}
   297  	}
   298  	return addresses, nil
   299  }
   300  
// recipient represents a single record from the recipient list file. The 'id'
// column is parsed to the 'id' field, all additional data will be parsed to a
// mapping of column name to value in the 'Data' field. Please inform SRE if you
// make any changes to the exported fields of this struct. These fields are
// referenced in operationally critical e-mail templates used to notify
// subscribers during incident response.
type recipient struct {
	// id is the subscriber's ID.
	id int64

	// Data is a mapping of column name to value parsed from a single record in
	// the provided recipient list file. It's exported so the contents can be
	// accessed by the template package. Please inform SRE if you make any
	// changes to this field.
	Data map[string]string
}
   317  
// addressToRecipientMap maps email addresses to a list of `recipient`s that
// resolve to that email address. It is built by mailer.resolveAddresses.
type addressToRecipientMap map[string][]recipient
   321  
   322  // readRecipientsList parses the contents of a recipient list file into a list
   323  // of `recipient` objects.
   324  func readRecipientsList(filename string, delimiter rune) ([]recipient, string, error) {
   325  	f, err := os.Open(filename)
   326  	if err != nil {
   327  		return nil, "", err
   328  	}
   329  
   330  	reader := csv.NewReader(f)
   331  	reader.Comma = delimiter
   332  
   333  	// Parse header.
   334  	record, err := reader.Read()
   335  	if err != nil {
   336  		return nil, "", fmt.Errorf("failed to parse header: %w", err)
   337  	}
   338  
   339  	if record[0] != "id" {
   340  		return nil, "", errors.New("header must begin with \"id\"")
   341  	}
   342  
   343  	// Collect the names of each header column after `id`.
   344  	var dataColumns []string
   345  	for _, v := range record[1:] {
   346  		dataColumns = append(dataColumns, strings.TrimSpace(v))
   347  		if len(v) == 0 {
   348  			return nil, "", errors.New("header contains an empty column")
   349  		}
   350  	}
   351  
   352  	var recordsWithEmptyColumns []int64
   353  	var recordsWithDuplicateIDs []int64
   354  	var probsBuff strings.Builder
   355  	stringProbs := func() string {
   356  		if len(recordsWithEmptyColumns) != 0 {
   357  			fmt.Fprintf(&probsBuff, "ID(s) %v contained empty columns and ",
   358  				recordsWithEmptyColumns)
   359  		}
   360  
   361  		if len(recordsWithDuplicateIDs) != 0 {
   362  			fmt.Fprintf(&probsBuff, "ID(s) %v were skipped as duplicates",
   363  				recordsWithDuplicateIDs)
   364  		}
   365  
   366  		if probsBuff.Len() == 0 {
   367  			return ""
   368  		}
   369  		return strings.TrimSuffix(probsBuff.String(), " and ")
   370  	}
   371  
   372  	// Parse records.
   373  	recipientIDs := make(map[int64]bool)
   374  	var recipients []recipient
   375  	for {
   376  		record, err := reader.Read()
   377  		if errors.Is(err, io.EOF) {
   378  			// Finished parsing the file.
   379  			if len(recipients) == 0 {
   380  				return nil, stringProbs(), errors.New("no records after header")
   381  			}
   382  			return recipients, stringProbs(), nil
   383  		} else if err != nil {
   384  			return nil, "", err
   385  		}
   386  
   387  		// Ensure the first column of each record can be parsed as a valid
   388  		// registration ID.
   389  		recordID := record[0]
   390  		id, err := strconv.ParseInt(recordID, 10, 64)
   391  		if err != nil {
   392  			return nil, "", fmt.Errorf(
   393  				"%q couldn't be parsed as a registration ID due to: %s", recordID, err)
   394  		}
   395  
   396  		// Skip records that have the same ID as those read previously.
   397  		if recipientIDs[id] {
   398  			recordsWithDuplicateIDs = append(recordsWithDuplicateIDs, id)
   399  			continue
   400  		}
   401  		recipientIDs[id] = true
   402  
   403  		// Collect the columns of data after `id` into a map.
   404  		var emptyColumn bool
   405  		data := make(map[string]string)
   406  		for i, v := range record[1:] {
   407  			if len(v) == 0 {
   408  				emptyColumn = true
   409  			}
   410  			data[dataColumns[i]] = v
   411  		}
   412  
   413  		// Only used for logging.
   414  		if emptyColumn {
   415  			recordsWithEmptyColumns = append(recordsWithEmptyColumns, id)
   416  		}
   417  
   418  		recipients = append(recipients, recipient{id, data})
   419  	}
   420  }
   421  
   422  const usageIntro = `
   423  Introduction:
   424  
   425  The notification mailer exists to send a message to the contact associated
   426  with a list of registration IDs. The attributes of the message (from address,
   427  subject, and message content) are provided by the command line arguments. The
   428  message content is provided as a path to a template file via the -body argument.
   429  
   430  Provide a list of recipient user ids in a CSV file passed with the -recipientList
   431  flag. The CSV file must have "id" as the first column and may have additional
   432  fields to be interpolated into the email template:
   433  
   434  	id, lastIssuance
   435  	1234, "from example.com 2018-12-01"
   436  	5678, "from example.net 2018-12-13"
   437  
   438  The additional fields will be interpolated with Golang templating, e.g.:
   439  
   440    Your last issuance on each account was:
   441  		{{ range . }} {{ .Data.lastIssuance }}
   442  		{{ end }}
   443  
   444  To help the operator gain confidence in the mailing run before committing fully
   445  three safety features are supported: dry runs, intervals and a sleep between emails.
   446  
   447  The -dryRun=true flag will use a mock mailer that prints message content to
   448  stdout instead of performing an SMTP transaction with a real mailserver. This
   449  can be used when the initial parameters are being tweaked to ensure no real
   450  emails are sent. Using -dryRun=false will send real email.
   451  
   452  Intervals supported via the -start and -end arguments. Only email addresses that
   453  are alphabetically between the -start and -end strings will be sent. This can be used
   454  to break up sending into batches, or more likely to resume sending if a batch is killed,
   455  without resending messages that have already been sent. The -start flag is inclusive and
   456  the -end flag is exclusive.
   457  
   458  Notify-mailer de-duplicates email addresses and groups together the resulting recipient
   459  structs, so a person who has multiple accounts using the same address will only receive
   460  one email.
   461  
   462  During mailing the -sleep argument is used to space out individual messages.
   463  This can be used to ensure that the mailing happens at a steady pace with ample
   464  opportunity for the operator to terminate early in the event of error. The
   465  -sleep flag honours durations with a unit suffix (e.g. 1m for 1 minute, 10s for
   466  10 seconds, etc). Using -sleep=0 will disable the sleep and send at full speed.
   467  
   468  Examples:
   469    Send an email with subject "Hello!" from the email "hello@goodbye.com" with
   470    the contents read from "test_msg_body.txt" to every email associated with the
   471    registration IDs listed in "test_reg_recipients.json", sleeping 10 seconds
   472    between each message:
   473  
   474    notify-mailer -config test/config/notify-mailer.json -body
   475      cmd/notify-mailer/testdata/test_msg_body.txt -from hello@goodbye.com
   476      -recipientList cmd/notify-mailer/testdata/test_msg_recipients.csv -subject "Hello!"
   477      -sleep 10s -dryRun=false
   478  
   479    Do the same, but only to example@example.com:
   480  
   481    notify-mailer -config test/config/notify-mailer.json
   482      -body cmd/notify-mailer/testdata/test_msg_body.txt -from hello@goodbye.com
   483      -recipientList cmd/notify-mailer/testdata/test_msg_recipients.csv -subject "Hello!"
   484      -start example@example.com -end example@example.comX
   485  
   486    Send the message starting with example@example.com and emailing every address that's
   487  	alphabetically higher:
   488  
   489    notify-mailer -config test/config/notify-mailer.json 
   490      -body cmd/notify-mailer/testdata/test_msg_body.txt -from hello@goodbye.com 
   491      -recipientList cmd/notify-mailer/testdata/test_msg_recipients.csv -subject "Hello!"
   492      -start example@example.com
   493  
   494  Required arguments:
   495  - body
   496  - config
   497  - from
   498  - subject
   499  - recipientList`
   500  
// Config is the JSON configuration of notify-mailer, loaded from the file
// named by the -config flag.
type Config struct {
	// NotifyMailer holds the database settings and, through the embedded
	// cmd.SMTPConfig, the SMTP settings used when not in dry-run mode.
	NotifyMailer struct {
		DB cmd.DBConfig
		cmd.SMTPConfig
	}
	// Syslog is handed to cmd.NewLogger to build the logger.
	Syslog cmd.SyslogConfig
}
   508  
   509  func main() {
   510  	from := flag.String("from", "", "From header for emails. Must be a bare email address.")
   511  	subject := flag.String("subject", "", "Subject of emails")
   512  	recipientListFile := flag.String("recipientList", "", "File containing a CSV list of registration IDs and extra info.")
   513  	parseAsTSV := flag.Bool("tsv", false, "Parse the recipient list file as a TSV.")
   514  	bodyFile := flag.String("body", "", "File containing the email body in Golang template format.")
   515  	dryRun := flag.Bool("dryRun", true, "Whether to do a dry run.")
   516  	sleep := flag.Duration("sleep", 500*time.Millisecond, "How long to sleep between emails.")
   517  	parallelSends := flag.Uint("parallelSends", 1, "How many parallel goroutines should process emails")
   518  	start := flag.String("start", "", "Alphabetically lowest email address to include.")
   519  	end := flag.String("end", "\xFF", "Alphabetically highest email address (exclusive).")
   520  	reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts")
   521  	reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff")
   522  	configFile := flag.String("config", "", "File containing a JSON config.")
   523  
   524  	flag.Usage = func() {
   525  		fmt.Fprintf(os.Stderr, "%s\n\n", usageIntro)
   526  		fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
   527  		flag.PrintDefaults()
   528  	}
   529  
   530  	// Validate required args.
   531  	flag.Parse()
   532  	if *from == "" || *subject == "" || *bodyFile == "" || *configFile == "" || *recipientListFile == "" {
   533  		flag.Usage()
   534  		os.Exit(1)
   535  	}
   536  
   537  	configData, err := os.ReadFile(*configFile)
   538  	cmd.FailOnError(err, "Couldn't load JSON config file")
   539  
   540  	// Parse JSON config.
   541  	var cfg Config
   542  	err = json.Unmarshal(configData, &cfg)
   543  	cmd.FailOnError(err, "Couldn't unmarshal JSON config file")
   544  
   545  	log := cmd.NewLogger(cfg.Syslog)
   546  	log.Info(cmd.VersionString())
   547  
   548  	dbMap, err := sa.InitWrappedDb(cfg.NotifyMailer.DB, nil, log)
   549  	cmd.FailOnError(err, "While initializing dbMap")
   550  
   551  	// Load and parse message body.
   552  	template, err := template.ParseFiles(*bodyFile)
   553  	cmd.FailOnError(err, "Couldn't parse message template")
   554  
   555  	// Ensure that in the event of a missing key, an informative error is
   556  	// returned.
   557  	template.Option("missingkey=error")
   558  
   559  	address, err := mail.ParseAddress(*from)
   560  	cmd.FailOnError(err, fmt.Sprintf("Couldn't parse %q to address", *from))
   561  
   562  	recipientListDelimiter := ','
   563  	if *parseAsTSV {
   564  		recipientListDelimiter = '\t'
   565  	}
   566  	recipients, probs, err := readRecipientsList(*recipientListFile, recipientListDelimiter)
   567  	cmd.FailOnError(err, "Couldn't populate recipients")
   568  
   569  	if probs != "" {
   570  		log.Infof("While reading the recipient list file %s", probs)
   571  	}
   572  
   573  	var mailClient bmail.Mailer
   574  	if *dryRun {
   575  		log.Infof("Starting %s in dry-run mode", cmd.VersionString())
   576  		mailClient = bmail.NewDryRun(*address, log)
   577  	} else {
   578  		log.Infof("Starting %s", cmd.VersionString())
   579  		smtpPassword, err := cfg.NotifyMailer.PasswordConfig.Pass()
   580  		cmd.FailOnError(err, "Couldn't load SMTP password from file")
   581  
   582  		mailClient = bmail.New(
   583  			cfg.NotifyMailer.Server,
   584  			cfg.NotifyMailer.Port,
   585  			cfg.NotifyMailer.Username,
   586  			smtpPassword,
   587  			nil,
   588  			*address,
   589  			log,
   590  			metrics.NoopRegisterer,
   591  			*reconnBase,
   592  			*reconnMax)
   593  	}
   594  
   595  	m := mailer{
   596  		clk:           cmd.Clock(),
   597  		log:           log,
   598  		dbMap:         dbMap,
   599  		mailer:        mailClient,
   600  		subject:       *subject,
   601  		recipients:    recipients,
   602  		emailTemplate: template,
   603  		targetRange: interval{
   604  			start: *start,
   605  			end:   *end,
   606  		},
   607  		sleepInterval: *sleep,
   608  		parallelSends: *parallelSends,
   609  	}
   610  
   611  	err = m.run(context.TODO())
   612  	cmd.FailOnError(err, "Couldn't complete")
   613  
   614  	log.Info("Completed successfully")
   615  }
   616  
// init registers main under the command name "notify-mailer" via
// cmd.RegisterCommand, together with a cmd.ConfigValidator for Config.
func init() {
	cmd.RegisterCommand("notify-mailer", main, &cmd.ConfigValidator{Config: &Config{}})
}
   620  

View as plain text