...

Source file src/github.com/letsencrypt/boulder/test/load-generator/state.go

Documentation: github.com/letsencrypt/boulder/test/load-generator

     1  package main
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"crypto/ecdsa"
     7  	"crypto/elliptic"
     8  	"crypto/rand"
     9  	"crypto/tls"
    10  	"crypto/x509"
    11  	"encoding/json"
    12  	"errors"
    13  	"fmt"
    14  	"io"
    15  	"log"
    16  	"net"
    17  	"net/http"
    18  	"os"
    19  	"reflect"
    20  	"runtime"
    21  	"sort"
    22  	"strings"
    23  	"sync"
    24  	"sync/atomic"
    25  	"time"
    26  
    27  	"gopkg.in/go-jose/go-jose.v2"
    28  
    29  	"github.com/letsencrypt/boulder/test/load-generator/acme"
    30  	"github.com/letsencrypt/challtestsrv"
    31  )
    32  
    33  // account is an ACME v2 account resource. It does not have a `jose.Signer`
    34  // because we need to set the Signer options per-request with the URL being
    35  // POSTed and must construct it on the fly from the `key`. Accounts are
    36  // protected by a `sync.Mutex` that must be held for updates (see
    37  // `account.Update`).
    38  type account struct {
    39  	key             *ecdsa.PrivateKey
    40  	id              string
    41  	finalizedOrders []string
    42  	certs           []string
    43  	mu              sync.Mutex
    44  }
    45  
    46  // update locks an account resource's mutex and sets the `finalizedOrders` and
    47  // `certs` fields to the provided values.
    48  func (acct *account) update(finalizedOrders, certs []string) {
    49  	acct.mu.Lock()
    50  	defer acct.mu.Unlock()
    51  
    52  	acct.finalizedOrders = append(acct.finalizedOrders, finalizedOrders...)
    53  	acct.certs = append(acct.certs, certs...)
    54  }
    55  
    56  type acmeCache struct {
    57  	// The current V2 account (may be nil for legacy load generation)
    58  	acct *account
    59  	// Pending orders waiting for authorization challenge validation
    60  	pendingOrders []*OrderJSON
    61  	// Fulfilled orders in a valid status waiting for finalization
    62  	fulfilledOrders []string
    63  	// Finalized orders that have certificates
    64  	finalizedOrders []string
    65  
    66  	// A list of URLs for issued certificates
    67  	certs []string
    68  	// The nonce source for JWS signature nonce headers
    69  	ns *nonceSource
    70  }
    71  
    72  // signEmbeddedV2Request signs the provided request data using the acmeCache's
    73  // account's private key. The provided URL is set as a protected header per ACME
    74  // v2 JWS standards. The resulting JWS contains an **embedded** JWK - this makes
    75  // this function primarily applicable to new account requests where no key ID is
    76  // known.
    77  func (c *acmeCache) signEmbeddedV2Request(data []byte, url string) (*jose.JSONWebSignature, error) {
    78  	// Create a signing key for the account's private key
    79  	signingKey := jose.SigningKey{
    80  		Key:       c.acct.key,
    81  		Algorithm: jose.ES256,
    82  	}
    83  	// Create a signer, setting the URL protected header
    84  	signer, err := jose.NewSigner(signingKey, &jose.SignerOptions{
    85  		NonceSource: c.ns,
    86  		EmbedJWK:    true,
    87  		ExtraHeaders: map[jose.HeaderKey]interface{}{
    88  			"url": url,
    89  		},
    90  	})
    91  	if err != nil {
    92  		return nil, err
    93  	}
    94  
    95  	// Sign the data with the signer
    96  	signed, err := signer.Sign(data)
    97  	if err != nil {
    98  		return nil, err
    99  	}
   100  	return signed, nil
   101  }
   102  
   103  // signKeyIDV2Request signs the provided request data using the acmeCache's
   104  // account's private key. The provided URL is set as a protected header per ACME
   105  // v2 JWS standards. The resulting JWS contains a Key ID header that is
   106  // populated using the acmeCache's account's ID. This is the default JWS signing
   107  // style for ACME v2 requests and should be used everywhere but where the key ID
   108  // is unknown (e.g. new-account requests where an account doesn't exist yet).
   109  func (c *acmeCache) signKeyIDV2Request(data []byte, url string) (*jose.JSONWebSignature, error) {
   110  	// Create a JWK with the account's private key and key ID
   111  	jwk := &jose.JSONWebKey{
   112  		Key:       c.acct.key,
   113  		Algorithm: "ECDSA",
   114  		KeyID:     c.acct.id,
   115  	}
   116  
   117  	// Create a signing key with the JWK
   118  	signerKey := jose.SigningKey{
   119  		Key:       jwk,
   120  		Algorithm: jose.ES256,
   121  	}
   122  
   123  	// Ensure the signer's nonce source and URL header will be set
   124  	opts := &jose.SignerOptions{
   125  		NonceSource: c.ns,
   126  		ExtraHeaders: map[jose.HeaderKey]interface{}{
   127  			"url": url,
   128  		},
   129  	}
   130  
   131  	// Construct the signer with the configured options
   132  	signer, err := jose.NewSigner(signerKey, opts)
   133  	if err != nil {
   134  		return nil, err
   135  	}
   136  
   137  	// Sign the data with the signer
   138  	signed, err := signer.Sign(data)
   139  	if err != nil {
   140  		return nil, err
   141  	}
   142  	return signed, nil
   143  }
   144  
   145  type RateDelta struct {
   146  	Inc    int64
   147  	Period time.Duration
   148  }
   149  
   150  type Plan struct {
   151  	Runtime time.Duration
   152  	Rate    int64
   153  	Delta   *RateDelta
   154  }
   155  
   156  type respCode struct {
   157  	code int
   158  	num  int
   159  }
   160  
   161  // State holds *all* the stuff
   162  type State struct {
   163  	domainBase      string
   164  	email           string
   165  	maxRegs         int
   166  	maxNamesPerCert int
   167  	realIP          string
   168  	certKey         *ecdsa.PrivateKey
   169  
   170  	operations []func(*State, *acmeCache) error
   171  
   172  	rMu sync.RWMutex
   173  
   174  	// accts holds V2 account objects
   175  	accts []*account
   176  
   177  	challSrv    *challtestsrv.ChallSrv
   178  	callLatency latencyWriter
   179  
   180  	directory  *acme.Directory
   181  	challStrat acme.ChallengeStrategy
   182  	httpClient *http.Client
   183  
   184  	revokeChance float32
   185  
   186  	reqTotal  int64
   187  	respCodes map[int]*respCode
   188  	cMu       sync.Mutex
   189  
   190  	wg *sync.WaitGroup
   191  }
   192  
   193  type rawAccount struct {
   194  	FinalizedOrders []string `json:"finalizedOrders"`
   195  	Certs           []string `json:"certs"`
   196  	ID              string   `json:"id"`
   197  	RawKey          []byte   `json:"rawKey"`
   198  }
   199  
   200  type snapshot struct {
   201  	Accounts []rawAccount
   202  }
   203  
   204  func (s *State) numAccts() int {
   205  	s.rMu.RLock()
   206  	defer s.rMu.RUnlock()
   207  	return len(s.accts)
   208  }
   209  
   210  // Snapshot will save out generated accounts
   211  func (s *State) Snapshot(filename string) error {
   212  	fmt.Printf("[+] Saving accounts to %s\n", filename)
   213  	snap := snapshot{}
   214  	for _, acct := range s.accts {
   215  		k, err := x509.MarshalECPrivateKey(acct.key)
   216  		if err != nil {
   217  			return err
   218  		}
   219  		snap.Accounts = append(snap.Accounts, rawAccount{
   220  			Certs:           acct.certs,
   221  			FinalizedOrders: acct.finalizedOrders,
   222  			ID:              acct.id,
   223  			RawKey:          k,
   224  		})
   225  	}
   226  	cont, err := json.Marshal(snap)
   227  	if err != nil {
   228  		return err
   229  	}
   230  	return os.WriteFile(filename, cont, os.ModePerm)
   231  }
   232  
   233  // Restore previously generated accounts
   234  func (s *State) Restore(filename string) error {
   235  	fmt.Printf("[+] Loading accounts from %q\n", filename)
   236  	// NOTE(@cpu): Using os.O_CREATE here explicitly to create the file if it does
   237  	// not exist.
   238  	f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0600)
   239  	if err != nil {
   240  		return err
   241  	}
   242  
   243  	content, err := io.ReadAll(f)
   244  	if err != nil {
   245  		return err
   246  	}
   247  	// If the file's content is the empty string it was probably just created.
   248  	// Avoid an unmarshaling error by assuming an empty file is an empty snapshot.
   249  	if string(content) == "" {
   250  		content = []byte("{}")
   251  	}
   252  
   253  	snap := snapshot{}
   254  	err = json.Unmarshal(content, &snap)
   255  	if err != nil {
   256  		return err
   257  	}
   258  	for _, a := range snap.Accounts {
   259  		key, err := x509.ParseECPrivateKey(a.RawKey)
   260  		if err != nil {
   261  			continue
   262  		}
   263  		if err != nil {
   264  			continue
   265  		}
   266  		s.accts = append(s.accts, &account{
   267  			key:             key,
   268  			id:              a.ID,
   269  			finalizedOrders: a.FinalizedOrders,
   270  			certs:           a.Certs,
   271  		})
   272  	}
   273  	return nil
   274  }
   275  
   276  // New returns a pointer to a new State struct or an error
   277  func New(
   278  	directoryURL string,
   279  	domainBase string,
   280  	realIP string,
   281  	maxRegs, maxNamesPerCert int,
   282  	latencyPath string,
   283  	userEmail string,
   284  	operations []string,
   285  	challStrat string,
   286  	revokeChance float32) (*State, error) {
   287  	certKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  	directory, err := acme.NewDirectory(directoryURL)
   292  	if err != nil {
   293  		return nil, err
   294  	}
   295  	strategy, err := acme.NewChallengeStrategy(challStrat)
   296  	if err != nil {
   297  		return nil, err
   298  	}
   299  	if revokeChance > 1 {
   300  		return nil, errors.New("revokeChance must be between 0.0 and 1.0")
   301  	}
   302  	httpClient := &http.Client{
   303  		Transport: &http.Transport{
   304  			DialContext: (&net.Dialer{
   305  				Timeout:   10 * time.Second,
   306  				KeepAlive: 30 * time.Second,
   307  			}).DialContext,
   308  			TLSHandshakeTimeout: 5 * time.Second,
   309  			TLSClientConfig: &tls.Config{
   310  				InsecureSkipVerify: true, // CDN bypass can cause validation failures
   311  			},
   312  			MaxIdleConns:    500,
   313  			IdleConnTimeout: 90 * time.Second,
   314  		},
   315  		Timeout: 10 * time.Second,
   316  	}
   317  	latencyFile, err := newLatencyFile(latencyPath)
   318  	if err != nil {
   319  		return nil, err
   320  	}
   321  	s := &State{
   322  		httpClient:      httpClient,
   323  		directory:       directory,
   324  		challStrat:      strategy,
   325  		certKey:         certKey,
   326  		domainBase:      domainBase,
   327  		callLatency:     latencyFile,
   328  		wg:              new(sync.WaitGroup),
   329  		realIP:          realIP,
   330  		maxRegs:         maxRegs,
   331  		maxNamesPerCert: maxNamesPerCert,
   332  		email:           userEmail,
   333  		respCodes:       make(map[int]*respCode),
   334  		revokeChance:    revokeChance,
   335  	}
   336  
   337  	// convert operations strings to methods
   338  	for _, opName := range operations {
   339  		op, present := stringToOperation[opName]
   340  		if !present {
   341  			return nil, fmt.Errorf("unknown operation %q", opName)
   342  		}
   343  		s.operations = append(s.operations, op)
   344  	}
   345  
   346  	return s, nil
   347  }
   348  
   349  // Run runs the WFE load-generator
   350  func (s *State) Run(
   351  	ctx context.Context,
   352  	httpOneAddrs []string,
   353  	tlsALPNOneAddrs []string,
   354  	dnsAddrs []string,
   355  	fakeDNS string,
   356  	p Plan) error {
   357  	// Create a new challenge server binding the requested addrs.
   358  	challSrv, err := challtestsrv.New(challtestsrv.Config{
   359  		HTTPOneAddrs:    httpOneAddrs,
   360  		TLSALPNOneAddrs: tlsALPNOneAddrs,
   361  		DNSOneAddrs:     dnsAddrs,
   362  		// Use a logger that has a load-generator prefix
   363  		Log: log.New(os.Stdout, "load-generator challsrv - ", log.LstdFlags),
   364  	})
   365  	// Setup the challenge server to return the mock "fake DNS" IP address
   366  	challSrv.SetDefaultDNSIPv4(fakeDNS)
   367  	// Disable returning any AAAA records.
   368  	challSrv.SetDefaultDNSIPv6("")
   369  
   370  	if err != nil {
   371  		return err
   372  	}
   373  	// Save the challenge server in the state
   374  	s.challSrv = challSrv
   375  
   376  	// Start the Challenge server in its own Go routine
   377  	go s.challSrv.Run()
   378  
   379  	if p.Delta != nil {
   380  		go func() {
   381  			for {
   382  				time.Sleep(p.Delta.Period)
   383  				atomic.AddInt64(&p.Rate, p.Delta.Inc)
   384  			}
   385  		}()
   386  	}
   387  
   388  	// Run sending loop
   389  	stop := make(chan bool, 1)
   390  	fmt.Println("[+] Beginning execution plan")
   391  	i := int64(0)
   392  	go func() {
   393  		for {
   394  			start := time.Now()
   395  			select {
   396  			case <-stop:
   397  				return
   398  			default:
   399  				s.wg.Add(1)
   400  				go s.sendCall()
   401  				atomic.AddInt64(&i, 1)
   402  			}
   403  			sf := time.Duration(time.Second.Nanoseconds()/atomic.LoadInt64(&p.Rate)) - time.Since(start)
   404  			time.Sleep(sf)
   405  		}
   406  	}()
   407  	go func() {
   408  		lastTotal := int64(0)
   409  		lastReqTotal := int64(0)
   410  		for {
   411  			time.Sleep(time.Second)
   412  			curTotal := atomic.LoadInt64(&i)
   413  			curReqTotal := atomic.LoadInt64(&s.reqTotal)
   414  			fmt.Printf(
   415  				"%s Action rate: %d/s [expected: %d/s], Request rate: %d/s, Responses: [%s]\n",
   416  				time.Now().Format(time.DateTime),
   417  				curTotal-lastTotal,
   418  				atomic.LoadInt64(&p.Rate),
   419  				curReqTotal-lastReqTotal,
   420  				s.respCodeString(),
   421  			)
   422  			lastTotal = curTotal
   423  			lastReqTotal = curReqTotal
   424  		}
   425  	}()
   426  
   427  	select {
   428  	case <-time.After(p.Runtime):
   429  		fmt.Println("[+] Execution plan finished")
   430  	case <-ctx.Done():
   431  		fmt.Println("[!] Execution plan cancelled")
   432  	}
   433  	stop <- true
   434  	fmt.Println("[+] Waiting for pending flows to finish before killing challenge server")
   435  	s.wg.Wait()
   436  	fmt.Println("[+] Shutting down challenge server")
   437  	s.challSrv.Shutdown()
   438  	return nil
   439  }
   440  
   441  // HTTP utils
   442  
   443  func (s *State) addRespCode(code int) {
   444  	s.cMu.Lock()
   445  	defer s.cMu.Unlock()
   446  	code = code / 100
   447  	if e, ok := s.respCodes[code]; ok {
   448  		e.num++
   449  	} else if !ok {
   450  		s.respCodes[code] = &respCode{code, 1}
   451  	}
   452  }
   453  
   454  // codes is a convenience type for holding copies of the state object's
   455  // `respCodes` field of `map[int]*respCode`. Unlike the state object the
   456  // respCodes are copied by value and not held as pointers. The codes type allows
   457  // sorting the response codes for output.
   458  type codes []respCode
   459  
   460  func (c codes) Len() int {
   461  	return len(c)
   462  }
   463  
   464  func (c codes) Less(i, j int) bool {
   465  	return c[i].code < c[j].code
   466  }
   467  
   468  func (c codes) Swap(i, j int) {
   469  	c[i], c[j] = c[j], c[i]
   470  }
   471  
   472  func (s *State) respCodeString() string {
   473  	s.cMu.Lock()
   474  	list := codes{}
   475  	for _, v := range s.respCodes {
   476  		list = append(list, *v)
   477  	}
   478  	s.cMu.Unlock()
   479  	sort.Sort(list)
   480  	counts := []string{}
   481  	for _, v := range list {
   482  		counts = append(counts, fmt.Sprintf("%dxx: %d", v.code, v.num))
   483  	}
   484  	return strings.Join(counts, ", ")
   485  }
   486  
   487  var userAgent = "boulder load-generator -- heyo ^_^"
   488  
   489  func (s *State) post(
   490  	url string,
   491  	payload []byte,
   492  	ns *nonceSource,
   493  	latencyTag string,
   494  	expectedCode int) (*http.Response, error) {
   495  	req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload))
   496  	if err != nil {
   497  		return nil, err
   498  	}
   499  	req.Header.Add("X-Real-IP", s.realIP)
   500  	req.Header.Add("User-Agent", userAgent)
   501  	req.Header.Add("Content-Type", "application/jose+json")
   502  	atomic.AddInt64(&s.reqTotal, 1)
   503  	started := time.Now()
   504  	resp, err := s.httpClient.Do(req)
   505  	finished := time.Now()
   506  	state := "error"
   507  	// Defer logging the latency and result
   508  	defer func() {
   509  		s.callLatency.Add(latencyTag, started, finished, state)
   510  	}()
   511  	if err != nil {
   512  		return nil, err
   513  	}
   514  	go s.addRespCode(resp.StatusCode)
   515  	if newNonce := resp.Header.Get("Replay-Nonce"); newNonce != "" {
   516  		ns.addNonce(newNonce)
   517  	}
   518  	if resp.StatusCode != expectedCode {
   519  		return nil, fmt.Errorf("POST %q returned HTTP status %d, expected %d",
   520  			url, resp.StatusCode, expectedCode)
   521  	}
   522  	state = "good"
   523  	return resp, nil
   524  }
   525  
   526  type nonceSource struct {
   527  	mu        sync.Mutex
   528  	noncePool []string
   529  	s         *State
   530  }
   531  
   532  func (ns *nonceSource) getNonce() (string, error) {
   533  	nonceURL := ns.s.directory.EndpointURL(acme.NewNonceEndpoint)
   534  	latencyTag := string(acme.NewNonceEndpoint)
   535  	started := time.Now()
   536  	resp, err := ns.s.httpClient.Head(nonceURL)
   537  	finished := time.Now()
   538  	state := "error"
   539  	defer func() {
   540  		ns.s.callLatency.Add(fmt.Sprintf("HEAD %s", latencyTag),
   541  			started, finished, state)
   542  	}()
   543  	if err != nil {
   544  		return "", err
   545  	}
   546  	defer resp.Body.Close()
   547  	if nonce := resp.Header.Get("Replay-Nonce"); nonce != "" {
   548  		state = "good"
   549  		return nonce, nil
   550  	}
   551  	return "", errors.New("'Replay-Nonce' header not supplied")
   552  }
   553  
   554  // Nonce satisfies the interface jose.NonceSource, should probably actually be per context but ¯\_(ツ)_/¯ for now
   555  func (ns *nonceSource) Nonce() (string, error) {
   556  	ns.mu.Lock()
   557  	if len(ns.noncePool) == 0 {
   558  		ns.mu.Unlock()
   559  		return ns.getNonce()
   560  	}
   561  	defer ns.mu.Unlock()
   562  	nonce := ns.noncePool[0]
   563  	if len(ns.noncePool) > 1 {
   564  		ns.noncePool = ns.noncePool[1:]
   565  	} else {
   566  		ns.noncePool = []string{}
   567  	}
   568  	return nonce, nil
   569  }
   570  
   571  func (ns *nonceSource) addNonce(nonce string) {
   572  	ns.mu.Lock()
   573  	defer ns.mu.Unlock()
   574  	ns.noncePool = append(ns.noncePool, nonce)
   575  }
   576  
   577  // addAccount adds the provided account to the state's list of accts
   578  func (s *State) addAccount(acct *account) {
   579  	s.rMu.Lock()
   580  	defer s.rMu.Unlock()
   581  
   582  	s.accts = append(s.accts, acct)
   583  }
   584  
   585  func (s *State) sendCall() {
   586  	defer s.wg.Done()
   587  	c := &acmeCache{}
   588  
   589  	for _, op := range s.operations {
   590  		err := op(s, c)
   591  		if err != nil {
   592  			method := runtime.FuncForPC(reflect.ValueOf(op).Pointer()).Name()
   593  			fmt.Printf("[FAILED] %s: %s\n", method, err)
   594  			break
   595  		}
   596  	}
   597  	// If the acmeCache's V2 account isn't nil, update it based on the cache's
   598  	// finalizedOrders and certs.
   599  	if c.acct != nil {
   600  		c.acct.update(c.finalizedOrders, c.certs)
   601  	}
   602  }
   603  

View as plain text