...

Source file src/github.com/google/certificate-transparency-go/trillian/migrillian/core/controller.go

Documentation: github.com/google/certificate-transparency-go/trillian/migrillian/core

     1  // Copyright 2018 Google LLC. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package core provides transport-agnostic implementation of Migrillian tool.
    16  package core
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"math/rand"
    22  	"strconv"
    23  	"sync"
    24  	"time"
    25  
    26  	ct "github.com/google/certificate-transparency-go"
    27  	"github.com/google/certificate-transparency-go/client"
    28  	"github.com/google/certificate-transparency-go/scanner"
    29  	"github.com/google/certificate-transparency-go/trillian/migrillian/configpb"
    30  	"k8s.io/klog/v2"
    31  
    32  	"github.com/google/trillian/monitoring"
    33  	"github.com/google/trillian/util/clock"
    34  	"github.com/google/trillian/util/election2"
    35  	"github.com/transparency-dev/merkle/proof"
    36  	"github.com/transparency-dev/merkle/rfc6962"
    37  )
    38  
    39  var (
    40  	metrics     treeMetrics
    41  	metricsOnce sync.Once
    42  )
    43  
    44  // treeMetrics holds metrics keyed by Tree ID.
    45  type treeMetrics struct {
    46  	masterRuns       monitoring.Counter
    47  	masterCancels    monitoring.Counter
    48  	controllerStarts monitoring.Counter
    49  	isMaster         monitoring.Gauge
    50  	entriesFetched   monitoring.Counter
    51  	entriesSeen      monitoring.Counter
    52  	entriesStored    monitoring.Counter
    53  	sthTimestamp     monitoring.Gauge
    54  	sthTreeSize      monitoring.Gauge
    55  }
    56  
    57  // initMetrics creates metrics using the factory, if not yet created.
    58  func initMetrics(mf monitoring.MetricFactory) {
    59  	const treeID = "tree_id"
    60  	metricsOnce.Do(func() {
    61  		metrics = treeMetrics{
    62  			masterRuns:       mf.NewCounter("master_runs", "Number of mastership runs.", treeID),
    63  			masterCancels:    mf.NewCounter("master_cancels", "Number of unexpected mastership cancelations.", treeID),
    64  			controllerStarts: mf.NewCounter("controller_starts", "Number of Controller (re-)starts.", treeID),
    65  			isMaster:         mf.NewGauge("is_master", "The instance is currently the master.", treeID),
    66  			entriesFetched:   mf.NewCounter("entries_fetched", "Entries fetched from the source log.", treeID),
    67  			entriesSeen:      mf.NewCounter("entries_seen", "Entries seen by the submitters.", treeID),
    68  			entriesStored:    mf.NewCounter("entries_stored", "Entries successfully submitted to Trillian.", treeID),
    69  			sthTimestamp:     mf.NewGauge("sth_timestamp", "Timestamp of the last seen STH.", treeID),
    70  			sthTreeSize:      mf.NewGauge("sth_tree_size", "Tree size of the last seen STH.", treeID),
    71  		}
    72  	})
    73  }
    74  
    75  // Options holds configuration for a Controller.
    76  type Options struct {
    77  	scanner.FetcherOptions
    78  	Submitters         int
    79  	ChannelSize        int
    80  	NoConsistencyCheck bool
    81  	StartDelay         time.Duration
    82  	StopAfter          time.Duration
    83  }
    84  
    85  // OptionsFromConfig returns Options created from the passed in config.
    86  func OptionsFromConfig(cfg *configpb.MigrationConfig) Options {
    87  	opts := Options{
    88  		FetcherOptions: scanner.FetcherOptions{
    89  			BatchSize:     int(cfg.BatchSize),
    90  			ParallelFetch: int(cfg.NumFetchers),
    91  			StartIndex:    cfg.StartIndex,
    92  			EndIndex:      cfg.EndIndex,
    93  			Continuous:    cfg.IsContinuous,
    94  		},
    95  		Submitters:         int(cfg.NumSubmitters),
    96  		ChannelSize:        int(cfg.ChannelSize),
    97  		NoConsistencyCheck: cfg.NoConsistencyCheck,
    98  	}
    99  	if cfg.NumFetchers == 0 {
   100  		opts.ParallelFetch = 1
   101  	}
   102  	if cfg.NumSubmitters == 0 {
   103  		opts.Submitters = 1
   104  	}
   105  	return opts
   106  }
   107  
// Controller coordinates migration from a CT log to a Trillian tree.
//
// It reads entries from the source log through ctClient and submits them to
// the Trillian pre-ordered log through plClient, behaving as configured by
// opts. The election factory ef is used to create the master election that
// RunWhenMaster participates in. The label is the decimal string form of the
// Trillian tree ID; it is used as the metrics label, the election resource ID
// and a log message prefix.
type Controller struct {
	opts     Options
	ctClient *client.LogClient
	plClient *PreorderedLogClient
	ef       election2.Factory
	label    string
}
   116  
   117  // NewController creates a Controller configured by the passed in options, CT
   118  // and Trillian clients, and a master election factory.
   119  //
   120  // The passed in MetricFactory is used to create per-tree metrics, and it
   121  // should be the same for all instances. However, it is used only once.
   122  func NewController(
   123  	opts Options,
   124  	ctClient *client.LogClient,
   125  	plClient *PreorderedLogClient,
   126  	ef election2.Factory,
   127  	mf monitoring.MetricFactory,
   128  ) *Controller {
   129  	initMetrics(mf)
   130  	l := strconv.FormatInt(plClient.treeID, 10)
   131  	return &Controller{opts: opts, ctClient: ctClient, plClient: plClient, ef: ef, label: l}
   132  }
   133  
   134  // RunWhenMasterWithRestarts calls RunWhenMaster, and, if the migration is
   135  // configured with continuous mode, restarts it whenever it returns.
   136  func (c *Controller) RunWhenMasterWithRestarts(ctx context.Context) {
   137  	uri := c.ctClient.BaseURI()
   138  	treeID := c.plClient.treeID
   139  	for run := true; run; run = c.opts.Continuous && ctx.Err() == nil {
   140  		klog.Infof("Starting migration Controller (%d<-%q)", treeID, uri)
   141  		if err := c.RunWhenMaster(ctx); err != nil {
   142  			klog.Errorf("Controller.RunWhenMaster(%d<-%q): %v", treeID, uri, err)
   143  			continue
   144  		}
   145  		klog.Infof("Controller stopped (%d<-%q)", treeID, uri)
   146  	}
   147  }
   148  
   149  // RunWhenMaster is a master-elected version of Run method. It executes Run
   150  // whenever this instance captures mastership of the tree ID. As soon as the
   151  // instance stops being the master, Run is canceled. The method returns if a
   152  // severe error occurs, the passed in context is canceled, or fetching is
   153  // completed (in non-Continuous mode). Releases mastership when terminates.
   154  func (c *Controller) RunWhenMaster(ctx context.Context) error {
   155  	// Avoid thundering herd when starting multiple tasks on the same tree.
   156  	if err := sleepRandom(ctx, 0, c.opts.StartDelay); err != nil {
   157  		return err // The context has been canceled.
   158  	}
   159  
   160  	el, err := c.ef.NewElection(ctx, c.label)
   161  	if err != nil {
   162  		return err
   163  	}
   164  	metrics.isMaster.Set(0, c.label)
   165  	defer func(ctx context.Context) {
   166  		metrics.isMaster.Set(0, c.label)
   167  		if err := el.Close(ctx); err != nil {
   168  			klog.Warningf("%s: Election.Close(): %v", c.label, err)
   169  		}
   170  	}(ctx)
   171  
   172  	for {
   173  		if err := el.Await(ctx); err != nil {
   174  			return err
   175  		}
   176  		metrics.isMaster.Set(1, c.label)
   177  
   178  		mctx, err := el.WithMastership(ctx)
   179  		if err != nil {
   180  			return err
   181  		} else if err := mctx.Err(); err != nil {
   182  			return err
   183  		}
   184  
   185  		klog.Infof("%s: running as master", c.label)
   186  		metrics.masterRuns.Inc(c.label)
   187  
   188  		// Run while still master (or until an error).
   189  		err = c.runWithRestarts(mctx)
   190  		if ctx.Err() != nil {
   191  			// We have been externally canceled, so return the current error (which
   192  			// could be nil or a cancelation-related error).
   193  			return err
   194  		} else if mctx.Err() == nil {
   195  			// We are still the master, so try to resign and emit the real error.
   196  			if rerr := el.Resign(ctx); rerr != nil {
   197  				klog.Errorf("%s: Election.Resign(): %v", c.label, rerr)
   198  			}
   199  			return err
   200  		}
   201  
   202  		// Otherwise the mastership has been canceled, retry.
   203  		metrics.isMaster.Set(0, c.label)
   204  		metrics.masterCancels.Inc(c.label)
   205  	}
   206  }
   207  
   208  // runWithRestarts calls Run until it succeeds or the context is done, in
   209  // continuous mode. For non-continuous mode it is simply equivalent to Run.
   210  func (c *Controller) runWithRestarts(ctx context.Context) error {
   211  	err := c.Run(ctx)
   212  	if !c.opts.Continuous {
   213  		return err
   214  	}
   215  	for err != nil && ctx.Err() == nil {
   216  		klog.Errorf("%s: Controller.Run: %v", c.label, err)
   217  		if slerr := sleepRandom(ctx, 0, c.opts.StartDelay); slerr == nil {
   218  			err = c.Run(ctx)
   219  		}
   220  	}
   221  	return ctx.Err()
   222  }
   223  
   224  // Run transfers CT log entries obtained via the CT log client to a Trillian
   225  // pre-ordered log via Trillian client. If Options.Continuous is true then the
   226  // migration process runs continuously trying to keep up with the target CT
   227  // log. Returns if an error occurs, the context is canceled, or all the entries
   228  // have been transferred (in non-Continuous mode).
   229  func (c *Controller) Run(ctx context.Context) error {
   230  	metrics.controllerStarts.Inc(c.label)
   231  	stopAfter := randDuration(c.opts.StopAfter, c.opts.StopAfter)
   232  	start := time.Now()
   233  
   234  	// Note: Non-continuous runs are not affected by StopAfter.
   235  	pos, err := c.fetchTail(ctx, 0)
   236  	if err != nil {
   237  		return err
   238  	}
   239  	if !c.opts.Continuous {
   240  		return nil
   241  	}
   242  
   243  	for stopAfter == 0 || time.Since(start) < stopAfter {
   244  		// TODO(pavelkalinnikov): Integrate runWithRestarts here.
   245  		next, err := c.fetchTail(ctx, pos)
   246  		if err != nil {
   247  			return err
   248  		}
   249  		if next == pos {
   250  			// TODO(pavelkalinnikov): Pause with accordance to the rate of growth.
   251  			// TODO(pavelkalinnikov): Make the duration configurable.
   252  			if err := clock.SleepContext(ctx, 30*time.Second); err != nil {
   253  				return err
   254  			}
   255  		}
   256  		pos = next
   257  	}
   258  
   259  	return nil
   260  }
   261  
   262  // fetchTail transfers entries within the range specified in FetcherConfig,
   263  // with respect to the passed in minimal position to start from, and the
   264  // current tree size obtained from an STH.
   265  func (c *Controller) fetchTail(ctx context.Context, begin uint64) (uint64, error) {
   266  	treeSize, rootHash, err := c.plClient.getRoot(ctx)
   267  	if err != nil {
   268  		return 0, err
   269  	}
   270  
   271  	fo := c.opts.FetcherOptions
   272  	if fo.Continuous { // Ignore range parameters in continuous mode.
   273  		fo.StartIndex, fo.EndIndex = int64(treeSize), 0
   274  		// Use non-continuous Fetcher, as we implement continuity in Controller.
   275  		// TODO(pavelkalinnikov): Don't overload Fetcher's Continuous flag.
   276  		fo.Continuous = false
   277  	} else if fo.StartIndex < 0 {
   278  		fo.StartIndex = int64(treeSize)
   279  	}
   280  	if int64(begin) > fo.StartIndex {
   281  		fo.StartIndex = int64(begin)
   282  	}
   283  	klog.Infof("%s: fetching range [%d, %d)", c.label, fo.StartIndex, fo.EndIndex)
   284  
   285  	fetcher := scanner.NewFetcher(c.ctClient, &fo)
   286  	sth, err := fetcher.Prepare(ctx)
   287  	if err != nil {
   288  		return 0, err
   289  	}
   290  	metrics.sthTimestamp.Set(float64(sth.Timestamp), c.label)
   291  	metrics.sthTreeSize.Set(float64(sth.TreeSize), c.label)
   292  	if sth.TreeSize <= begin {
   293  		return begin, nil
   294  	}
   295  
   296  	if err := c.verifyConsistency(ctx, treeSize, rootHash, sth); err != nil {
   297  		return 0, err
   298  	}
   299  
   300  	var wg sync.WaitGroup
   301  	batches := make(chan scanner.EntryBatch, c.opts.ChannelSize)
   302  	cctx, cancel := context.WithCancel(ctx)
   303  	defer cancel()
   304  
   305  	for w, cnt := 0, c.opts.Submitters; w < cnt; w++ {
   306  		wg.Add(1)
   307  		go func() {
   308  			defer wg.Done()
   309  			if err := c.runSubmitter(cctx, batches); err != nil {
   310  				klog.Errorf("%s: Stopping due to submitter error: %v", c.label, err)
   311  				cancel() // Stop the other submitters and the Fetcher.
   312  			}
   313  		}()
   314  	}
   315  
   316  	handler := func(b scanner.EntryBatch) {
   317  		metrics.entriesFetched.Add(float64(len(b.Entries)), c.label)
   318  		select {
   319  		case batches <- b:
   320  		case <-cctx.Done(): // Avoid deadlock when shutting down.
   321  		}
   322  	}
   323  
   324  	err = fetcher.Run(cctx, handler)
   325  	close(batches)
   326  	wg.Wait()
   327  	if err != nil {
   328  		return 0, err
   329  	}
   330  	// Run may have returned nil despite a cancel() call.
   331  	if err := cctx.Err(); err != nil {
   332  		return 0, fmt.Errorf("failed to fetch and submit the entire tail: %v", err)
   333  	}
   334  	return sth.TreeSize, nil
   335  }
   336  
   337  // verifyConsistency checks that the provided verified Trillian root is
   338  // consistent with the CT log's STH.
   339  func (c *Controller) verifyConsistency(ctx context.Context, treeSize uint64, rootHash []byte, sth *ct.SignedTreeHead) error {
   340  	if treeSize == 0 {
   341  		// Any head is consistent with empty root -- unnecessary to request empty proof.
   342  		return nil
   343  	}
   344  	if c.opts.NoConsistencyCheck {
   345  		klog.Warningf("%s: skipping consistency check", c.label)
   346  		return nil
   347  	}
   348  	pf, err := c.ctClient.GetSTHConsistency(ctx, treeSize, sth.TreeSize)
   349  	if err != nil {
   350  		return err
   351  	}
   352  	return proof.VerifyConsistency(rfc6962.DefaultHasher, treeSize, sth.TreeSize,
   353  		pf, rootHash, sth.SHA256RootHash[:])
   354  }
   355  
   356  // runSubmitter obtains CT log entry batches from the controller's channel and
   357  // submits them through Trillian client. Returns when the channel is closed, or
   358  // the client returns a non-recoverable error (an example of a recoverable
   359  // error is when Trillian write quota is exceeded).
   360  func (c *Controller) runSubmitter(ctx context.Context, batches <-chan scanner.EntryBatch) error {
   361  	for b := range batches {
   362  		entries := float64(len(b.Entries))
   363  		metrics.entriesSeen.Add(entries, c.label)
   364  
   365  		end := b.Start + int64(len(b.Entries))
   366  		if err := c.plClient.addSequencedLeaves(ctx, &b); err != nil {
   367  			// addSequencedLeaves failed to submit entries despite retries. At this
   368  			// point there is not much we can do. Seemingly the best strategy is to
   369  			// shut down the Controller.
   370  			return fmt.Errorf("failed to add batch [%d, %d): %v", b.Start, end, err)
   371  		}
   372  		klog.Infof("%s: added batch [%d, %d)", c.label, b.Start, end)
   373  		metrics.entriesStored.Add(entries, c.label)
   374  	}
   375  	return nil
   376  }
   377  
   378  // sleepRandom sleeps for random duration in [base, base+spread).
   379  func sleepRandom(ctx context.Context, base, spread time.Duration) error {
   380  	d := randDuration(base, spread)
   381  	if d == 0 {
   382  		return nil
   383  	}
   384  	return clock.SleepContext(ctx, d)
   385  }
   386  
   387  // randDuration returns a random duration in [base, base+spread).
   388  func randDuration(base, spread time.Duration) time.Duration {
   389  	d := base
   390  	if spread != 0 {
   391  		d += time.Duration(rand.Int63n(int64(spread)))
   392  	}
   393  	return d
   394  }
   395  

View as plain text