1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
26
27 package main
28
29 import (
30 "bytes"
31 "context"
32 "crypto/tls"
33 "encoding/base64"
34 "errors"
35 "flag"
36 "fmt"
37 "log"
38 "os"
39 "os/signal"
40 "syscall"
41
42 "github.com/go-openapi/runtime"
43 "github.com/redis/go-redis/v9"
44 "golang.org/x/sync/errgroup"
45 "sigs.k8s.io/release-utils/version"
46
47 "github.com/sigstore/rekor/pkg/client"
48 "github.com/sigstore/rekor/pkg/generated/client/entries"
49 "github.com/sigstore/rekor/pkg/generated/models"
50 "github.com/sigstore/rekor/pkg/types"
51
52
53 _ "github.com/sigstore/rekor/pkg/types/alpine/v0.0.1"
54 _ "github.com/sigstore/rekor/pkg/types/cose/v0.0.1"
55 _ "github.com/sigstore/rekor/pkg/types/dsse/v0.0.1"
56 _ "github.com/sigstore/rekor/pkg/types/hashedrekord/v0.0.1"
57 _ "github.com/sigstore/rekor/pkg/types/helm/v0.0.1"
58 _ "github.com/sigstore/rekor/pkg/types/intoto/v0.0.1"
59 _ "github.com/sigstore/rekor/pkg/types/intoto/v0.0.2"
60 _ "github.com/sigstore/rekor/pkg/types/jar/v0.0.1"
61 _ "github.com/sigstore/rekor/pkg/types/rekord/v0.0.1"
62 _ "github.com/sigstore/rekor/pkg/types/rfc3161/v0.0.1"
63 _ "github.com/sigstore/rekor/pkg/types/rpm/v0.0.1"
64 _ "github.com/sigstore/rekor/pkg/types/tuf/v0.0.1"
65 )
66
67 var (
68 redisHostname = flag.String("hostname", "", "Hostname for Redis application")
69 redisPort = flag.String("port", "", "Port to Redis application")
70 redisPassword = flag.String("password", "", "Password for Redis authentication")
71 startIndex = flag.Int("start", -1, "First index to backfill")
72 endIndex = flag.Int("end", -1, "Last index to backfill")
73 enableTLS = flag.Bool("enable-tls", false, "Enable TLS for Redis client")
74 insecureSkipVerify = flag.Bool("insecure-skip-verify", false, "Whether to skip TLS verification for Redis client or not")
75 rekorAddress = flag.String("rekor-address", "", "Address for Rekor, e.g. https://rekor.sigstore.dev")
76 versionFlag = flag.Bool("version", false, "Print the current version of Backfill Redis")
77 concurrency = flag.Int("concurrency", 1, "Number of workers to use for backfill")
78 dryRun = flag.Bool("dry-run", false, "Dry run - don't actually insert into Redis")
79 )
80
81 func main() {
82 flag.Parse()
83
84 versionInfo := version.GetVersionInfo()
85 if *versionFlag {
86 fmt.Println(versionInfo.String())
87 os.Exit(0)
88 }
89
90 if *redisHostname == "" {
91 log.Fatal("address must be set")
92 }
93 if *redisPort == "" {
94 log.Fatal("port must be set")
95 }
96 if *startIndex == -1 {
97 log.Fatal("start must be set to >=0")
98 }
99 if *endIndex == -1 {
100 log.Fatal("end must be set to >=0")
101 }
102 if *rekorAddress == "" {
103 log.Fatal("rekor-address must be set")
104 }
105
106 log.Printf("running backfill redis Version: %s GitCommit: %s BuildDate: %s", versionInfo.GitVersion, versionInfo.GitCommit, versionInfo.BuildDate)
107
108 redisClient := redisClient()
109
110 rekorClient, err := client.GetRekorClient(*rekorAddress)
111 if err != nil {
112 log.Fatalf("creating rekor client: %v", err)
113 }
114
115 ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
116 group, ctx := errgroup.WithContext(ctx)
117 group.SetLimit(*concurrency)
118
119 type result struct {
120 index int
121 parseErrs []error
122 insertErrs []error
123 }
124 var resultChan = make(chan result)
125 parseErrs := make([]int, 0)
126 insertErrs := make([]int, 0)
127
128 go func() {
129 for r := range resultChan {
130 if len(r.parseErrs) > 0 {
131 parseErrs = append(parseErrs, r.index)
132 }
133 if len(r.insertErrs) > 0 {
134 insertErrs = append(insertErrs, r.index)
135 }
136 }
137 }()
138
139 for i := *startIndex; i <= *endIndex; i++ {
140 index := i
141 group.Go(func() error {
142 params := entries.NewGetLogEntryByIndexParamsWithContext(ctx)
143 params.SetLogIndex(int64(index))
144 resp, err := rekorClient.Entries.GetLogEntryByIndex(params)
145 if err != nil {
146
147 if errors.Is(err, context.Canceled) {
148 return nil
149 }
150 log.Fatalf("retrieving log uuid by index: %v", err)
151 }
152 var parseErrs []error
153 var insertErrs []error
154 for uuid, entry := range resp.Payload {
155
156 e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
157 if err != nil {
158 parseErrs = append(parseErrs, fmt.Errorf("error unmarshalling entry for %s: %w", uuid, err))
159 continue
160 }
161 keys, err := e.IndexKeys()
162 if err != nil {
163 parseErrs = append(parseErrs, fmt.Errorf("error building index keys for %s: %w", uuid, err))
164 continue
165 }
166 for _, key := range keys {
167
168 if err := removeFromIndex(ctx, redisClient, key, uuid); err != nil {
169 insertErrs = append(insertErrs, fmt.Errorf("error removing UUID %s with key %s: %w", uuid, key, err))
170 }
171 if err := addToIndex(ctx, redisClient, key, uuid); err != nil {
172 insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %w", uuid, key, err))
173 }
174 fmt.Printf("Uploaded Redis entry %s, index %d, key %s\n", uuid, index, key)
175 }
176 }
177 if len(insertErrs) != 0 || len(parseErrs) != 0 {
178 fmt.Printf("Errors with log index %d:\n", index)
179 for _, e := range insertErrs {
180 fmt.Println(e)
181 }
182 for _, e := range parseErrs {
183 fmt.Println(e)
184 }
185 } else {
186 fmt.Printf("Completed log index %d\n", index)
187 }
188 resultChan <- result{
189 index: index,
190 parseErrs: parseErrs,
191 insertErrs: insertErrs,
192 }
193
194 return nil
195 })
196 }
197 err = group.Wait()
198 if err != nil {
199 log.Fatalf("error running backfill: %v", err)
200 }
201 close(resultChan)
202 fmt.Println("Backfill complete")
203 if len(parseErrs) > 0 {
204 fmt.Printf("Failed to parse %d entries: %v\n", len(parseErrs), parseErrs)
205 }
206 if len(insertErrs) > 0 {
207 fmt.Printf("Failed to insert/remove %d entries: %v\n", len(insertErrs), insertErrs)
208 }
209 }
210
211 func redisClient() *redis.Client {
212
213 opts := &redis.Options{
214 Addr: fmt.Sprintf("%s:%s", *redisHostname, *redisPort),
215 Password: *redisPassword,
216 Network: "tcp",
217 DB: 0,
218 }
219
220
221 if *enableTLS {
222 opts.TLSConfig = &tls.Config{
223 InsecureSkipVerify: *insecureSkipVerify,
224 }
225 }
226
227 return redis.NewClient(opts)
228 }
229
230
231
232 func unmarshalEntryImpl(e string) (types.EntryImpl, string, string, error) {
233 b, err := base64.StdEncoding.DecodeString(e)
234 if err != nil {
235 return nil, "", "", err
236 }
237
238 pe, err := models.UnmarshalProposedEntry(bytes.NewReader(b), runtime.JSONConsumer())
239 if err != nil {
240 return nil, "", "", err
241 }
242
243 entry, err := types.UnmarshalEntry(pe)
244 if err != nil {
245 return nil, "", "", err
246 }
247 return entry, pe.Kind(), entry.APIVersion(), nil
248 }
249
250
251
252 func removeFromIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
253 if *dryRun {
254 return nil
255 }
256 _, err := redisClient.LRem(ctx, key, 0, value).Result()
257 return err
258 }
259
260
261 func addToIndex(ctx context.Context, redisClient *redis.Client, key, value string) error {
262 if *dryRun {
263 return nil
264 }
265 _, err := redisClient.LPush(ctx, key, value).Result()
266 return err
267 }
268
View as plain text