...

Source file src/github.com/sigstore/rekor/pkg/witness/publish_checkpoint.go

Documentation: github.com/sigstore/rekor/pkg/witness

     1  // Copyright 2023 The Sigstore Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package witness
    16  
    17  import (
    18  	"context"
    19  	"encoding/hex"
    20  	"fmt"
    21  	"strconv"
    22  	"time"
    23  
    24  	"github.com/google/trillian"
    25  	"github.com/google/trillian/types"
    26  	"github.com/prometheus/client_golang/prometheus"
    27  	"github.com/redis/go-redis/v9"
    28  	"github.com/sigstore/rekor/pkg/log"
    29  	"github.com/sigstore/rekor/pkg/trillianclient"
    30  	"github.com/sigstore/rekor/pkg/util"
    31  	"github.com/sigstore/sigstore/pkg/signature"
    32  	"google.golang.org/grpc/codes"
    33  )
    34  
// CheckpointPublisher is a long-running job that periodically publishes signed checkpoints to Redis
type CheckpointPublisher struct {
	// ctx is the parent context for the Redis calls made while publishing
	ctx context.Context
	// logClient is the client for Trillian
	logClient trillian.TrillianLogClient
	// treeID is used to construct the origin and configure the Trillian client
	treeID int64
	// hostname is used to construct the origin ("hostname - treeID")
	hostname string
	// signer signs the checkpoint
	signer signature.Signer
	// checkpointFreq is how often a new checkpoint is published, in minutes; it is also
	// the width of the time window used to deduplicate publishes in Redis
	checkpointFreq uint
	// redisClient stores the per-window lock key and the latest signed checkpoint
	redisClient *redis.Client
	// reqCounter tracks successes and failures for publishing, labelled by "shard" and "code"
	reqCounter *prometheus.CounterVec
}
    53  
// Constant values used with metrics. Each value is reported as the "code" label on the
// publisher's request counter; the numeric values are assigned by iota, so reordering
// the list changes the values that are emitted.
const (
	// Success: the latest checkpoint was written to Redis
	Success = iota
	// SuccessObtainLock: this instance set the key for the current time window
	SuccessObtainLock
	// GetCheckpoint: fetching the latest log root from Trillian failed
	GetCheckpoint
	// UnmarshalCheckpoint: the log root returned by Trillian could not be unmarshalled
	UnmarshalCheckpoint
	// SignCheckpoint: creating and signing the checkpoint failed
	SignCheckpoint
	// RedisFailure: setting the time-window key in Redis returned an error
	RedisFailure
	// RedisLatestFailure: writing the latest checkpoint to Redis returned an error
	RedisLatestFailure
)
    64  
    65  // NewCheckpointPublisher creates a CheckpointPublisher to write stable checkpoints to Redis
    66  func NewCheckpointPublisher(ctx context.Context,
    67  	logClient trillian.TrillianLogClient,
    68  	treeID int64,
    69  	hostname string,
    70  	signer signature.Signer,
    71  	redisClient *redis.Client,
    72  	checkpointFreq uint,
    73  	reqCounter *prometheus.CounterVec) CheckpointPublisher {
    74  	return CheckpointPublisher{ctx: ctx, logClient: logClient, treeID: treeID, hostname: hostname,
    75  		signer: signer, checkpointFreq: checkpointFreq, redisClient: redisClient, reqCounter: reqCounter}
    76  }
    77  
    78  // StartPublisher creates a long-running task that publishes the latest checkpoint every X minutes
    79  // Writing to Redis is best effort. Failure will be detected either through metrics or by witnesses
    80  // or Verifiers monitoring for fresh checkpoints. Failure can occur after a lock is obtained but
    81  // before publishing the latest checkpoint. If this occurs due to a sporadic failure, this simply
    82  // means that a witness will not see a fresh checkpoint for an additional period.
    83  func (c *CheckpointPublisher) StartPublisher(ctx context.Context) {
    84  	tc := trillianclient.NewTrillianClient(context.Background(), c.logClient, c.treeID)
    85  	sTreeID := strconv.FormatInt(c.treeID, 10)
    86  
    87  	// publish on startup to ensure a checkpoint is available the first time Rekor starts up
    88  	c.publish(&tc, sTreeID)
    89  
    90  	ticker := time.NewTicker(time.Duration(c.checkpointFreq) * time.Minute)
    91  	go func() {
    92  		for {
    93  			select {
    94  			case <-ctx.Done():
    95  				return
    96  			case <-ticker.C:
    97  				c.publish(&tc, sTreeID)
    98  			}
    99  		}
   100  	}()
   101  }
   102  
   103  // publish publishes the latest checkpoint to Redis once
   104  func (c *CheckpointPublisher) publish(tc *trillianclient.TrillianClient, sTreeID string) {
   105  	// get latest checkpoint
   106  	resp := tc.GetLatest(0)
   107  	if resp.Status != codes.OK {
   108  		c.reqCounter.With(
   109  			map[string]string{
   110  				"shard": sTreeID,
   111  				"code":  strconv.Itoa(GetCheckpoint),
   112  			}).Inc()
   113  		log.Logger.Errorf("error getting latest checkpoint to publish: %v", resp.Status)
   114  		return
   115  	}
   116  
   117  	// unmarshal checkpoint
   118  	root := &types.LogRootV1{}
   119  	if err := root.UnmarshalBinary(resp.GetLatestResult.SignedLogRoot.LogRoot); err != nil {
   120  		c.reqCounter.With(
   121  			map[string]string{
   122  				"shard": sTreeID,
   123  				"code":  strconv.Itoa(UnmarshalCheckpoint),
   124  			}).Inc()
   125  		log.Logger.Errorf("error unmarshalling latest checkpoint to publish: %v", err)
   126  		return
   127  	}
   128  
   129  	// sign checkpoint with Rekor private key
   130  	checkpoint, err := util.CreateAndSignCheckpoint(context.Background(), c.hostname, c.treeID, root.TreeSize, root.RootHash, c.signer)
   131  	if err != nil {
   132  		c.reqCounter.With(
   133  			map[string]string{
   134  				"shard": sTreeID,
   135  				"code":  strconv.Itoa(SignCheckpoint),
   136  			}).Inc()
   137  		log.Logger.Errorf("error signing checkpoint to publish: %v", err)
   138  		return
   139  	}
   140  
   141  	// encode checkpoint as hex to write to redis
   142  	hexCP := hex.EncodeToString(checkpoint)
   143  
   144  	// write checkpoint to Redis if key does not yet exist
   145  	// this prevents multiple instances of Rekor from writing different checkpoints in the same time window
   146  	ts := time.Now().Truncate(time.Duration(c.checkpointFreq) * time.Minute).UnixNano()
   147  	// key is treeID/timestamp, where timestamp is rounded down to the nearest X minutes
   148  	key := fmt.Sprintf("%d/%d", c.treeID, ts)
   149  	ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
   150  	defer cancel()
   151  
   152  	// return value ignored, which is whether or not the entry was set
   153  	// no error is thrown if the key already exists
   154  	// use smallest value as it's unused
   155  	value := true
   156  	successNX, err := c.redisClient.SetNX(ctx, key, value, 0).Result()
   157  	if err != nil {
   158  		c.reqCounter.With(
   159  			map[string]string{
   160  				"shard": sTreeID,
   161  				"code":  strconv.Itoa(RedisFailure),
   162  			}).Inc()
   163  		log.Logger.Errorf("error with client publishing checkpoint: %v", err)
   164  		return
   165  	}
   166  	// if the key was not set, then the key already exists for this time period
   167  	if !successNX {
   168  		return
   169  	}
   170  
   171  	// successful obtaining of lock for time period
   172  	c.reqCounter.With(
   173  		map[string]string{
   174  			"shard": sTreeID,
   175  			"code":  strconv.Itoa(SuccessObtainLock),
   176  		}).Inc()
   177  
   178  	// on successfully obtaining the "lock" for the time window, update latest checkpoint
   179  	latestKey := fmt.Sprintf("%d/latest", c.treeID)
   180  	latestCtx, latestCancel := context.WithTimeout(c.ctx, 10*time.Second)
   181  	defer latestCancel()
   182  
   183  	// return value ignored, which is whether or not the entry was set
   184  	// no error is thrown if the key already exists
   185  	if _, err = c.redisClient.Set(latestCtx, latestKey, hexCP, 0).Result(); err != nil {
   186  		c.reqCounter.With(
   187  			map[string]string{
   188  				"shard": sTreeID,
   189  				"code":  strconv.Itoa(RedisLatestFailure),
   190  			}).Inc()
   191  		log.Logger.Errorf("error with client publishing latest checkpoint: %v", err)
   192  		return
   193  	}
   194  
   195  	// successful publish
   196  	c.reqCounter.With(
   197  		map[string]string{
   198  			"shard": sTreeID,
   199  			"code":  strconv.Itoa(Success),
   200  		}).Inc()
   201  }
   202  

View as plain text