1 package notmain
2
3 import (
4 "context"
5 "encoding/csv"
6 "encoding/json"
7 "errors"
8 "flag"
9 "fmt"
10 "io"
11 "net/mail"
12 "os"
13 "sort"
14 "strconv"
15 "strings"
16 "sync"
17 "text/template"
18 "time"
19
20 "github.com/jmhodges/clock"
21 "github.com/letsencrypt/boulder/cmd"
22 "github.com/letsencrypt/boulder/db"
23 blog "github.com/letsencrypt/boulder/log"
24 bmail "github.com/letsencrypt/boulder/mail"
25 "github.com/letsencrypt/boulder/metrics"
26 "github.com/letsencrypt/boulder/policy"
27 "github.com/letsencrypt/boulder/sa"
28 )
29
30 type mailer struct {
31 clk clock.Clock
32 log blog.Logger
33 dbMap dbSelector
34 mailer bmail.Mailer
35 subject string
36 emailTemplate *template.Template
37 recipients []recipient
38 targetRange interval
39 sleepInterval time.Duration
40 parallelSends uint
41 }
42
43
44
45
46 type interval struct {
47 start string
48 end string
49 }
50
51
52 type contactQueryResult struct {
53
54 ID int64
55
56
57 Contact []byte
58 }
59
60 func (i *interval) ok() error {
61 if i.start > i.end {
62 return fmt.Errorf("interval start value (%s) is greater than end value (%s)",
63 i.start, i.end)
64 }
65 return nil
66 }
67
68 func (i *interval) includes(s string) bool {
69 return s >= i.start && s < i.end
70 }
71
72
73 func (m *mailer) ok() error {
74 err := m.targetRange.ok()
75 if err != nil {
76 return err
77 }
78
79 if m.sleepInterval < 0 {
80 return fmt.Errorf(
81 "sleep interval (%d) is < 0", m.sleepInterval)
82 }
83 return nil
84 }
85
86 func (m *mailer) logStatus(to string, current, total int, start time.Time) {
87
88 if total <= 0 || current < 1 || current > total {
89 m.log.AuditErrf("Invalid current (%d) or total (%d)", current, total)
90 }
91 completion := (float32(current) / float32(total)) * 100
92 now := m.clk.Now()
93 elapsed := now.Sub(start)
94 m.log.Infof("Sending message (%d) of (%d) to address (%s) [%.2f%%] time elapsed (%s)",
95 current, total, to, completion, elapsed)
96 }
97
98 func sortAddresses(input addressToRecipientMap) []string {
99 var addresses []string
100 for address := range input {
101 addresses = append(addresses, address)
102 }
103 sort.Strings(addresses)
104 return addresses
105 }
106
107
108
109 func (m *mailer) makeMessageBody(recipients []recipient) (string, error) {
110 var messageBody strings.Builder
111
112 err := m.emailTemplate.Execute(&messageBody, recipients)
113 if err != nil {
114 return "", err
115 }
116
117 if messageBody.Len() == 0 {
118 return "", errors.New("templating resulted in an empty message body")
119 }
120 return messageBody.String(), nil
121 }
122
123 func (m *mailer) run(ctx context.Context) error {
124 err := m.ok()
125 if err != nil {
126 return err
127 }
128
129 totalRecipients := len(m.recipients)
130 m.log.Infof("Resolving addresses for (%d) recipients", totalRecipients)
131
132 addressToRecipient, err := m.resolveAddresses(ctx)
133 if err != nil {
134 return err
135 }
136
137 totalAddresses := len(addressToRecipient)
138 if totalAddresses == 0 {
139 return errors.New("0 recipients remained after resolving addresses")
140 }
141
142 m.log.Infof("%d recipients were resolved to %d addresses", totalRecipients, totalAddresses)
143
144 var mostRecipients string
145 var mostRecipientsLen int
146 for k, v := range addressToRecipient {
147 if len(v) > mostRecipientsLen {
148 mostRecipientsLen = len(v)
149 mostRecipients = k
150 }
151 }
152
153 m.log.Infof("Address %q was associated with the most recipients (%d)",
154 mostRecipients, mostRecipientsLen)
155
156 type work struct {
157 index int
158 address string
159 }
160
161 var wg sync.WaitGroup
162 workChan := make(chan work, totalAddresses)
163
164 startTime := m.clk.Now()
165 sortedAddresses := sortAddresses(addressToRecipient)
166
167 if (m.targetRange.start != "" && m.targetRange.start > sortedAddresses[totalAddresses-1]) ||
168 (m.targetRange.end != "" && m.targetRange.end < sortedAddresses[0]) {
169 return errors.New("Zero found addresses fall inside target range")
170 }
171
172 go func(ch chan<- work) {
173 for i, address := range sortedAddresses {
174 ch <- work{i, address}
175 }
176 close(workChan)
177 }(workChan)
178
179 if m.parallelSends < 1 {
180 m.parallelSends = 1
181 }
182
183 for senderNum := uint(0); senderNum < m.parallelSends; senderNum++ {
184
185
186 if senderNum > 0 {
187 m.clk.Sleep(time.Second)
188 }
189
190 conn, err := m.mailer.Connect()
191 if err != nil {
192 return fmt.Errorf("connecting parallel sender %d: %w", senderNum, err)
193 }
194
195 wg.Add(1)
196 go func(conn bmail.Conn, ch <-chan work) {
197 defer wg.Done()
198 for w := range ch {
199 if !m.targetRange.includes(w.address) {
200 m.log.Debugf("Address %q is outside of target range, skipping", w.address)
201 continue
202 }
203
204 err := policy.ValidEmail(w.address)
205 if err != nil {
206 m.log.Infof("Skipping %q due to policy violation: %s", w.address, err)
207 continue
208 }
209
210 recipients := addressToRecipient[w.address]
211 m.logStatus(w.address, w.index+1, totalAddresses, startTime)
212
213 messageBody, err := m.makeMessageBody(recipients)
214 if err != nil {
215 m.log.Errf("Skipping %q due to templating error: %s", w.address, err)
216 continue
217 }
218
219 err = conn.SendMail([]string{w.address}, m.subject, messageBody)
220 if err != nil {
221 var badAddrErr bmail.BadAddressSMTPError
222 if errors.As(err, &badAddrErr) {
223 m.log.Errf("address %q was rejected by server: %s", w.address, err)
224 continue
225 }
226 m.log.AuditErrf("while sending mail (%d) of (%d) to address %q: %s",
227 w.index, len(sortedAddresses), w.address, err)
228 }
229
230 m.clk.Sleep(m.sleepInterval)
231 }
232 conn.Close()
233 }(conn, workChan)
234 }
235 wg.Wait()
236
237 return nil
238 }
239
240
241
242 func (m *mailer) resolveAddresses(ctx context.Context) (addressToRecipientMap, error) {
243 result := make(addressToRecipientMap, len(m.recipients))
244 for _, recipient := range m.recipients {
245 addresses, err := getAddressForID(ctx, recipient.id, m.dbMap)
246 if err != nil {
247 return nil, err
248 }
249
250 for _, address := range addresses {
251 parsed, err := mail.ParseAddress(address)
252 if err != nil {
253 m.log.Errf("Unparsable address %q, skipping ID (%d)", address, recipient.id)
254 continue
255 }
256 result[parsed.Address] = append(result[parsed.Address], recipient)
257 }
258 }
259 return result, nil
260 }
261
262
263
264 type dbSelector interface {
265 SelectOne(ctx context.Context, holder interface{}, query string, args ...interface{}) error
266 }
267
268
269
270 func getAddressForID(ctx context.Context, id int64, dbMap dbSelector) ([]string, error) {
271 var result contactQueryResult
272 err := dbMap.SelectOne(ctx, &result,
273 `SELECT id,
274 contact
275 FROM registrations
276 WHERE contact NOT IN ('[]', 'null')
277 AND id = :id;`,
278 map[string]interface{}{"id": id})
279 if err != nil {
280 if db.IsNoRows(err) {
281 return []string{}, nil
282 }
283 return nil, err
284 }
285
286 var contacts []string
287 err = json.Unmarshal(result.Contact, &contacts)
288 if err != nil {
289 return nil, err
290 }
291
292 var addresses []string
293 for _, contact := range contacts {
294 if strings.HasPrefix(contact, "mailto:") {
295 addresses = append(addresses, strings.TrimPrefix(contact, "mailto:"))
296 }
297 }
298 return addresses, nil
299 }
300
301
302
303
304
305
306
307 type recipient struct {
308
309 id int64
310
311
312
313
314
315 Data map[string]string
316 }
317
318
319
320 type addressToRecipientMap map[string][]recipient
321
322
323
324 func readRecipientsList(filename string, delimiter rune) ([]recipient, string, error) {
325 f, err := os.Open(filename)
326 if err != nil {
327 return nil, "", err
328 }
329
330 reader := csv.NewReader(f)
331 reader.Comma = delimiter
332
333
334 record, err := reader.Read()
335 if err != nil {
336 return nil, "", fmt.Errorf("failed to parse header: %w", err)
337 }
338
339 if record[0] != "id" {
340 return nil, "", errors.New("header must begin with \"id\"")
341 }
342
343
344 var dataColumns []string
345 for _, v := range record[1:] {
346 dataColumns = append(dataColumns, strings.TrimSpace(v))
347 if len(v) == 0 {
348 return nil, "", errors.New("header contains an empty column")
349 }
350 }
351
352 var recordsWithEmptyColumns []int64
353 var recordsWithDuplicateIDs []int64
354 var probsBuff strings.Builder
355 stringProbs := func() string {
356 if len(recordsWithEmptyColumns) != 0 {
357 fmt.Fprintf(&probsBuff, "ID(s) %v contained empty columns and ",
358 recordsWithEmptyColumns)
359 }
360
361 if len(recordsWithDuplicateIDs) != 0 {
362 fmt.Fprintf(&probsBuff, "ID(s) %v were skipped as duplicates",
363 recordsWithDuplicateIDs)
364 }
365
366 if probsBuff.Len() == 0 {
367 return ""
368 }
369 return strings.TrimSuffix(probsBuff.String(), " and ")
370 }
371
372
373 recipientIDs := make(map[int64]bool)
374 var recipients []recipient
375 for {
376 record, err := reader.Read()
377 if errors.Is(err, io.EOF) {
378
379 if len(recipients) == 0 {
380 return nil, stringProbs(), errors.New("no records after header")
381 }
382 return recipients, stringProbs(), nil
383 } else if err != nil {
384 return nil, "", err
385 }
386
387
388
389 recordID := record[0]
390 id, err := strconv.ParseInt(recordID, 10, 64)
391 if err != nil {
392 return nil, "", fmt.Errorf(
393 "%q couldn't be parsed as a registration ID due to: %s", recordID, err)
394 }
395
396
397 if recipientIDs[id] {
398 recordsWithDuplicateIDs = append(recordsWithDuplicateIDs, id)
399 continue
400 }
401 recipientIDs[id] = true
402
403
404 var emptyColumn bool
405 data := make(map[string]string)
406 for i, v := range record[1:] {
407 if len(v) == 0 {
408 emptyColumn = true
409 }
410 data[dataColumns[i]] = v
411 }
412
413
414 if emptyColumn {
415 recordsWithEmptyColumns = append(recordsWithEmptyColumns, id)
416 }
417
418 recipients = append(recipients, recipient{id, data})
419 }
420 }
421
422 const usageIntro = `
423 Introduction:
424
425 The notification mailer exists to send a message to the contact associated
426 with a list of registration IDs. The attributes of the message (from address,
427 subject, and message content) are provided by the command line arguments. The
428 message content is provided as a path to a template file via the -body argument.
429
430 Provide a list of recipient user ids in a CSV file passed with the -recipientList
431 flag. The CSV file must have "id" as the first column and may have additional
432 fields to be interpolated into the email template:
433
434 id, lastIssuance
435 1234, "from example.com 2018-12-01"
436 5678, "from example.net 2018-12-13"
437
438 The additional fields will be interpolated with Golang templating, e.g.:
439
440 Your last issuance on each account was:
441 {{ range . }} {{ .Data.lastIssuance }}
442 {{ end }}
443
444 To help the operator gain confidence in the mailing run before committing fully
445 three safety features are supported: dry runs, intervals and a sleep between emails.
446
447 The -dryRun=true flag will use a mock mailer that prints message content to
448 stdout instead of performing an SMTP transaction with a real mailserver. This
449 can be used when the initial parameters are being tweaked to ensure no real
450 emails are sent. Using -dryRun=false will send real email.
451
452 Intervals supported via the -start and -end arguments. Only email addresses that
453 are alphabetically between the -start and -end strings will be sent. This can be used
454 to break up sending into batches, or more likely to resume sending if a batch is killed,
455 without resending messages that have already been sent. The -start flag is inclusive and
456 the -end flag is exclusive.
457
458 Notify-mailer de-duplicates email addresses and groups together the resulting recipient
459 structs, so a person who has multiple accounts using the same address will only receive
460 one email.
461
462 During mailing the -sleep argument is used to space out individual messages.
463 This can be used to ensure that the mailing happens at a steady pace with ample
464 opportunity for the operator to terminate early in the event of error. The
465 -sleep flag honours durations with a unit suffix (e.g. 1m for 1 minute, 10s for
466 10 seconds, etc). Using -sleep=0 will disable the sleep and send at full speed.
467
468 Examples:
469 Send an email with subject "Hello!" from the email "hello@goodbye.com" with
470 the contents read from "test_msg_body.txt" to every email associated with the
471 registration IDs listed in "test_reg_recipients.json", sleeping 10 seconds
472 between each message:
473
474 notify-mailer -config test/config/notify-mailer.json -body
475 cmd/notify-mailer/testdata/test_msg_body.txt -from hello@goodbye.com
476 -recipientList cmd/notify-mailer/testdata/test_msg_recipients.csv -subject "Hello!"
477 -sleep 10s -dryRun=false
478
479 Do the same, but only to example@example.com:
480
481 notify-mailer -config test/config/notify-mailer.json
482 -body cmd/notify-mailer/testdata/test_msg_body.txt -from hello@goodbye.com
483 -recipientList cmd/notify-mailer/testdata/test_msg_recipients.csv -subject "Hello!"
484 -start example@example.com -end example@example.comX
485
486 Send the message starting with example@example.com and emailing every address that's
487 alphabetically higher:
488
489 notify-mailer -config test/config/notify-mailer.json
490 -body cmd/notify-mailer/testdata/test_msg_body.txt -from hello@goodbye.com
491 -recipientList cmd/notify-mailer/testdata/test_msg_recipients.csv -subject "Hello!"
492 -start example@example.com
493
494 Required arguments:
495 - body
496 - config
497 - from
498 - subject
499 - recipientList`
500
501 type Config struct {
502 NotifyMailer struct {
503 DB cmd.DBConfig
504 cmd.SMTPConfig
505 }
506 Syslog cmd.SyslogConfig
507 }
508
509 func main() {
510 from := flag.String("from", "", "From header for emails. Must be a bare email address.")
511 subject := flag.String("subject", "", "Subject of emails")
512 recipientListFile := flag.String("recipientList", "", "File containing a CSV list of registration IDs and extra info.")
513 parseAsTSV := flag.Bool("tsv", false, "Parse the recipient list file as a TSV.")
514 bodyFile := flag.String("body", "", "File containing the email body in Golang template format.")
515 dryRun := flag.Bool("dryRun", true, "Whether to do a dry run.")
516 sleep := flag.Duration("sleep", 500*time.Millisecond, "How long to sleep between emails.")
517 parallelSends := flag.Uint("parallelSends", 1, "How many parallel goroutines should process emails")
518 start := flag.String("start", "", "Alphabetically lowest email address to include.")
519 end := flag.String("end", "\xFF", "Alphabetically highest email address (exclusive).")
520 reconnBase := flag.Duration("reconnectBase", 1*time.Second, "Base sleep duration between reconnect attempts")
521 reconnMax := flag.Duration("reconnectMax", 5*60*time.Second, "Max sleep duration between reconnect attempts after exponential backoff")
522 configFile := flag.String("config", "", "File containing a JSON config.")
523
524 flag.Usage = func() {
525 fmt.Fprintf(os.Stderr, "%s\n\n", usageIntro)
526 fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
527 flag.PrintDefaults()
528 }
529
530
531 flag.Parse()
532 if *from == "" || *subject == "" || *bodyFile == "" || *configFile == "" || *recipientListFile == "" {
533 flag.Usage()
534 os.Exit(1)
535 }
536
537 configData, err := os.ReadFile(*configFile)
538 cmd.FailOnError(err, "Couldn't load JSON config file")
539
540
541 var cfg Config
542 err = json.Unmarshal(configData, &cfg)
543 cmd.FailOnError(err, "Couldn't unmarshal JSON config file")
544
545 log := cmd.NewLogger(cfg.Syslog)
546 log.Info(cmd.VersionString())
547
548 dbMap, err := sa.InitWrappedDb(cfg.NotifyMailer.DB, nil, log)
549 cmd.FailOnError(err, "While initializing dbMap")
550
551
552 template, err := template.ParseFiles(*bodyFile)
553 cmd.FailOnError(err, "Couldn't parse message template")
554
555
556
557 template.Option("missingkey=error")
558
559 address, err := mail.ParseAddress(*from)
560 cmd.FailOnError(err, fmt.Sprintf("Couldn't parse %q to address", *from))
561
562 recipientListDelimiter := ','
563 if *parseAsTSV {
564 recipientListDelimiter = '\t'
565 }
566 recipients, probs, err := readRecipientsList(*recipientListFile, recipientListDelimiter)
567 cmd.FailOnError(err, "Couldn't populate recipients")
568
569 if probs != "" {
570 log.Infof("While reading the recipient list file %s", probs)
571 }
572
573 var mailClient bmail.Mailer
574 if *dryRun {
575 log.Infof("Starting %s in dry-run mode", cmd.VersionString())
576 mailClient = bmail.NewDryRun(*address, log)
577 } else {
578 log.Infof("Starting %s", cmd.VersionString())
579 smtpPassword, err := cfg.NotifyMailer.PasswordConfig.Pass()
580 cmd.FailOnError(err, "Couldn't load SMTP password from file")
581
582 mailClient = bmail.New(
583 cfg.NotifyMailer.Server,
584 cfg.NotifyMailer.Port,
585 cfg.NotifyMailer.Username,
586 smtpPassword,
587 nil,
588 *address,
589 log,
590 metrics.NoopRegisterer,
591 *reconnBase,
592 *reconnMax)
593 }
594
595 m := mailer{
596 clk: cmd.Clock(),
597 log: log,
598 dbMap: dbMap,
599 mailer: mailClient,
600 subject: *subject,
601 recipients: recipients,
602 emailTemplate: template,
603 targetRange: interval{
604 start: *start,
605 end: *end,
606 },
607 sleepInterval: *sleep,
608 parallelSends: *parallelSends,
609 }
610
611 err = m.run(context.TODO())
612 cmd.FailOnError(err, "Couldn't complete")
613
614 log.Info("Completed successfully")
615 }
616
617 func init() {
618 cmd.RegisterCommand("notify-mailer", main, &cmd.ConfigValidator{Config: &Config{}})
619 }
620
View as plain text