...

Source file src/github.com/google/certificate-transparency-go/scanner/scanner.go

Documentation: github.com/google/certificate-transparency-go/scanner

     1  // Copyright 2014 Google LLC. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package scanner holds code for iterating through the contents of a CT log.
    16  package scanner
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sync"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	ct "github.com/google/certificate-transparency-go"
    26  	"github.com/google/certificate-transparency-go/x509"
    27  	"k8s.io/klog/v2"
    28  )
    29  
// ScannerOptions holds configuration options for the Scanner.
type ScannerOptions struct { // nolint:revive
	// Options for the underlying Fetcher. The struct is embedded, so its
	// fields are promoted onto ScannerOptions; NewScanner hands a pointer to
	// the Scanner's copy of it to NewFetcher.
	FetcherOptions

	// Custom matcher for x509 Certificates, functor will be called for each
	// Certificate found during scanning. Should be a Matcher or LeafMatcher
	// implementation; a value of any other dynamic type makes processEntry
	// return an "unexpected matcher type" error for every entry.
	Matcher interface{}

	// Match precerts only (Matcher still applies to precerts).
	PrecertOnly bool

	// Number of concurrent matchers to run.
	NumWorkers int

	// Number of fetched entries to buffer on their way to the callbacks.
	// Used as the capacity of the channel that carries entries from the
	// fetcher to the matcher workers.
	BufferSize int
}
    48  
    49  // DefaultScannerOptions returns a new ScannerOptions with sensible defaults.
    50  func DefaultScannerOptions() *ScannerOptions {
    51  	return &ScannerOptions{
    52  		FetcherOptions: *DefaultFetcherOptions(),
    53  		Matcher:        &MatchAll{},
    54  		PrecertOnly:    false,
    55  		NumWorkers:     1,
    56  	}
    57  }
    58  
// Scanner is a tool to scan all the entries in a CT Log.
type Scanner struct {
	// N.B. 64-bit fields must be first due to
	// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	//
	// All of the counters below are updated with sync/atomic operations by
	// the matcher workers and are reset at the start of every ScanLog call.

	// Counters of the number of certificates scanned and matched.
	certsProcessed int64
	certsMatched   int64

	// Counter of the number of precertificates encountered during the scan.
	precertsSeen int64

	// Number of entries for which processEntry returned an error.
	unparsableEntries int64
	// Number of entries whose parsing reported an error that x509.IsFatal
	// classified as non-fatal.
	entriesWithNonFatalErrors int64

	// Fetcher used to retrieve entries from the log; created by NewScanner.
	fetcher *Fetcher

	// Configuration options for this Scanner instance.
	opts ScannerOptions
}
    79  
// entryInfo represents information about a log entry. Values of this type
// are what ScanLog sends to the matcher workers over the entries channel.
type entryInfo struct {
	// The index of the entry containing the LeafInput in the log.
	index int64
	// The log entry returned by the log server.
	entry ct.LeafEntry
}
    87  
    88  // Takes the error returned by either x509.ParseCertificate() or
    89  // x509.ParseTBSCertificate() and determines if it's non-fatal or otherwise.
    90  // In the case of non-fatal errors, the error will be logged,
    91  // entriesWithNonFatalErrors will be incremented, and the return value will be
    92  // false.
    93  // Fatal errors will cause the function to return true.
    94  // When err is nil, this method does nothing.
    95  func (s *Scanner) isCertErrorFatal(err error, logEntry *ct.LogEntry, index int64) bool {
    96  	if err == nil {
    97  		// No error to handle.
    98  		return false
    99  	} else if !x509.IsFatal(err) {
   100  		atomic.AddInt64(&s.entriesWithNonFatalErrors, 1)
   101  		// We'll make a note, but continue.
   102  		klog.V(1).Infof("Non-fatal error in %v at index %d: %v", logEntry.Leaf.TimestampedEntry.EntryType, index, err)
   103  		return false
   104  	}
   105  	return true
   106  }
   107  
   108  // Processes the given entry in the specified log.
   109  func (s *Scanner) processEntry(info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
   110  	atomic.AddInt64(&s.certsProcessed, 1)
   111  
   112  	switch matcher := s.opts.Matcher.(type) {
   113  	case Matcher:
   114  		return s.processMatcherEntry(matcher, info, foundCert, foundPrecert)
   115  	case LeafMatcher:
   116  		return s.processMatcherLeafEntry(matcher, info, foundCert, foundPrecert)
   117  	default:
   118  		return fmt.Errorf("unexpected matcher type %T", matcher)
   119  	}
   120  }
   121  
   122  func (s *Scanner) processMatcherEntry(matcher Matcher, info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
   123  	rawLogEntry, err := ct.RawLogEntryFromLeaf(info.index, &info.entry)
   124  	if err != nil {
   125  		return fmt.Errorf("failed to build raw log entry %d: %v", info.index, err)
   126  	}
   127  	// Matcher instances need the parsed [pre-]certificate.
   128  	logEntry, err := rawLogEntry.ToLogEntry()
   129  	if s.isCertErrorFatal(err, logEntry, info.index) {
   130  		return fmt.Errorf("failed to parse [pre-]certificate in MerkleTreeLeaf[%d]: %v", info.index, err)
   131  	}
   132  
   133  	switch {
   134  	case logEntry.X509Cert != nil:
   135  		if s.opts.PrecertOnly {
   136  			// Only interested in precerts and this is an X.509 cert, early-out.
   137  			return nil
   138  		}
   139  		if matcher.CertificateMatches(logEntry.X509Cert) {
   140  			atomic.AddInt64(&s.certsMatched, 1)
   141  			foundCert(rawLogEntry)
   142  		}
   143  	case logEntry.Precert != nil:
   144  		if matcher.PrecertificateMatches(logEntry.Precert) {
   145  			atomic.AddInt64(&s.certsMatched, 1)
   146  			foundPrecert(rawLogEntry)
   147  		}
   148  		atomic.AddInt64(&s.precertsSeen, 1)
   149  	default:
   150  		return fmt.Errorf("saw unknown entry type: %v", logEntry.Leaf.TimestampedEntry.EntryType)
   151  	}
   152  	return nil
   153  }
   154  
   155  func (s *Scanner) processMatcherLeafEntry(matcher LeafMatcher, info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
   156  	if !matcher.Matches(&info.entry) {
   157  		return nil
   158  	}
   159  
   160  	rawLogEntry, err := ct.RawLogEntryFromLeaf(info.index, &info.entry)
   161  	if rawLogEntry == nil {
   162  		return fmt.Errorf("failed to build raw log entry %d: %v", info.index, err)
   163  	}
   164  	switch eType := rawLogEntry.Leaf.TimestampedEntry.EntryType; eType {
   165  	case ct.X509LogEntryType:
   166  		if s.opts.PrecertOnly {
   167  			// Only interested in precerts and this is an X.509 cert, early-out.
   168  			return nil
   169  		}
   170  		foundCert(rawLogEntry)
   171  	case ct.PrecertLogEntryType:
   172  		foundPrecert(rawLogEntry)
   173  		atomic.AddInt64(&s.precertsSeen, 1)
   174  	default:
   175  		return fmt.Errorf("saw unknown entry type: %v", eType)
   176  	}
   177  	return nil
   178  }
   179  
   180  // Worker function to match certs.
   181  // Accepts MatcherJobs over the entries channel, and processes them.
   182  // Returns true over the done channel when the entries channel is closed.
   183  func (s *Scanner) matcherJob(entries <-chan entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) {
   184  	for e := range entries {
   185  		if err := s.processEntry(e, foundCert, foundPrecert); err != nil {
   186  			atomic.AddInt64(&s.unparsableEntries, 1)
   187  			klog.Errorf("Failed to parse entry at index %d: %s", e.index, err.Error())
   188  		}
   189  	}
   190  }
   191  
   192  // Pretty prints the passed in duration into a human readable string.
   193  func humanTime(dur time.Duration) string {
   194  	hours := int(dur / time.Hour)
   195  	dur %= time.Hour
   196  	minutes := int(dur / time.Minute)
   197  	dur %= time.Minute
   198  	seconds := int(dur / time.Second)
   199  	s := ""
   200  	if hours > 0 {
   201  		s += fmt.Sprintf("%d hours ", hours)
   202  	}
   203  	if minutes > 0 {
   204  		s += fmt.Sprintf("%d minutes ", minutes)
   205  	}
   206  	if seconds > 0 || len(s) == 0 {
   207  		s += fmt.Sprintf("%d seconds ", seconds)
   208  	}
   209  	return s
   210  }
   211  
   212  func (s *Scanner) logThroughput(treeSize int64, stop <-chan bool) {
   213  	const wndSize = 15
   214  	wnd := make([]int64, wndSize)
   215  	wndTotal := int64(0)
   216  
   217  	ticker := time.NewTicker(time.Second)
   218  	defer ticker.Stop()
   219  
   220  	for slot, filled, prevCnt := 0, 0, int64(0); ; slot = (slot + 1) % wndSize {
   221  		select {
   222  		case <-stop:
   223  			return
   224  		case <-ticker.C:
   225  			certsCnt := atomic.LoadInt64(&s.certsProcessed)
   226  			certsMatched := atomic.LoadInt64(&s.certsMatched)
   227  
   228  			slotValue := certsCnt - prevCnt
   229  			wndTotal += slotValue - wnd[slot]
   230  			wnd[slot], prevCnt = slotValue, certsCnt
   231  
   232  			if filled < wndSize {
   233  				filled++
   234  			}
   235  
   236  			throughput := float64(wndTotal) / float64(filled)
   237  			remainingCerts := treeSize - int64(s.opts.StartIndex) - certsCnt
   238  			remainingSeconds := int(float64(remainingCerts) / throughput)
   239  			remainingString := humanTime(time.Duration(remainingSeconds) * time.Second)
   240  			klog.V(1).Infof("Processed: %d certs (to index %d), matched %d (%2.2f%%). Throughput (last %ds): %3.2f ETA: %s\n",
   241  				certsCnt, s.opts.StartIndex+certsCnt, certsMatched,
   242  				(100.0*float64(certsMatched))/float64(certsCnt),
   243  				filled, throughput, remainingString)
   244  		}
   245  	}
   246  }
   247  
   248  // Scan performs a scan against the Log. Blocks until the scan is complete.
   249  //
   250  // For each x509 certificate found, calls foundCert with the corresponding
   251  // LogEntry, which includes the index of the entry and the certificate.
   252  // For each precert found, calls foundPrecert with the corresponding LogEntry,
   253  // which includes the index of the entry and the precert.
   254  func (s *Scanner) Scan(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
   255  	_, err := s.ScanLog(ctx, foundCert, foundPrecert)
   256  	return err
   257  }
   258  
   259  // ScanLog performs a scan against the Log, returning the count of scanned entries.
   260  func (s *Scanner) ScanLog(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) (int64, error) {
   261  	klog.V(1).Infof("Starting up Scanner...")
   262  	s.certsProcessed = 0
   263  	s.certsMatched = 0
   264  	s.precertsSeen = 0
   265  	s.unparsableEntries = 0
   266  	s.entriesWithNonFatalErrors = 0
   267  
   268  	sth, err := s.fetcher.Prepare(ctx)
   269  	if err != nil {
   270  		return -1, err
   271  	}
   272  
   273  	startTime := time.Now()
   274  	stop := make(chan bool)
   275  	go s.logThroughput(int64(sth.TreeSize), stop)
   276  	defer func() {
   277  		stop <- true
   278  		close(stop)
   279  	}()
   280  
   281  	// Start matcher workers.
   282  	var wg sync.WaitGroup
   283  	entries := make(chan entryInfo, s.opts.BufferSize)
   284  	for w, cnt := 0, s.opts.NumWorkers; w < cnt; w++ {
   285  		wg.Add(1)
   286  		go func(idx int) {
   287  			defer wg.Done()
   288  			klog.V(1).Infof("Matcher %d starting", idx)
   289  			s.matcherJob(entries, foundCert, foundPrecert)
   290  			klog.V(1).Infof("Matcher %d finished", idx)
   291  		}(w)
   292  	}
   293  
   294  	flatten := func(b EntryBatch) {
   295  		for i, e := range b.Entries {
   296  			entries <- entryInfo{index: b.Start + int64(i), entry: e}
   297  		}
   298  	}
   299  	err = s.fetcher.Run(ctx, flatten)
   300  	close(entries) // Causes matcher workers to terminate.
   301  	wg.Wait()      // Wait until they terminate.
   302  	if err != nil {
   303  		return -1, err
   304  	}
   305  
   306  	klog.V(1).Infof("Completed %d certs in %s", atomic.LoadInt64(&s.certsProcessed), humanTime(time.Since(startTime)))
   307  	klog.V(1).Infof("Saw %d precerts", atomic.LoadInt64(&s.precertsSeen))
   308  	klog.V(1).Infof("Saw %d unparsable entries", atomic.LoadInt64(&s.unparsableEntries))
   309  	klog.V(1).Infof("Saw %d non-fatal errors", atomic.LoadInt64(&s.entriesWithNonFatalErrors))
   310  
   311  	return int64(s.fetcher.opts.EndIndex), nil
   312  }
   313  
   314  // NewScanner creates a Scanner instance using client to talk to the log,
   315  // taking configuration options from opts.
   316  func NewScanner(client LogClient, opts ScannerOptions) *Scanner {
   317  	var scanner Scanner
   318  	scanner.opts = opts
   319  	scanner.fetcher = NewFetcher(client, &scanner.opts.FetcherOptions)
   320  
   321  	// Set a default match-everything regex if none was provided.
   322  	if opts.Matcher == nil {
   323  		opts.Matcher = &MatchAll{}
   324  	}
   325  	return &scanner
   326  }
   327  

View as plain text