...

Source file src/github.com/google/certificate-transparency-go/submission/distributor.go

Documentation: github.com/google/certificate-transparency-go/submission

     1  // Copyright 2019 Google LLC. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package submission
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"net/http"
    21  	"net/url"
    22  	"strconv"
    23  	"sync"
    24  	"time"
    25  
    26  	"github.com/google/certificate-transparency-go/client"
    27  	"github.com/google/certificate-transparency-go/ctpolicy"
    28  	"github.com/google/certificate-transparency-go/jsonclient"
    29  	"github.com/google/certificate-transparency-go/loglist3"
    30  	"github.com/google/certificate-transparency-go/trillian/ctfe"
    31  	"github.com/google/certificate-transparency-go/x509"
    32  	"github.com/google/certificate-transparency-go/x509util"
    33  	"github.com/google/trillian/monitoring"
    34  	"k8s.io/klog/v2"
    35  
    36  	ct "github.com/google/certificate-transparency-go"
    37  )
    38  
    39  var (
    40  	// Metrics are per-log, per-endpoint and some per-response-status code.
    41  	distOnce      sync.Once
    42  	reqsCounter   monitoring.Counter   // logurl, ep => value
    43  	rspsCounter   monitoring.Counter   // logurl, ep, sc => value
    44  	errCounter    monitoring.Counter   // logurl, ep, status => value
    45  	logRspLatency monitoring.Histogram // logurl, ep => value
    46  	// Per-log
    47  	lastGetRootsSuccess monitoring.Gauge // Unix time
    48  )
    49  
    50  // distInitMetrics initializes all the exported metrics.
    51  func distInitMetrics(mf monitoring.MetricFactory) {
    52  	reqsCounter = mf.NewCounter("http_reqs", "Number of requests", "logurl", "ep")
    53  	rspsCounter = mf.NewCounter("http_rsps", "Number of responses", "logurl", "ep", "httpstatus")
    54  	errCounter = mf.NewCounter("err_count", "Number of errors", "logurl", "ep", "errtype")
    55  	logRspLatency = mf.NewHistogram("http_log_latency", "Latency of responses in seconds", "logurl", "ep")
    56  	lastGetRootsSuccess = mf.NewGauge("last_get_roots_success", "Unix timestamp for last successful get-roots request", "logurl")
    57  }
    58  
    59  const (
    60  	// GetRootsTimeout timeout used for external requests within root-updates.
    61  	getRootsTimeout = time.Second * 10
    62  )
    63  
    64  // pendingLogsPolicy is policy stub used for spreading submissions across
    65  // Pending and Qualified Logs.
    66  type pendingLogsPolicy struct {
    67  }
    68  
    69  func (stubP pendingLogsPolicy) LogsByGroup(cert *x509.Certificate, approved *loglist3.LogList) (ctpolicy.LogPolicyData, error) {
    70  	baseGroup, err := ctpolicy.BaseGroupFor(approved, 1)
    71  	groups := ctpolicy.LogPolicyData{baseGroup.Name: baseGroup}
    72  	return groups, err
    73  }
    74  
    75  func (stubP pendingLogsPolicy) Name() string {
    76  	return "Pending/Qualified Policy"
    77  }
    78  
    79  // Distributor operates policy-based submission across Logs.
    80  type Distributor struct {
    81  	ll                 *loglist3.LogList
    82  	usableLl           *loglist3.LogList
    83  	pendingQualifiedLl *loglist3.LogList
    84  
    85  	mu sync.RWMutex
    86  
    87  	// helper structs produced out of ll during init.
    88  	logClients map[string]client.AddLogClient
    89  	logRoots   loglist3.LogRoots
    90  	rootPool   *x509util.PEMCertPool
    91  
    92  	rootDataFull bool
    93  
    94  	policy            ctpolicy.CTPolicy
    95  	pendingLogsPolicy ctpolicy.CTPolicy
    96  }
    97  
    98  // RefreshRoots requests roots from Logs and updates local copy.
    99  // Returns error map keyed by log-URL for any Log experiencing roots retrieval
   100  // problems
   101  // If at least one root was successfully parsed for a log, log roots set gets
   102  // the update.
   103  func (d *Distributor) RefreshRoots(ctx context.Context) map[string]error {
   104  	type RootsResult struct {
   105  		LogURL string
   106  		Roots  *x509util.PEMCertPool
   107  		Err    error
   108  	}
   109  	ch := make(chan RootsResult, len(d.logClients))
   110  
   111  	rctx, cancel := context.WithTimeout(ctx, getRootsTimeout)
   112  	defer cancel()
   113  
   114  	for logURL, lc := range d.logClients {
   115  		go func(logURL string, lc client.AddLogClient) {
   116  			res := RootsResult{LogURL: logURL}
   117  
   118  			roots, err := lc.GetAcceptedRoots(rctx)
   119  			if err != nil {
   120  				res.Err = fmt.Errorf("roots refresh for %s: couldn't collect roots. %s", logURL, err)
   121  				ch <- res
   122  				return
   123  			}
   124  			res.Roots = x509util.NewPEMCertPool()
   125  			for _, r := range roots {
   126  				parsed, err := x509.ParseCertificate(r.Data)
   127  				if x509.IsFatal(err) {
   128  					errS := fmt.Errorf("roots refresh for %s: unable to parse root cert: %s", logURL, err)
   129  					if res.Err != nil {
   130  						res.Err = fmt.Errorf("%s\n%s", res.Err, errS)
   131  					} else {
   132  						res.Err = errS
   133  					}
   134  					continue
   135  				}
   136  				res.Roots.AddCert(parsed)
   137  			}
   138  			ch <- res
   139  		}(logURL, lc)
   140  	}
   141  
   142  	// Collect get-roots results for every Log-client.
   143  	freshRoots := make(loglist3.LogRoots)
   144  	errors := make(map[string]error)
   145  	for range d.logClients {
   146  		r := <-ch
   147  		// update roots
   148  		if r.Err != nil {
   149  			errors[r.LogURL] = r.Err
   150  		}
   151  		// Roots get update even if some returned roots couldn't get parsed.
   152  		if r.Roots != nil {
   153  			freshRoots[r.LogURL] = r.Roots
   154  			lastGetRootsSuccess.Set(float64(time.Now().Unix()), r.LogURL)
   155  		}
   156  	}
   157  
   158  	d.mu.Lock()
   159  	defer d.mu.Unlock()
   160  
   161  	d.logRoots = freshRoots
   162  	d.rootDataFull = len(d.logRoots) == len(d.logClients)
   163  	// Merge individual root-pools into a unified one
   164  	d.rootPool = x509util.NewPEMCertPool()
   165  	for _, pool := range d.logRoots {
   166  		for _, c := range pool.RawCertificates() {
   167  			d.rootPool.AddCert(c)
   168  		}
   169  	}
   170  
   171  	return errors
   172  }
   173  
   174  // incRspsCounter extracts HTTP status code and increments corresponding rspsCounter.
   175  func incRspsCounter(logURL string, endpoint string, rspErr error) {
   176  	status := http.StatusOK
   177  	if rspErr != nil {
   178  		status = http.StatusBadRequest // default to this if status code unavailable
   179  		if err, ok := rspErr.(client.RspError); ok {
   180  			status = err.StatusCode
   181  		}
   182  	}
   183  	rspsCounter.Inc(logURL, endpoint, strconv.Itoa(status))
   184  }
   185  
   186  // incErrCounter increments corresponding errCounter if any error occurred during
   187  // submission to a Log.
   188  func incErrCounter(logURL string, endpoint string, rspErr error) {
   189  	if rspErr == nil {
   190  		return
   191  	}
   192  	err, ok := rspErr.(client.RspError)
   193  	switch {
   194  	case !ok:
   195  		klog.Errorf("unknown_error (%s, %s) => %v", logURL, endpoint, rspErr)
   196  		errCounter.Inc(logURL, endpoint, "unknown_error")
   197  	case err.Err != nil && err.StatusCode == http.StatusOK:
   198  		klog.Errorf("invalid_sct (%s, %s) => HTTP details: status=%d, body:\n%s", logURL, endpoint, err.StatusCode, err.Body)
   199  		errCounter.Inc(logURL, endpoint, "invalid_sct")
   200  	case err.Err != nil: // err.StatusCode != http.StatusOK.
   201  		klog.Errorf("connection_error (%s, %s) => HTTP details: status=%d, body:\n%s", logURL, endpoint, err.StatusCode, err.Body)
   202  		errCounter.Inc(logURL, endpoint, "connection_error")
   203  	}
   204  }
   205  
   206  // SubmitToLog implements Submitter interface.
   207  func (d *Distributor) SubmitToLog(ctx context.Context, logURL string, chain []ct.ASN1Cert, asPreChain bool) (*ct.SignedCertificateTimestamp, error) {
   208  	lc, ok := d.logClients[logURL]
   209  	if !ok {
   210  		return nil, fmt.Errorf("no client registered for Log with URL %q", logURL)
   211  	}
   212  
   213  	// endpoint used for metrics
   214  	endpoint := string(ctfe.AddChainName)
   215  	if asPreChain {
   216  		endpoint = string(ctfe.AddPreChainName)
   217  	}
   218  
   219  	defer func(start time.Time) {
   220  		logRspLatency.Observe(time.Since(start).Seconds(), logURL, endpoint)
   221  	}(time.Now())
   222  	reqsCounter.Inc(logURL, endpoint)
   223  	addChain := lc.AddChain
   224  	if asPreChain {
   225  		addChain = lc.AddPreChain
   226  	}
   227  	sct, err := addChain(ctx, chain)
   228  	incRspsCounter(logURL, endpoint, err)
   229  	incErrCounter(logURL, endpoint, err)
   230  	return sct, err
   231  }
   232  
   233  // parseRawChain reads cert chain from bytes into x509.Certificate format.
   234  func parseRawChain(rawChain [][]byte) ([]*x509.Certificate, error) {
   235  	parsedChain := make([]*x509.Certificate, 0, len(rawChain))
   236  	for _, certBytes := range rawChain {
   237  		cert, err := x509.ParseCertificate(certBytes)
   238  		if x509.IsFatal(err) {
   239  			return nil, fmt.Errorf("distributor unable to parse cert-chain %v", err)
   240  		}
   241  		parsedChain = append(parsedChain, cert)
   242  	}
   243  	return parsedChain, nil
   244  }
   245  
   246  // addSomeChain is helper calling one of AddChain or AddPreChain based
   247  // on asPreChain param.
   248  func (d *Distributor) addSomeChain(ctx context.Context, rawChain [][]byte, loadPendingLogs bool, asPreChain bool) ([]*AssignedSCT, error) {
   249  	if len(rawChain) == 0 {
   250  		return nil, fmt.Errorf("distributor unable to process empty chain")
   251  	}
   252  
   253  	// Helper function establishing responsibility of locking while determining log list and root chain.
   254  	compatibleLogsAndChain := func() (loglist3.LogList, []*x509.Certificate, error) {
   255  		d.mu.RLock()
   256  		defer d.mu.RUnlock()
   257  		vOpts := ctfe.NewCertValidationOpts(d.rootPool, time.Time{}, false, false, nil, nil, false, nil)
   258  		rootedChain, err := ctfe.ValidateChain(rawChain, vOpts)
   259  		if err == nil {
   260  			return d.usableLl.Compatible(rootedChain[0], rootedChain[len(rootedChain)-1], d.logRoots), rootedChain, nil
   261  		}
   262  		if d.rootDataFull {
   263  			// Could not verify the chain while root info for logs is complete.
   264  			return loglist3.LogList{}, nil, fmt.Errorf("distributor unable to process cert-chain: %v", err)
   265  		}
   266  
   267  		// Chain might be rooted to the Log which has no root-info yet.
   268  		parsedChain, err := parseRawChain(rawChain)
   269  		if err != nil {
   270  			return loglist3.LogList{}, nil, fmt.Errorf("distributor unable to parse cert-chain: %v", err)
   271  		}
   272  		return d.usableLl.Compatible(parsedChain[0], nil, d.logRoots), parsedChain, nil
   273  	}
   274  	compatibleLogs, parsedChain, err := compatibleLogsAndChain()
   275  	if err != nil {
   276  		return nil, err
   277  	}
   278  
   279  	// Distinguish between precerts and certificates.
   280  	isPrecert, err := ctfe.IsPrecertificate(parsedChain[0])
   281  	if err != nil {
   282  		return nil, fmt.Errorf("distributor unable to check certificate %v: \n%v", parsedChain[0], err)
   283  	}
   284  	if isPrecert != asPreChain {
   285  		var methodType, inputType string
   286  		if asPreChain {
   287  			methodType = "pre-"
   288  		}
   289  		if isPrecert {
   290  			inputType = "pre-"
   291  		}
   292  		return nil, fmt.Errorf("add-%schain method expected %scertificate, got %scertificate", methodType, methodType, inputType)
   293  	}
   294  
   295  	// Set up policy structs.
   296  	groups, err := d.policy.LogsByGroup(parsedChain[0], &compatibleLogs)
   297  	if err != nil {
   298  		return nil, fmt.Errorf("distributor does not have enough compatible Logs to comply with the policy: %v", err)
   299  	}
   300  	chain := make([]ct.ASN1Cert, len(parsedChain))
   301  	for i, c := range parsedChain {
   302  		chain[i] = ct.ASN1Cert{Data: c.Raw}
   303  	}
   304  	if loadPendingLogs {
   305  		go func() {
   306  			pendingGroup, err := d.pendingLogsPolicy.LogsByGroup(parsedChain[0], d.pendingQualifiedLl)
   307  			if err != nil {
   308  				return
   309  			}
   310  			if _, err := GetSCTs(ctx, d, chain, asPreChain, pendingGroup); err != nil {
   311  				klog.Errorf("GetSCTs(): %v", err)
   312  			}
   313  		}()
   314  	}
   315  	return GetSCTs(ctx, d, chain, asPreChain, groups)
   316  }
   317  
   318  // AddPreChain runs add-pre-chain calls across subset of logs according to
   319  // Distributor's policy. May emit both SCTs array and error when SCTs
   320  // collected do not satisfy the policy.
   321  func (d *Distributor) AddPreChain(ctx context.Context, rawChain [][]byte, loadPendingLogs bool) ([]*AssignedSCT, error) {
   322  	return d.addSomeChain(ctx, rawChain, loadPendingLogs, true)
   323  }
   324  
   325  // AddChain runs add-chain calls across subset of logs according to
   326  // Distributor's policy. May emit both SCTs array and error when SCTs
   327  // collected do not satisfy the policy.
   328  func (d *Distributor) AddChain(ctx context.Context, rawChain [][]byte, loadPendingLogs bool) ([]*AssignedSCT, error) {
   329  	return d.addSomeChain(ctx, rawChain, loadPendingLogs, false)
   330  }
   331  
   332  // LogClientBuilder builds client-interface instance for a given Log.
   333  type LogClientBuilder func(*loglist3.Log) (client.AddLogClient, error)
   334  
   335  // BuildLogClient is default (non-mock) LogClientBuilder.
   336  func BuildLogClient(log *loglist3.Log) (client.AddLogClient, error) {
   337  	u, err := url.Parse(log.URL)
   338  	if err != nil {
   339  		return nil, err
   340  	}
   341  	if u.Scheme == "" {
   342  		u.Scheme = "https"
   343  	}
   344  	hc := &http.Client{Timeout: time.Second * 10}
   345  	return client.New(u.String(), hc, jsonclient.Options{PublicKeyDER: log.Key})
   346  }
   347  
   348  // NewDistributor creates and inits a Distributor instance.
   349  // The Distributor will asynchronously fetch the latest roots from all of the
   350  // logs when active. Call Run() to fetch roots and init regular updates to keep
   351  // the local copy of the roots up-to-date.
   352  func NewDistributor(ll *loglist3.LogList, plc ctpolicy.CTPolicy, lcBuilder LogClientBuilder, mf monitoring.MetricFactory) (*Distributor, error) {
   353  	var d Distributor
   354  	// Divide Logs by statuses.
   355  	d.ll = ll
   356  	usableStat := []loglist3.LogStatus{loglist3.UsableLogStatus}
   357  	active := ll.SelectByStatus(usableStat)
   358  	d.usableLl = &active
   359  	pendingQualifiedStat := []loglist3.LogStatus{
   360  		loglist3.PendingLogStatus, loglist3.QualifiedLogStatus}
   361  	pending := ll.SelectByStatus(pendingQualifiedStat)
   362  	d.pendingQualifiedLl = &pending
   363  
   364  	d.policy = plc
   365  	d.pendingLogsPolicy = pendingLogsPolicy{}
   366  	d.logClients = make(map[string]client.AddLogClient)
   367  	d.logRoots = make(loglist3.LogRoots)
   368  	d.rootPool = x509util.NewPEMCertPool()
   369  
   370  	// Build clients for each of the Logs. Also build log-to-id map.
   371  	if err := d.buildLogClients(lcBuilder, d.usableLl); err != nil {
   372  		return nil, err
   373  	}
   374  	if err := d.buildLogClients(lcBuilder, d.pendingQualifiedLl); err != nil {
   375  		return nil, err
   376  	}
   377  
   378  	if mf == nil {
   379  		mf = monitoring.InertMetricFactory{}
   380  	}
   381  	distOnce.Do(func() { distInitMetrics(mf) })
   382  	return &d, nil
   383  }
   384  
   385  // buildLogClients builds clients for every Log provided and adds them into
   386  // Distributor internals.
   387  func (d *Distributor) buildLogClients(lcBuilder LogClientBuilder, ll *loglist3.LogList) error {
   388  	for _, op := range ll.Operators {
   389  		for _, log := range op.Logs {
   390  			lc, err := lcBuilder(log)
   391  			if err != nil {
   392  				return fmt.Errorf("failed to create log client for %s: %v", log.URL, err)
   393  			}
   394  			d.logClients[log.URL] = lc
   395  		}
   396  	}
   397  	return nil
   398  }
   399  

View as plain text