...

Source file src/github.com/google/certificate-transparency-go/scanner/fetcher.go

Documentation: github.com/google/certificate-transparency-go/scanner

     1  // Copyright 2018 Google LLC. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package scanner
    16  
    17  import (
    18  	"context"
    19  	"net/http"
    20  	"sync"
    21  	"time"
    22  
    23  	ct "github.com/google/certificate-transparency-go"
    24  	"github.com/google/certificate-transparency-go/jsonclient"
    25  	"github.com/google/trillian/client/backoff"
    26  	"k8s.io/klog/v2"
    27  )
    28  
// LogClient implements the subset of CT log API that the Fetcher uses.
type LogClient interface {
	// BaseURI returns the log's base URI. The Fetcher stores it at
	// construction time and uses it only as a prefix in log messages.
	BaseURI() string
	// GetSTH retrieves a signed tree head from the log. The Fetcher reads its
	// TreeSize to decide how many entries are available.
	GetSTH(context.Context) (*ct.SignedTreeHead, error)
	// GetRawEntries retrieves log entries for the index range given by start
	// and end. Within this file it is always called with an inclusive end
	// index, and the response may hold fewer entries than were requested.
	GetRawEntries(ctx context.Context, start, end int64) (*ct.GetEntriesResponse, error)
}
    35  
    36  // FetcherOptions holds configuration options for the Fetcher.
    37  type FetcherOptions struct {
    38  	// Number of entries to request in one batch from the Log.
    39  	BatchSize int
    40  
    41  	// Number of concurrent fetcher workers to run.
    42  	ParallelFetch int
    43  
    44  	// [StartIndex, EndIndex) is a log entry range to fetch. If EndIndex == 0,
    45  	// then it gets reassigned to sth.TreeSize.
    46  	StartIndex int64
    47  	EndIndex   int64
    48  
    49  	// Continuous determines whether Fetcher should run indefinitely after
    50  	// reaching EndIndex.
    51  	Continuous bool
    52  }
    53  
    54  // DefaultFetcherOptions returns new FetcherOptions with sensible defaults.
    55  func DefaultFetcherOptions() *FetcherOptions {
    56  	return &FetcherOptions{
    57  		BatchSize:     1000,
    58  		ParallelFetch: 1,
    59  		StartIndex:    0,
    60  		EndIndex:      0,
    61  		Continuous:    false,
    62  	}
    63  }
    64  
// Fetcher is a tool that fetches entries from a CT Log.
type Fetcher struct {
	// Base URI of the CT log, for diagnostics.
	uri string
	// Client used to talk to the CT log instance.
	client LogClient
	// Configuration options for this Fetcher instance. EndIndex is rewritten
	// by Prepare and updateSTH as the tree size becomes known or grows.
	opts *FetcherOptions

	// Current STH of the Log this Fetcher sends queries to.
	sth *ct.SignedTreeHead
	// The STH retrieval backoff state. Used only in Continuous fetch mode.
	sthBackoff *backoff.Backoff

	// mu guards cancel, which Run replaces and Stop invokes.
	mu sync.Mutex
	// cancel stops the range generator, which causes the Fetcher to terminate
	// gracefully. It is a no-op until Run installs the real cancel function.
	cancel context.CancelFunc
}
    83  
// EntryBatch represents a contiguous range of entries of the Log. It is the
// unit handed to the callback passed to Fetcher.Run.
type EntryBatch struct {
	Start   int64          // LeafIndex of the first entry in the range.
	Entries []ct.LeafEntry // Entries of the range.
}
    89  
// fetchRange represents a range of certs to fetch from a CT log. Both bounds
// are inclusive, so a range with start > end is empty.
type fetchRange struct {
	start int64 // inclusive
	end   int64 // inclusive
}
    95  
    96  // NewFetcher creates a Fetcher instance using client to talk to the log,
    97  // taking configuration options from opts.
    98  func NewFetcher(client LogClient, opts *FetcherOptions) *Fetcher {
    99  	cancel := func() {} // Protect against calling Stop before Run.
   100  	return &Fetcher{
   101  		uri:    client.BaseURI(),
   102  		client: client,
   103  		opts:   opts,
   104  		cancel: cancel,
   105  	}
   106  }
   107  
   108  // Prepare caches the latest Log's STH if not present and returns it. It also
   109  // adjusts the entry range to fit the size of the tree.
   110  func (f *Fetcher) Prepare(ctx context.Context) (*ct.SignedTreeHead, error) {
   111  	if f.sth != nil {
   112  		return f.sth, nil
   113  	}
   114  
   115  	sth, err := f.client.GetSTH(ctx)
   116  	if err != nil {
   117  		klog.Errorf("%s: GetSTH() failed: %v", f.uri, err)
   118  		return nil, err
   119  	}
   120  	klog.V(1).Infof("%s: Got STH with %d certs", f.uri, sth.TreeSize)
   121  
   122  	if size := int64(sth.TreeSize); f.opts.EndIndex == 0 || f.opts.EndIndex > size {
   123  		klog.V(1).Infof("%s: Reset EndIndex from %d to %d", f.uri, f.opts.EndIndex, size)
   124  		f.opts.EndIndex = size
   125  	}
   126  	f.sth = sth
   127  	return sth, nil
   128  }
   129  
   130  // Run performs fetching of the Log. Blocks until scanning is complete, the
   131  // passed in context is canceled, or Stop is called (and pending work is
   132  // finished). For each successfully fetched batch, runs the fn callback.
   133  func (f *Fetcher) Run(ctx context.Context, fn func(EntryBatch)) error {
   134  	klog.V(1).Infof("%s: Starting up Fetcher...", f.uri)
   135  	if _, err := f.Prepare(ctx); err != nil {
   136  		return err
   137  	}
   138  
   139  	cctx, cancel := context.WithCancel(ctx)
   140  	defer cancel()
   141  
   142  	f.mu.Lock()
   143  	f.cancel = cancel
   144  	f.mu.Unlock()
   145  
   146  	// Use a separately-cancelable context for the range generator, so we can
   147  	// close it down (in Stop) but still let the fetchers below run to
   148  	// completion.
   149  	ranges := f.genRanges(cctx)
   150  
   151  	// Run fetcher workers.
   152  	var wg sync.WaitGroup
   153  	for w, cnt := 0, f.opts.ParallelFetch; w < cnt; w++ {
   154  		wg.Add(1)
   155  		go func(idx int) {
   156  			defer wg.Done()
   157  			klog.V(1).Infof("%s: Fetcher worker %d starting...", f.uri, idx)
   158  			f.runWorker(ctx, ranges, fn)
   159  			klog.V(1).Infof("%s: Fetcher worker %d finished", f.uri, idx)
   160  		}(w)
   161  	}
   162  	wg.Wait()
   163  
   164  	klog.V(1).Infof("%s: Fetcher terminated", f.uri)
   165  	return nil
   166  }
   167  
   168  // Stop causes the Fetcher to terminate gracefully. After this call Run will
   169  // try to finish all the started fetches, and then return. Does nothing if
   170  // there was no preceding Run invocation.
   171  func (f *Fetcher) Stop() {
   172  	f.mu.Lock()
   173  	defer f.mu.Unlock()
   174  	f.cancel()
   175  }
   176  
   177  // genRanges returns a channel of ranges to fetch, and starts a goroutine that
   178  // sends things down this channel. The goroutine terminates when all ranges
   179  // have been generated, or if context is cancelled.
   180  func (f *Fetcher) genRanges(ctx context.Context) <-chan fetchRange {
   181  	batch := int64(f.opts.BatchSize)
   182  	ranges := make(chan fetchRange)
   183  
   184  	go func() {
   185  		klog.V(1).Infof("%s: Range generator starting", f.uri)
   186  		defer klog.V(1).Infof("%s: Range generator finished", f.uri)
   187  		defer close(ranges)
   188  		start, end := f.opts.StartIndex, f.opts.EndIndex
   189  
   190  		for start < end || f.opts.Continuous {
   191  			// In continuous mode wait for bigger STH every time we reach the end,
   192  			// including, possibly, the very first iteration.
   193  			if start == end { // Implies f.opts.Continuous == true.
   194  				if err := f.updateSTH(ctx); err != nil {
   195  					klog.Warningf("%s: Failed to obtain bigger STH: %v", f.uri, err)
   196  					return
   197  				}
   198  				end = f.opts.EndIndex
   199  			}
   200  
   201  			batchEnd := start + min(end-start, batch)
   202  			next := fetchRange{start, batchEnd - 1}
   203  			select {
   204  			case <-ctx.Done():
   205  				klog.Warningf("%s: Cancelling genRanges: %v", f.uri, ctx.Err())
   206  				return
   207  			case ranges <- next:
   208  			}
   209  			start = batchEnd
   210  		}
   211  	}()
   212  
   213  	return ranges
   214  }
   215  
   216  // updateSTH waits until a bigger STH is discovered, and updates the Fetcher
   217  // accordingly. It is optimized for both bulk-load (new STH is way bigger then
   218  // the last one) and keep-up (STH grows slowly) modes of operation. Waits for
   219  // some time until the STH grows enough to request a full batch, but falls back
   220  // to *any* STH bigger than the old one if it takes too long.
   221  // Returns error only if the context is cancelled.
   222  func (f *Fetcher) updateSTH(ctx context.Context) error {
   223  	// TODO(pavelkalinnikov): Make these parameters tunable.
   224  	const quickDur = 45 * time.Second
   225  	if f.sthBackoff == nil {
   226  		f.sthBackoff = &backoff.Backoff{
   227  			Min:    1 * time.Second,
   228  			Max:    30 * time.Second,
   229  			Factor: 2,
   230  			Jitter: true,
   231  		}
   232  	}
   233  
   234  	lastSize := uint64(f.opts.EndIndex)
   235  	targetSize := lastSize + uint64(f.opts.BatchSize)
   236  	quickDeadline := time.Now().Add(quickDur)
   237  
   238  	return f.sthBackoff.Retry(ctx, func() error {
   239  		sth, err := f.client.GetSTH(ctx)
   240  		if err != nil {
   241  			return backoff.RetriableErrorf("GetSTH: %v", err)
   242  		}
   243  		klog.V(2).Infof("%s: Got STH with %d certs", f.uri, sth.TreeSize)
   244  
   245  		quick := time.Now().Before(quickDeadline)
   246  		if sth.TreeSize <= lastSize || quick && sth.TreeSize < targetSize {
   247  			return backoff.RetriableErrorf("wait for bigger STH than %d (last=%d, target=%d)", sth.TreeSize, lastSize, targetSize)
   248  		}
   249  
   250  		if quick {
   251  			f.sthBackoff.Reset() // Growth is presumably fast, set next pause to Min.
   252  		}
   253  		f.sth = sth
   254  		f.opts.EndIndex = int64(sth.TreeSize)
   255  		return nil
   256  	})
   257  }
   258  
   259  // runWorker is a worker function for handling fetcher ranges.
   260  // Accepts cert ranges to fetch over the ranges channel, and if the fetch is
   261  // successful sends the corresponding EntryBatch through the fn callback. Will
   262  // retry failed attempts to retrieve ranges until the context is cancelled.
   263  func (f *Fetcher) runWorker(ctx context.Context, ranges <-chan fetchRange, fn func(EntryBatch)) {
   264  	for r := range ranges {
   265  		// Logs MAY return fewer than the number of leaves requested. Only complete
   266  		// if we actually got all the leaves we were expecting.
   267  		for r.start <= r.end {
   268  			if ctx.Err() != nil { // Prevent spinning when context is canceled.
   269  				return
   270  			}
   271  			// TODO(pavelkalinnikov): Make these parameters tunable.
   272  			// This backoff will only apply to a single request and be reset for the next one.
   273  			// This precludes reaching some kind of stability in request rate, but means that
   274  			// an intermittent problem won't harm long-term running of the worker.
   275  			bo := &backoff.Backoff{
   276  				Min:    1 * time.Second,
   277  				Max:    30 * time.Second,
   278  				Factor: 2,
   279  				Jitter: true,
   280  			}
   281  
   282  			var resp *ct.GetEntriesResponse
   283  			// TODO(pavelkalinnikov): Report errors in a LogClient decorator on failure.
   284  			if err := bo.Retry(ctx, func() error {
   285  				var err error
   286  				resp, err = f.client.GetRawEntries(ctx, r.start, r.end)
   287  				return err
   288  			}); err != nil {
   289  				if rspErr, isRspErr := err.(jsonclient.RspError); isRspErr && rspErr.StatusCode == http.StatusTooManyRequests {
   290  					klog.V(2).Infof("%s: GetRawEntries() failed: %v", f.uri, err)
   291  				} else {
   292  					klog.Errorf("%s: GetRawEntries() failed: %v", f.uri, err)
   293  				}
   294  				// There is no error reporting yet for this worker, so just retry again.
   295  				continue
   296  			}
   297  			fn(EntryBatch{Start: r.start, Entries: resp.Entries})
   298  			r.start += int64(len(resp.Entries))
   299  		}
   300  	}
   301  }
   302  
   303  func min(a, b int64) int64 {
   304  	if a < b {
   305  		return a
   306  	}
   307  	return b
   308  }
   309  

View as plain text