1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package core
17
18 import (
19 "context"
20 "fmt"
21 "math/rand"
22 "strconv"
23 "sync"
24 "time"
25
26 ct "github.com/google/certificate-transparency-go"
27 "github.com/google/certificate-transparency-go/client"
28 "github.com/google/certificate-transparency-go/scanner"
29 "github.com/google/certificate-transparency-go/trillian/migrillian/configpb"
30 "k8s.io/klog/v2"
31
32 "github.com/google/trillian/monitoring"
33 "github.com/google/trillian/util/clock"
34 "github.com/google/trillian/util/election2"
35 "github.com/transparency-dev/merkle/proof"
36 "github.com/transparency-dev/merkle/rfc6962"
37 )
38
39 var (
40 metrics treeMetrics
41 metricsOnce sync.Once
42 )
43
44
// treeMetrics groups the metrics exported by the migration Controller. Each
// metric is created with a single "tree_id" label, and values are recorded
// with the Controller's label (the decimal tree ID).
type treeMetrics struct {
	masterRuns       monitoring.Counter // Times this instance started running as master.
	masterCancels    monitoring.Counter // Mastership losses while the caller's context was still alive.
	controllerStarts monitoring.Counter // Calls to Controller.Run, including restarts.
	isMaster         monitoring.Gauge   // 1 while this instance holds mastership, otherwise 0.
	entriesFetched   monitoring.Counter // Entries handed over by the source log Fetcher.
	entriesSeen      monitoring.Counter // Entries taken off the batch channel by submitters.
	entriesStored    monitoring.Counter // Entries successfully submitted to the Trillian tree.
	sthTimestamp     monitoring.Gauge   // Timestamp of the most recently seen source STH.
	sthTreeSize      monitoring.Gauge   // Tree size of the most recently seen source STH.
}
56
57
58 func initMetrics(mf monitoring.MetricFactory) {
59 const treeID = "tree_id"
60 metricsOnce.Do(func() {
61 metrics = treeMetrics{
62 masterRuns: mf.NewCounter("master_runs", "Number of mastership runs.", treeID),
63 masterCancels: mf.NewCounter("master_cancels", "Number of unexpected mastership cancelations.", treeID),
64 controllerStarts: mf.NewCounter("controller_starts", "Number of Controller (re-)starts.", treeID),
65 isMaster: mf.NewGauge("is_master", "The instance is currently the master.", treeID),
66 entriesFetched: mf.NewCounter("entries_fetched", "Entries fetched from the source log.", treeID),
67 entriesSeen: mf.NewCounter("entries_seen", "Entries seen by the submitters.", treeID),
68 entriesStored: mf.NewCounter("entries_stored", "Entries successfully submitted to Trillian.", treeID),
69 sthTimestamp: mf.NewGauge("sth_timestamp", "Timestamp of the last seen STH.", treeID),
70 sthTreeSize: mf.NewGauge("sth_tree_size", "Tree size of the last seen STH.", treeID),
71 }
72 })
73 }
74
75
// Options holds the tunables of a migration Controller.
type Options struct {
	// FetcherOptions configure the scanner.Fetcher that reads the source log.
	// In continuous mode the range fields are overridden on every fetch.
	scanner.FetcherOptions
	// Submitters is the number of goroutines submitting fetched batches to
	// the Trillian tree.
	Submitters int
	// ChannelSize is the buffer size of the Fetcher-to-submitters channel.
	ChannelSize int
	// NoConsistencyCheck skips verifying the source STH against the Trillian
	// tree root before submitting entries.
	NoConsistencyCheck bool
	// StartDelay bounds the random delay, in [0, StartDelay), taken before
	// joining the master election and before each retry of a failed Run.
	StartDelay time.Duration
	// StopAfter, if non-zero, makes a continuous Run stop polling once a
	// random duration in [StopAfter, 2*StopAfter) has elapsed (checked
	// between fetches). Zero means no limit.
	StopAfter time.Duration
}
84
85
86 func OptionsFromConfig(cfg *configpb.MigrationConfig) Options {
87 opts := Options{
88 FetcherOptions: scanner.FetcherOptions{
89 BatchSize: int(cfg.BatchSize),
90 ParallelFetch: int(cfg.NumFetchers),
91 StartIndex: cfg.StartIndex,
92 EndIndex: cfg.EndIndex,
93 Continuous: cfg.IsContinuous,
94 },
95 Submitters: int(cfg.NumSubmitters),
96 ChannelSize: int(cfg.ChannelSize),
97 NoConsistencyCheck: cfg.NoConsistencyCheck,
98 }
99 if cfg.NumFetchers == 0 {
100 opts.ParallelFetch = 1
101 }
102 if cfg.NumSubmitters == 0 {
103 opts.Submitters = 1
104 }
105 return opts
106 }
107
108
// Controller migrates entries from a source CT log into a Trillian tree,
// running the migration only while it holds mastership for that tree.
type Controller struct {
	opts     Options              // Migration tunables.
	ctClient *client.LogClient    // Client of the source CT log.
	plClient *PreorderedLogClient // Client of the destination Trillian tree.
	ef       election2.Factory    // Creates the master election for the tree.
	label    string               // Decimal tree ID; election resource ID and metrics label.
}
116
117
118
119
120
121
122 func NewController(
123 opts Options,
124 ctClient *client.LogClient,
125 plClient *PreorderedLogClient,
126 ef election2.Factory,
127 mf monitoring.MetricFactory,
128 ) *Controller {
129 initMetrics(mf)
130 l := strconv.FormatInt(plClient.treeID, 10)
131 return &Controller{opts: opts, ctClient: ctClient, plClient: plClient, ef: ef, label: l}
132 }
133
134
135
136 func (c *Controller) RunWhenMasterWithRestarts(ctx context.Context) {
137 uri := c.ctClient.BaseURI()
138 treeID := c.plClient.treeID
139 for run := true; run; run = c.opts.Continuous && ctx.Err() == nil {
140 klog.Infof("Starting migration Controller (%d<-%q)", treeID, uri)
141 if err := c.RunWhenMaster(ctx); err != nil {
142 klog.Errorf("Controller.RunWhenMaster(%d<-%q): %v", treeID, uri, err)
143 continue
144 }
145 klog.Infof("Controller stopped (%d<-%q)", treeID, uri)
146 }
147 }
148
149
150
151
152
153
// RunWhenMaster joins the master election for this Controller's tree and runs
// the migration (runWithRestarts) only while this instance is master.
//
// It first sleeps a random duration in [0, StartDelay). Each time mastership
// is acquired it runs the migration under the mastership context. If
// mastership is lost while ctx is still alive, it counts a master cancel and
// waits to become master again. It returns:
//   - when ctx is canceled, with whatever error the migration returned;
//   - when the migration returns while still master, after trying to resign;
//   - when the start sleep or any election call fails, with that error.
//
// The isMaster gauge is set to 1 after mastership is acquired and reset to 0
// when mastership is lost and on every return.
func (c *Controller) RunWhenMaster(ctx context.Context) error {
	// Presumably spreads out instances started at the same time so they do not
	// all hit the election at once.
	if err := sleepRandom(ctx, 0, c.opts.StartDelay); err != nil {
		return err
	}

	el, err := c.ef.NewElection(ctx, c.label)
	if err != nil {
		return err
	}
	metrics.isMaster.Set(0, c.label)
	defer func(ctx context.Context) {
		// Report non-master and release the election on every return path.
		// NOTE(review): ctx may already be canceled here (e.g. on the
		// ctx.Err() != nil return below); confirm Election.Close tolerates it.
		metrics.isMaster.Set(0, c.label)
		if err := el.Close(ctx); err != nil {
			klog.Warningf("%s: Election.Close(): %v", c.label, err)
		}
	}(ctx)

	for {
		// Block until this instance becomes master.
		if err := el.Await(ctx); err != nil {
			return err
		}
		metrics.isMaster.Set(1, c.label)

		// mctx is presumably canceled when mastership is lost or ctx is
		// canceled (semantics defined by election2 — confirm). An already
		// canceled mctx ends this call instead of retrying.
		mctx, err := el.WithMastership(ctx)
		if err != nil {
			return err
		} else if err := mctx.Err(); err != nil {
			return err
		}

		klog.Infof("%s: running as master", c.label)
		metrics.masterRuns.Inc(c.label)

		// Run the migration until it finishes, fails, or mctx is canceled.
		err = c.runWithRestarts(mctx)
		if ctx.Err() != nil {
			// Canceled by the caller: return the current error, which may be
			// nil or a cancelation-related error.
			return err
		} else if mctx.Err() == nil {
			// Still master, so the migration returned on its own: resign
			// (best effort, failure is only logged) and report its result.
			if rerr := el.Resign(ctx); rerr != nil {
				klog.Errorf("%s: Election.Resign(): %v", c.label, rerr)
			}
			return err
		}

		// Mastership was lost while ctx is still alive: record it and go back
		// to waiting for mastership.
		metrics.isMaster.Set(0, c.label)
		metrics.masterCancels.Inc(c.label)
	}
}
207
208
209
210 func (c *Controller) runWithRestarts(ctx context.Context) error {
211 err := c.Run(ctx)
212 if !c.opts.Continuous {
213 return err
214 }
215 for err != nil && ctx.Err() == nil {
216 klog.Errorf("%s: Controller.Run: %v", c.label, err)
217 if slerr := sleepRandom(ctx, 0, c.opts.StartDelay); slerr == nil {
218 err = c.Run(ctx)
219 }
220 }
221 return ctx.Err()
222 }
223
224
225
226
227
228
229 func (c *Controller) Run(ctx context.Context) error {
230 metrics.controllerStarts.Inc(c.label)
231 stopAfter := randDuration(c.opts.StopAfter, c.opts.StopAfter)
232 start := time.Now()
233
234
235 pos, err := c.fetchTail(ctx, 0)
236 if err != nil {
237 return err
238 }
239 if !c.opts.Continuous {
240 return nil
241 }
242
243 for stopAfter == 0 || time.Since(start) < stopAfter {
244
245 next, err := c.fetchTail(ctx, pos)
246 if err != nil {
247 return err
248 }
249 if next == pos {
250
251
252 if err := clock.SleepContext(ctx, 30*time.Second); err != nil {
253 return err
254 }
255 }
256 pos = next
257 }
258
259 return nil
260 }
261
262
263
264
265 func (c *Controller) fetchTail(ctx context.Context, begin uint64) (uint64, error) {
266 treeSize, rootHash, err := c.plClient.getRoot(ctx)
267 if err != nil {
268 return 0, err
269 }
270
271 fo := c.opts.FetcherOptions
272 if fo.Continuous {
273 fo.StartIndex, fo.EndIndex = int64(treeSize), 0
274
275
276 fo.Continuous = false
277 } else if fo.StartIndex < 0 {
278 fo.StartIndex = int64(treeSize)
279 }
280 if int64(begin) > fo.StartIndex {
281 fo.StartIndex = int64(begin)
282 }
283 klog.Infof("%s: fetching range [%d, %d)", c.label, fo.StartIndex, fo.EndIndex)
284
285 fetcher := scanner.NewFetcher(c.ctClient, &fo)
286 sth, err := fetcher.Prepare(ctx)
287 if err != nil {
288 return 0, err
289 }
290 metrics.sthTimestamp.Set(float64(sth.Timestamp), c.label)
291 metrics.sthTreeSize.Set(float64(sth.TreeSize), c.label)
292 if sth.TreeSize <= begin {
293 return begin, nil
294 }
295
296 if err := c.verifyConsistency(ctx, treeSize, rootHash, sth); err != nil {
297 return 0, err
298 }
299
300 var wg sync.WaitGroup
301 batches := make(chan scanner.EntryBatch, c.opts.ChannelSize)
302 cctx, cancel := context.WithCancel(ctx)
303 defer cancel()
304
305 for w, cnt := 0, c.opts.Submitters; w < cnt; w++ {
306 wg.Add(1)
307 go func() {
308 defer wg.Done()
309 if err := c.runSubmitter(cctx, batches); err != nil {
310 klog.Errorf("%s: Stopping due to submitter error: %v", c.label, err)
311 cancel()
312 }
313 }()
314 }
315
316 handler := func(b scanner.EntryBatch) {
317 metrics.entriesFetched.Add(float64(len(b.Entries)), c.label)
318 select {
319 case batches <- b:
320 case <-cctx.Done():
321 }
322 }
323
324 err = fetcher.Run(cctx, handler)
325 close(batches)
326 wg.Wait()
327 if err != nil {
328 return 0, err
329 }
330
331 if err := cctx.Err(); err != nil {
332 return 0, fmt.Errorf("failed to fetch and submit the entire tail: %v", err)
333 }
334 return sth.TreeSize, nil
335 }
336
337
338
339 func (c *Controller) verifyConsistency(ctx context.Context, treeSize uint64, rootHash []byte, sth *ct.SignedTreeHead) error {
340 if treeSize == 0 {
341
342 return nil
343 }
344 if c.opts.NoConsistencyCheck {
345 klog.Warningf("%s: skipping consistency check", c.label)
346 return nil
347 }
348 pf, err := c.ctClient.GetSTHConsistency(ctx, treeSize, sth.TreeSize)
349 if err != nil {
350 return err
351 }
352 return proof.VerifyConsistency(rfc6962.DefaultHasher, treeSize, sth.TreeSize,
353 pf, rootHash, sth.SHA256RootHash[:])
354 }
355
356
357
358
359
360 func (c *Controller) runSubmitter(ctx context.Context, batches <-chan scanner.EntryBatch) error {
361 for b := range batches {
362 entries := float64(len(b.Entries))
363 metrics.entriesSeen.Add(entries, c.label)
364
365 end := b.Start + int64(len(b.Entries))
366 if err := c.plClient.addSequencedLeaves(ctx, &b); err != nil {
367
368
369
370 return fmt.Errorf("failed to add batch [%d, %d): %v", b.Start, end, err)
371 }
372 klog.Infof("%s: added batch [%d, %d)", c.label, b.Start, end)
373 metrics.entriesStored.Add(entries, c.label)
374 }
375 return nil
376 }
377
378
379 func sleepRandom(ctx context.Context, base, spread time.Duration) error {
380 d := randDuration(base, spread)
381 if d == 0 {
382 return nil
383 }
384 return clock.SleepContext(ctx, d)
385 }
386
387
// randDuration returns base plus a uniformly random offset in [0, spread).
// A non-positive spread adds nothing and base is returned unchanged. This
// guards rand.Int63n, which panics for n <= 0, against negative durations
// coming from configuration.
func randDuration(base, spread time.Duration) time.Duration {
	if spread <= 0 {
		return base
	}
	return base + time.Duration(rand.Int63n(int64(spread)))
}
395
View as plain text