1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package witness
16
17 import (
18 "context"
19 "encoding/hex"
20 "fmt"
21 "strconv"
22 "time"
23
24 "github.com/google/trillian"
25 "github.com/google/trillian/types"
26 "github.com/prometheus/client_golang/prometheus"
27 "github.com/redis/go-redis/v9"
28 "github.com/sigstore/rekor/pkg/log"
29 "github.com/sigstore/rekor/pkg/trillianclient"
30 "github.com/sigstore/rekor/pkg/util"
31 "github.com/sigstore/sigstore/pkg/signature"
32 "google.golang.org/grpc/codes"
33 )
34
35
36 type CheckpointPublisher struct {
37 ctx context.Context
38
39 logClient trillian.TrillianLogClient
40
41 treeID int64
42
43 hostname string
44
45 signer signature.Signer
46
47 checkpointFreq uint
48
49 redisClient *redis.Client
50
51 reqCounter *prometheus.CounterVec
52 }
53
54
55 const (
56 Success = iota
57 SuccessObtainLock
58 GetCheckpoint
59 UnmarshalCheckpoint
60 SignCheckpoint
61 RedisFailure
62 RedisLatestFailure
63 )
64
65
66 func NewCheckpointPublisher(ctx context.Context,
67 logClient trillian.TrillianLogClient,
68 treeID int64,
69 hostname string,
70 signer signature.Signer,
71 redisClient *redis.Client,
72 checkpointFreq uint,
73 reqCounter *prometheus.CounterVec) CheckpointPublisher {
74 return CheckpointPublisher{ctx: ctx, logClient: logClient, treeID: treeID, hostname: hostname,
75 signer: signer, checkpointFreq: checkpointFreq, redisClient: redisClient, reqCounter: reqCounter}
76 }
77
78
79
80
81
82
83 func (c *CheckpointPublisher) StartPublisher(ctx context.Context) {
84 tc := trillianclient.NewTrillianClient(context.Background(), c.logClient, c.treeID)
85 sTreeID := strconv.FormatInt(c.treeID, 10)
86
87
88 c.publish(&tc, sTreeID)
89
90 ticker := time.NewTicker(time.Duration(c.checkpointFreq) * time.Minute)
91 go func() {
92 for {
93 select {
94 case <-ctx.Done():
95 return
96 case <-ticker.C:
97 c.publish(&tc, sTreeID)
98 }
99 }
100 }()
101 }
102
103
104 func (c *CheckpointPublisher) publish(tc *trillianclient.TrillianClient, sTreeID string) {
105
106 resp := tc.GetLatest(0)
107 if resp.Status != codes.OK {
108 c.reqCounter.With(
109 map[string]string{
110 "shard": sTreeID,
111 "code": strconv.Itoa(GetCheckpoint),
112 }).Inc()
113 log.Logger.Errorf("error getting latest checkpoint to publish: %v", resp.Status)
114 return
115 }
116
117
118 root := &types.LogRootV1{}
119 if err := root.UnmarshalBinary(resp.GetLatestResult.SignedLogRoot.LogRoot); err != nil {
120 c.reqCounter.With(
121 map[string]string{
122 "shard": sTreeID,
123 "code": strconv.Itoa(UnmarshalCheckpoint),
124 }).Inc()
125 log.Logger.Errorf("error unmarshalling latest checkpoint to publish: %v", err)
126 return
127 }
128
129
130 checkpoint, err := util.CreateAndSignCheckpoint(context.Background(), c.hostname, c.treeID, root.TreeSize, root.RootHash, c.signer)
131 if err != nil {
132 c.reqCounter.With(
133 map[string]string{
134 "shard": sTreeID,
135 "code": strconv.Itoa(SignCheckpoint),
136 }).Inc()
137 log.Logger.Errorf("error signing checkpoint to publish: %v", err)
138 return
139 }
140
141
142 hexCP := hex.EncodeToString(checkpoint)
143
144
145
146 ts := time.Now().Truncate(time.Duration(c.checkpointFreq) * time.Minute).UnixNano()
147
148 key := fmt.Sprintf("%d/%d", c.treeID, ts)
149 ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
150 defer cancel()
151
152
153
154
155 value := true
156 successNX, err := c.redisClient.SetNX(ctx, key, value, 0).Result()
157 if err != nil {
158 c.reqCounter.With(
159 map[string]string{
160 "shard": sTreeID,
161 "code": strconv.Itoa(RedisFailure),
162 }).Inc()
163 log.Logger.Errorf("error with client publishing checkpoint: %v", err)
164 return
165 }
166
167 if !successNX {
168 return
169 }
170
171
172 c.reqCounter.With(
173 map[string]string{
174 "shard": sTreeID,
175 "code": strconv.Itoa(SuccessObtainLock),
176 }).Inc()
177
178
179 latestKey := fmt.Sprintf("%d/latest", c.treeID)
180 latestCtx, latestCancel := context.WithTimeout(c.ctx, 10*time.Second)
181 defer latestCancel()
182
183
184
185 if _, err = c.redisClient.Set(latestCtx, latestKey, hexCP, 0).Result(); err != nil {
186 c.reqCounter.With(
187 map[string]string{
188 "shard": sTreeID,
189 "code": strconv.Itoa(RedisLatestFailure),
190 }).Inc()
191 log.Logger.Errorf("error with client publishing latest checkpoint: %v", err)
192 return
193 }
194
195
196 c.reqCounter.With(
197 map[string]string{
198 "shard": sTreeID,
199 "code": strconv.Itoa(Success),
200 }).Inc()
201 }
202
View as plain text