1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package scanner
17
18 import (
19 "context"
20 "fmt"
21 "sync"
22 "sync/atomic"
23 "time"
24
25 ct "github.com/google/certificate-transparency-go"
26 "github.com/google/certificate-transparency-go/x509"
27 "k8s.io/klog/v2"
28 )
29
30
// ScannerOptions holds configuration for a Scanner.
type ScannerOptions struct {
	// FetcherOptions is the embedded fetcher configuration; its fields (for
	// example StartIndex) are promoted onto ScannerOptions.
	FetcherOptions

	// Matcher selects which entries are reported. processEntry accepts either
	// a Matcher (applied to parsed [pre-]certificates) or a LeafMatcher
	// (applied to the raw leaf entry); any other dynamic type makes every
	// entry fail with "unexpected matcher type". DefaultScannerOptions sets
	// this to MatchAll.
	Matcher interface{}

	// PrecertOnly, when true, skips X.509 certificate entries so that only
	// precertificates are delivered to the callbacks.
	PrecertOnly bool

	// NumWorkers is the number of goroutines that concurrently process
	// (parse and match) fetched entries.
	NumWorkers int

	// BufferSize is the capacity of the channel that feeds fetched entries
	// to the matcher workers.
	BufferSize int
}
48
49
50 func DefaultScannerOptions() *ScannerOptions {
51 return &ScannerOptions{
52 FetcherOptions: *DefaultFetcherOptions(),
53 Matcher: &MatchAll{},
54 PrecertOnly: false,
55 NumWorkers: 1,
56 }
57 }
58
59
// Scanner fetches entries from a CT log, runs them through the configured
// matcher, and invokes caller-supplied callbacks for matching certificates
// and precertificates.
type Scanner struct {
	// The int64 counters below are updated with sync/atomic. They are kept at
	// the start of the struct because 64-bit atomic operations require 64-bit
	// alignment on 32-bit platforms, and only the first word of an allocated
	// struct is guaranteed to be so aligned. Do not reorder.

	// certsProcessed counts entries handed to processEntry.
	certsProcessed int64
	// certsMatched counts entries reported as matches; it feeds the match
	// percentage printed by logThroughput.
	certsMatched int64

	// precertsSeen counts precertificate entries encountered while matching.
	precertsSeen int64

	// unparsableEntries counts entries for which processEntry returned an
	// error.
	unparsableEntries int64
	// entriesWithNonFatalErrors counts entries whose parse produced only
	// non-fatal x509 errors.
	entriesWithNonFatalErrors int64

	// fetcher retrieves batches of log entries. NewScanner builds it with a
	// pointer to opts.FetcherOptions, so the two share that configuration.
	fetcher *Fetcher

	// opts is the Scanner's own copy of its configuration.
	opts ScannerOptions
}
79
80
// entryInfo is the unit of work sent to matcher workers: a leaf as returned
// by the log, paired with its position in the log.
type entryInfo struct {
	// index is the absolute index of the entry in the log.
	index int64
	// entry is the raw leaf entry.
	entry ct.LeafEntry
}
87
88
89
90
91
92
93
94
95 func (s *Scanner) isCertErrorFatal(err error, logEntry *ct.LogEntry, index int64) bool {
96 if err == nil {
97
98 return false
99 } else if !x509.IsFatal(err) {
100 atomic.AddInt64(&s.entriesWithNonFatalErrors, 1)
101
102 klog.V(1).Infof("Non-fatal error in %v at index %d: %v", logEntry.Leaf.TimestampedEntry.EntryType, index, err)
103 return false
104 }
105 return true
106 }
107
108
109 func (s *Scanner) processEntry(info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
110 atomic.AddInt64(&s.certsProcessed, 1)
111
112 switch matcher := s.opts.Matcher.(type) {
113 case Matcher:
114 return s.processMatcherEntry(matcher, info, foundCert, foundPrecert)
115 case LeafMatcher:
116 return s.processMatcherLeafEntry(matcher, info, foundCert, foundPrecert)
117 default:
118 return fmt.Errorf("unexpected matcher type %T", matcher)
119 }
120 }
121
122 func (s *Scanner) processMatcherEntry(matcher Matcher, info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
123 rawLogEntry, err := ct.RawLogEntryFromLeaf(info.index, &info.entry)
124 if err != nil {
125 return fmt.Errorf("failed to build raw log entry %d: %v", info.index, err)
126 }
127
128 logEntry, err := rawLogEntry.ToLogEntry()
129 if s.isCertErrorFatal(err, logEntry, info.index) {
130 return fmt.Errorf("failed to parse [pre-]certificate in MerkleTreeLeaf[%d]: %v", info.index, err)
131 }
132
133 switch {
134 case logEntry.X509Cert != nil:
135 if s.opts.PrecertOnly {
136
137 return nil
138 }
139 if matcher.CertificateMatches(logEntry.X509Cert) {
140 atomic.AddInt64(&s.certsMatched, 1)
141 foundCert(rawLogEntry)
142 }
143 case logEntry.Precert != nil:
144 if matcher.PrecertificateMatches(logEntry.Precert) {
145 atomic.AddInt64(&s.certsMatched, 1)
146 foundPrecert(rawLogEntry)
147 }
148 atomic.AddInt64(&s.precertsSeen, 1)
149 default:
150 return fmt.Errorf("saw unknown entry type: %v", logEntry.Leaf.TimestampedEntry.EntryType)
151 }
152 return nil
153 }
154
155 func (s *Scanner) processMatcherLeafEntry(matcher LeafMatcher, info entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
156 if !matcher.Matches(&info.entry) {
157 return nil
158 }
159
160 rawLogEntry, err := ct.RawLogEntryFromLeaf(info.index, &info.entry)
161 if rawLogEntry == nil {
162 return fmt.Errorf("failed to build raw log entry %d: %v", info.index, err)
163 }
164 switch eType := rawLogEntry.Leaf.TimestampedEntry.EntryType; eType {
165 case ct.X509LogEntryType:
166 if s.opts.PrecertOnly {
167
168 return nil
169 }
170 foundCert(rawLogEntry)
171 case ct.PrecertLogEntryType:
172 foundPrecert(rawLogEntry)
173 atomic.AddInt64(&s.precertsSeen, 1)
174 default:
175 return fmt.Errorf("saw unknown entry type: %v", eType)
176 }
177 return nil
178 }
179
180
181
182
183 func (s *Scanner) matcherJob(entries <-chan entryInfo, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) {
184 for e := range entries {
185 if err := s.processEntry(e, foundCert, foundPrecert); err != nil {
186 atomic.AddInt64(&s.unparsableEntries, 1)
187 klog.Errorf("Failed to parse entry at index %d: %s", e.index, err.Error())
188 }
189 }
190 }
191
192
// humanTime renders dur as "<h> hours <m> minutes <s> seconds ", omitting
// any component that is not positive. The seconds component is still
// emitted when nothing else was, so the result is never empty. Sub-second
// precision is truncated, and every component carries a trailing space.
func humanTime(dur time.Duration) string {
	units := []struct {
		size time.Duration
		name string
	}{
		{time.Hour, "hours"},
		{time.Minute, "minutes"},
	}

	out := ""
	for _, u := range units {
		if n := int(dur / u.size); n > 0 {
			out += fmt.Sprintf("%d %s ", n, u.name)
		}
		dur %= u.size
	}

	secs := int(dur / time.Second)
	if secs > 0 || out == "" {
		out += fmt.Sprintf("%d seconds ", secs)
	}
	return out
}
211
212 func (s *Scanner) logThroughput(treeSize int64, stop <-chan bool) {
213 const wndSize = 15
214 wnd := make([]int64, wndSize)
215 wndTotal := int64(0)
216
217 ticker := time.NewTicker(time.Second)
218 defer ticker.Stop()
219
220 for slot, filled, prevCnt := 0, 0, int64(0); ; slot = (slot + 1) % wndSize {
221 select {
222 case <-stop:
223 return
224 case <-ticker.C:
225 certsCnt := atomic.LoadInt64(&s.certsProcessed)
226 certsMatched := atomic.LoadInt64(&s.certsMatched)
227
228 slotValue := certsCnt - prevCnt
229 wndTotal += slotValue - wnd[slot]
230 wnd[slot], prevCnt = slotValue, certsCnt
231
232 if filled < wndSize {
233 filled++
234 }
235
236 throughput := float64(wndTotal) / float64(filled)
237 remainingCerts := treeSize - int64(s.opts.StartIndex) - certsCnt
238 remainingSeconds := int(float64(remainingCerts) / throughput)
239 remainingString := humanTime(time.Duration(remainingSeconds) * time.Second)
240 klog.V(1).Infof("Processed: %d certs (to index %d), matched %d (%2.2f%%). Throughput (last %ds): %3.2f ETA: %s\n",
241 certsCnt, s.opts.StartIndex+certsCnt, certsMatched,
242 (100.0*float64(certsMatched))/float64(certsCnt),
243 filled, throughput, remainingString)
244 }
245 }
246 }
247
248
249
250
251
252
253
254 func (s *Scanner) Scan(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) error {
255 _, err := s.ScanLog(ctx, foundCert, foundPrecert)
256 return err
257 }
258
259
260 func (s *Scanner) ScanLog(ctx context.Context, foundCert func(*ct.RawLogEntry), foundPrecert func(*ct.RawLogEntry)) (int64, error) {
261 klog.V(1).Infof("Starting up Scanner...")
262 s.certsProcessed = 0
263 s.certsMatched = 0
264 s.precertsSeen = 0
265 s.unparsableEntries = 0
266 s.entriesWithNonFatalErrors = 0
267
268 sth, err := s.fetcher.Prepare(ctx)
269 if err != nil {
270 return -1, err
271 }
272
273 startTime := time.Now()
274 stop := make(chan bool)
275 go s.logThroughput(int64(sth.TreeSize), stop)
276 defer func() {
277 stop <- true
278 close(stop)
279 }()
280
281
282 var wg sync.WaitGroup
283 entries := make(chan entryInfo, s.opts.BufferSize)
284 for w, cnt := 0, s.opts.NumWorkers; w < cnt; w++ {
285 wg.Add(1)
286 go func(idx int) {
287 defer wg.Done()
288 klog.V(1).Infof("Matcher %d starting", idx)
289 s.matcherJob(entries, foundCert, foundPrecert)
290 klog.V(1).Infof("Matcher %d finished", idx)
291 }(w)
292 }
293
294 flatten := func(b EntryBatch) {
295 for i, e := range b.Entries {
296 entries <- entryInfo{index: b.Start + int64(i), entry: e}
297 }
298 }
299 err = s.fetcher.Run(ctx, flatten)
300 close(entries)
301 wg.Wait()
302 if err != nil {
303 return -1, err
304 }
305
306 klog.V(1).Infof("Completed %d certs in %s", atomic.LoadInt64(&s.certsProcessed), humanTime(time.Since(startTime)))
307 klog.V(1).Infof("Saw %d precerts", atomic.LoadInt64(&s.precertsSeen))
308 klog.V(1).Infof("Saw %d unparsable entries", atomic.LoadInt64(&s.unparsableEntries))
309 klog.V(1).Infof("Saw %d non-fatal errors", atomic.LoadInt64(&s.entriesWithNonFatalErrors))
310
311 return int64(s.fetcher.opts.EndIndex), nil
312 }
313
314
315
316 func NewScanner(client LogClient, opts ScannerOptions) *Scanner {
317 var scanner Scanner
318 scanner.opts = opts
319 scanner.fetcher = NewFetcher(client, &scanner.opts.FetcherOptions)
320
321
322 if opts.Matcher == nil {
323 opts.Matcher = &MatchAll{}
324 }
325 return &scanner
326 }
327
View as plain text