...

Source file src/github.com/sigstore/rekor/cmd/backfill-redis/main.go

Documentation: github.com/sigstore/rekor/cmd/backfill-redis

     1  // Copyright 2022 The Sigstore Authors.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  /*
    16  	backfill-redis is a script to populate the Redis index with entries
    17  	from Rekor. This is sometimes necessary because Redis caching is best
    18  	effort. If Redis returns an error, Rekor will not, and so sometimes
    19  	we need to backfill missing entries into Redis for the search API.
    20  
    21  	To run:
    22  	go run cmd/backfill-redis/main.go --rekor-address <address> \
    23  	    --hostname <redis-hostname> --port <redis-port> --concurrency <num-of-workers> \
    24  		--start <first index to backfill> --end <last index to backfill> [--dry-run]
    25  */
    26  
    27  package main
    28  
    29  import (
    30  	"bytes"
    31  	"context"
    32  	"crypto/tls"
    33  	"encoding/base64"
    34  	"errors"
    35  	"flag"
    36  	"fmt"
    37  	"log"
    38  	"os"
    39  	"os/signal"
    40  	"syscall"
    41  
    42  	"github.com/go-openapi/runtime"
    43  	"github.com/redis/go-redis/v9"
    44  	"golang.org/x/sync/errgroup"
    45  	"sigs.k8s.io/release-utils/version"
    46  
    47  	"github.com/sigstore/rekor/pkg/client"
    48  	"github.com/sigstore/rekor/pkg/generated/client/entries"
    49  	"github.com/sigstore/rekor/pkg/generated/models"
    50  	"github.com/sigstore/rekor/pkg/types"
    51  
    52  	// these imports are to call the packages' init methods
    53  	_ "github.com/sigstore/rekor/pkg/types/alpine/v0.0.1"
    54  	_ "github.com/sigstore/rekor/pkg/types/cose/v0.0.1"
    55  	_ "github.com/sigstore/rekor/pkg/types/dsse/v0.0.1"
    56  	_ "github.com/sigstore/rekor/pkg/types/hashedrekord/v0.0.1"
    57  	_ "github.com/sigstore/rekor/pkg/types/helm/v0.0.1"
    58  	_ "github.com/sigstore/rekor/pkg/types/intoto/v0.0.1"
    59  	_ "github.com/sigstore/rekor/pkg/types/intoto/v0.0.2"
    60  	_ "github.com/sigstore/rekor/pkg/types/jar/v0.0.1"
    61  	_ "github.com/sigstore/rekor/pkg/types/rekord/v0.0.1"
    62  	_ "github.com/sigstore/rekor/pkg/types/rfc3161/v0.0.1"
    63  	_ "github.com/sigstore/rekor/pkg/types/rpm/v0.0.1"
    64  	_ "github.com/sigstore/rekor/pkg/types/tuf/v0.0.1"
    65  )
    66  
    67  var (
    68  	redisHostname      = flag.String("hostname", "", "Hostname for Redis application")
    69  	redisPort          = flag.String("port", "", "Port to Redis application")
    70  	redisPassword      = flag.String("password", "", "Password for Redis authentication")
    71  	startIndex         = flag.Int("start", -1, "First index to backfill")
    72  	endIndex           = flag.Int("end", -1, "Last index to backfill")
    73  	enableTLS          = flag.Bool("enable-tls", false, "Enable TLS for Redis client")
    74  	insecureSkipVerify = flag.Bool("insecure-skip-verify", false, "Whether to skip TLS verification for Redis client or not")
    75  	rekorAddress       = flag.String("rekor-address", "", "Address for Rekor, e.g. https://rekor.sigstore.dev")
    76  	versionFlag        = flag.Bool("version", false, "Print the current version of Backfill Redis")
    77  	concurrency        = flag.Int("concurrency", 1, "Number of workers to use for backfill")
    78  	dryRun             = flag.Bool("dry-run", false, "Dry run - don't actually insert into Redis")
    79  )
    80  
    81  func main() {
    82  	flag.Parse()
    83  
    84  	versionInfo := version.GetVersionInfo()
    85  	if *versionFlag {
    86  		fmt.Println(versionInfo.String())
    87  		os.Exit(0)
    88  	}
    89  
    90  	if *redisHostname == "" {
    91  		log.Fatal("address must be set")
    92  	}
    93  	if *redisPort == "" {
    94  		log.Fatal("port must be set")
    95  	}
    96  	if *startIndex == -1 {
    97  		log.Fatal("start must be set to >=0")
    98  	}
    99  	if *endIndex == -1 {
   100  		log.Fatal("end must be set to >=0")
   101  	}
   102  	if *rekorAddress == "" {
   103  		log.Fatal("rekor-address must be set")
   104  	}
   105  
   106  	log.Printf("running backfill redis Version: %s GitCommit: %s BuildDate: %s", versionInfo.GitVersion, versionInfo.GitCommit, versionInfo.BuildDate)
   107  
   108  	redisClient := redisClient()
   109  
   110  	rekorClient, err := client.GetRekorClient(*rekorAddress)
   111  	if err != nil {
   112  		log.Fatalf("creating rekor client: %v", err)
   113  	}
   114  
   115  	ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
   116  	group, ctx := errgroup.WithContext(ctx)
   117  	group.SetLimit(*concurrency)
   118  
   119  	type result struct {
   120  		index      int
   121  		parseErrs  []error
   122  		insertErrs []error
   123  	}
   124  	var resultChan = make(chan result)
   125  	parseErrs := make([]int, 0)
   126  	insertErrs := make([]int, 0)
   127  
   128  	go func() {
   129  		for r := range resultChan {
   130  			if len(r.parseErrs) > 0 {
   131  				parseErrs = append(parseErrs, r.index)
   132  			}
   133  			if len(r.insertErrs) > 0 {
   134  				insertErrs = append(insertErrs, r.index)
   135  			}
   136  		}
   137  	}()
   138  
   139  	for i := *startIndex; i <= *endIndex; i++ {
   140  		index := i // capture loop variable for closure
   141  		group.Go(func() error {
   142  			params := entries.NewGetLogEntryByIndexParamsWithContext(ctx)
   143  			params.SetLogIndex(int64(index))
   144  			resp, err := rekorClient.Entries.GetLogEntryByIndex(params)
   145  			if err != nil {
   146  				// in case of sigterm, just return to exit gracefully
   147  				if errors.Is(err, context.Canceled) {
   148  					return nil
   149  				}
   150  				log.Fatalf("retrieving log uuid by index: %v", err)
   151  			}
   152  			var parseErrs []error
   153  			var insertErrs []error
   154  			for uuid, entry := range resp.Payload {
   155  				// uuid is the global UUID - tree ID and entry UUID
   156  				e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
   157  				if err != nil {
   158  					parseErrs = append(parseErrs, fmt.Errorf("error unmarshalling entry for %s: %w", uuid, err))
   159  					continue
   160  				}
   161  				keys, err := e.IndexKeys()
   162  				if err != nil {
   163  					parseErrs = append(parseErrs, fmt.Errorf("error building index keys for %s: %w", uuid, err))
   164  					continue
   165  				}
   166  				for _, key := range keys {
   167  					// remove the key-value pair from the index in case it already exists
   168  					if err := removeFromIndex(ctx, redisClient, key, uuid); err != nil {
   169  						insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %w", uuid, key, err))
   170  					}
   171  					if err := addToIndex(ctx, redisClient, key, uuid); err != nil {
   172  						insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %w", uuid, key, err))
   173  					}
   174  					fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, index, key)
   175  				}
   176  			}
   177  			if len(insertErrs) != 0 || len(parseErrs) != 0 {
   178  				fmt.Printf("Errors with log index %d:\n", index)
   179  				for _, e := range insertErrs {
   180  					fmt.Println(e)
   181  				}
   182  				for _, e := range parseErrs {
   183  					fmt.Println(e)
   184  				}
   185  			} else {
   186  				fmt.Printf("Completed log index %d\n", index)
   187  			}
   188  			resultChan <- result{
   189  				index:      index,
   190  				parseErrs:  parseErrs,
   191  				insertErrs: insertErrs,
   192  			}
   193  
   194  			return nil
   195  		})
   196  	}
   197  	err = group.Wait()
   198  	if err != nil {
   199  		log.Fatalf("error running backfill: %v", err)
   200  	}
   201  	close(resultChan)
   202  	fmt.Println("Backfill complete")
   203  	if len(parseErrs) > 0 {
   204  		fmt.Printf("Failed to parse %d entries: %v\n", len(parseErrs), parseErrs)
   205  	}
   206  	if len(insertErrs) > 0 {
   207  		fmt.Printf("Failed to insert/remove %d entries: %v\n", len(insertErrs), insertErrs)
   208  	}
   209  }
   210  
   211  func redisClient() *redis.Client {
   212  
   213  	opts := &redis.Options{
   214  		Addr:     fmt.Sprintf("%s:%s", *redisHostname, *redisPort),
   215  		Password: *redisPassword,
   216  		Network:  "tcp",
   217  		DB:       0, // default DB
   218  	}
   219  
   220  	// #nosec G402
   221  	if *enableTLS {
   222  		opts.TLSConfig = &tls.Config{
   223  			InsecureSkipVerify: *insecureSkipVerify, //nolint: gosec
   224  		}
   225  	}
   226  
   227  	return redis.NewClient(opts)
   228  }
   229  
   230  // unmarshalEntryImpl decodes the base64-encoded entry to a specific entry type (types.EntryImpl).
   231  // Taken from Cosign
   232  func unmarshalEntryImpl(e string) (types.EntryImpl, string, string, error) {
   233  	b, err := base64.StdEncoding.DecodeString(e)
   234  	if err != nil {
   235  		return nil, "", "", err
   236  	}
   237  
   238  	pe, err := models.UnmarshalProposedEntry(bytes.NewReader(b), runtime.JSONConsumer())
   239  	if err != nil {
   240  		return nil, "", "", err
   241  	}
   242  
   243  	entry, err := types.UnmarshalEntry(pe)
   244  	if err != nil {
   245  		return nil, "", "", err
   246  	}
   247  	return entry, pe.Kind(), entry.APIVersion(), nil
   248  }
   249  
   250  // removeFromIndex removes all occurrences of a value from a given key. This guards against
   251  // multiple invocations of backfilling creating duplicates.
   252  func removeFromIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
   253  	if *dryRun {
   254  		return nil
   255  	}
   256  	_, err := redisClient.LRem(ctx, key, 0, value).Result()
   257  	return err
   258  }
   259  
   260  // addToIndex pushes a value onto a key of type list.
   261  func addToIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
   262  	if *dryRun {
   263  		return nil
   264  	}
   265  	_, err := redisClient.LPush(ctx, key, value).Result()
   266  	return err
   267  }
   268  

View as plain text