1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package scanner
16
17 import (
18 "context"
19 "net/http"
20 "sync"
21 "time"
22
23 ct "github.com/google/certificate-transparency-go"
24 "github.com/google/certificate-transparency-go/jsonclient"
25 "github.com/google/trillian/client/backoff"
26 "k8s.io/klog/v2"
27 )
28
29
30 type LogClient interface {
31 BaseURI() string
32 GetSTH(context.Context) (*ct.SignedTreeHead, error)
33 GetRawEntries(ctx context.Context, start, end int64) (*ct.GetEntriesResponse, error)
34 }
35
36
// FetcherOptions configures a Fetcher.
type FetcherOptions struct {
	// BatchSize is the maximum number of entries covered by one generated
	// fetch range. It is also the tree growth the Fetcher initially waits for
	// when polling for a newer STH in continuous mode.
	BatchSize int

	// ParallelFetch is the number of worker goroutines started by Run.
	ParallelFetch int

	// StartIndex is the first entry index to fetch and EndIndex is one past
	// the last. An EndIndex of 0, or one larger than the tree size, is
	// replaced with the tree size of the STH obtained in Prepare.
	StartIndex, EndIndex int64

	// Continuous makes the Fetcher keep going once EndIndex is reached: it
	// waits for the log to grow and then carries on from the new tree size.
	Continuous bool
}
53
54
55 func DefaultFetcherOptions() *FetcherOptions {
56 return &FetcherOptions{
57 BatchSize: 1000,
58 ParallelFetch: 1,
59 StartIndex: 0,
60 EndIndex: 0,
61 Continuous: false,
62 }
63 }
64
65
66 type Fetcher struct {
67
68 uri string
69
70 client LogClient
71
72 opts *FetcherOptions
73
74
75 sth *ct.SignedTreeHead
76
77 sthBackoff *backoff.Backoff
78
79
80 mu sync.Mutex
81 cancel context.CancelFunc
82 }
83
84
85 type EntryBatch struct {
86 Start int64
87 Entries []ct.LeafEntry
88 }
89
90
// fetchRange is a unit of work passed from the range generator to the
// workers. Both bounds are inclusive entry indices.
type fetchRange struct {
	start, end int64
}
95
96
97
98 func NewFetcher(client LogClient, opts *FetcherOptions) *Fetcher {
99 cancel := func() {}
100 return &Fetcher{
101 uri: client.BaseURI(),
102 client: client,
103 opts: opts,
104 cancel: cancel,
105 }
106 }
107
108
109
110 func (f *Fetcher) Prepare(ctx context.Context) (*ct.SignedTreeHead, error) {
111 if f.sth != nil {
112 return f.sth, nil
113 }
114
115 sth, err := f.client.GetSTH(ctx)
116 if err != nil {
117 klog.Errorf("%s: GetSTH() failed: %v", f.uri, err)
118 return nil, err
119 }
120 klog.V(1).Infof("%s: Got STH with %d certs", f.uri, sth.TreeSize)
121
122 if size := int64(sth.TreeSize); f.opts.EndIndex == 0 || f.opts.EndIndex > size {
123 klog.V(1).Infof("%s: Reset EndIndex from %d to %d", f.uri, f.opts.EndIndex, size)
124 f.opts.EndIndex = size
125 }
126 f.sth = sth
127 return sth, nil
128 }
129
130
131
132
133 func (f *Fetcher) Run(ctx context.Context, fn func(EntryBatch)) error {
134 klog.V(1).Infof("%s: Starting up Fetcher...", f.uri)
135 if _, err := f.Prepare(ctx); err != nil {
136 return err
137 }
138
139 cctx, cancel := context.WithCancel(ctx)
140 defer cancel()
141
142 f.mu.Lock()
143 f.cancel = cancel
144 f.mu.Unlock()
145
146
147
148
149 ranges := f.genRanges(cctx)
150
151
152 var wg sync.WaitGroup
153 for w, cnt := 0, f.opts.ParallelFetch; w < cnt; w++ {
154 wg.Add(1)
155 go func(idx int) {
156 defer wg.Done()
157 klog.V(1).Infof("%s: Fetcher worker %d starting...", f.uri, idx)
158 f.runWorker(ctx, ranges, fn)
159 klog.V(1).Infof("%s: Fetcher worker %d finished", f.uri, idx)
160 }(w)
161 }
162 wg.Wait()
163
164 klog.V(1).Infof("%s: Fetcher terminated", f.uri)
165 return nil
166 }
167
168
169
170
171 func (f *Fetcher) Stop() {
172 f.mu.Lock()
173 defer f.mu.Unlock()
174 f.cancel()
175 }
176
177
178
179
180 func (f *Fetcher) genRanges(ctx context.Context) <-chan fetchRange {
181 batch := int64(f.opts.BatchSize)
182 ranges := make(chan fetchRange)
183
184 go func() {
185 klog.V(1).Infof("%s: Range generator starting", f.uri)
186 defer klog.V(1).Infof("%s: Range generator finished", f.uri)
187 defer close(ranges)
188 start, end := f.opts.StartIndex, f.opts.EndIndex
189
190 for start < end || f.opts.Continuous {
191
192
193 if start == end {
194 if err := f.updateSTH(ctx); err != nil {
195 klog.Warningf("%s: Failed to obtain bigger STH: %v", f.uri, err)
196 return
197 }
198 end = f.opts.EndIndex
199 }
200
201 batchEnd := start + min(end-start, batch)
202 next := fetchRange{start, batchEnd - 1}
203 select {
204 case <-ctx.Done():
205 klog.Warningf("%s: Cancelling genRanges: %v", f.uri, ctx.Err())
206 return
207 case ranges <- next:
208 }
209 start = batchEnd
210 }
211 }()
212
213 return ranges
214 }
215
216
217
218
219
220
221
222 func (f *Fetcher) updateSTH(ctx context.Context) error {
223
224 const quickDur = 45 * time.Second
225 if f.sthBackoff == nil {
226 f.sthBackoff = &backoff.Backoff{
227 Min: 1 * time.Second,
228 Max: 30 * time.Second,
229 Factor: 2,
230 Jitter: true,
231 }
232 }
233
234 lastSize := uint64(f.opts.EndIndex)
235 targetSize := lastSize + uint64(f.opts.BatchSize)
236 quickDeadline := time.Now().Add(quickDur)
237
238 return f.sthBackoff.Retry(ctx, func() error {
239 sth, err := f.client.GetSTH(ctx)
240 if err != nil {
241 return backoff.RetriableErrorf("GetSTH: %v", err)
242 }
243 klog.V(2).Infof("%s: Got STH with %d certs", f.uri, sth.TreeSize)
244
245 quick := time.Now().Before(quickDeadline)
246 if sth.TreeSize <= lastSize || quick && sth.TreeSize < targetSize {
247 return backoff.RetriableErrorf("wait for bigger STH than %d (last=%d, target=%d)", sth.TreeSize, lastSize, targetSize)
248 }
249
250 if quick {
251 f.sthBackoff.Reset()
252 }
253 f.sth = sth
254 f.opts.EndIndex = int64(sth.TreeSize)
255 return nil
256 })
257 }
258
259
260
261
262
263 func (f *Fetcher) runWorker(ctx context.Context, ranges <-chan fetchRange, fn func(EntryBatch)) {
264 for r := range ranges {
265
266
267 for r.start <= r.end {
268 if ctx.Err() != nil {
269 return
270 }
271
272
273
274
275 bo := &backoff.Backoff{
276 Min: 1 * time.Second,
277 Max: 30 * time.Second,
278 Factor: 2,
279 Jitter: true,
280 }
281
282 var resp *ct.GetEntriesResponse
283
284 if err := bo.Retry(ctx, func() error {
285 var err error
286 resp, err = f.client.GetRawEntries(ctx, r.start, r.end)
287 return err
288 }); err != nil {
289 if rspErr, isRspErr := err.(jsonclient.RspError); isRspErr && rspErr.StatusCode == http.StatusTooManyRequests {
290 klog.V(2).Infof("%s: GetRawEntries() failed: %v", f.uri, err)
291 } else {
292 klog.Errorf("%s: GetRawEntries() failed: %v", f.uri, err)
293 }
294
295 continue
296 }
297 fn(EntryBatch{Start: r.start, Entries: resp.Entries})
298 r.start += int64(len(resp.Entries))
299 }
300 }
301 }
302
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
309
View as plain text