...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package fixchain
16
17 import (
18 "context"
19 "log"
20 "net/http"
21 "sync"
22 "sync/atomic"
23 "time"
24
25 "github.com/google/certificate-transparency-go/client"
26 "github.com/google/certificate-transparency-go/x509"
27 )
28
29
30 type FixAndLog struct {
31 fixer *Fixer
32 chains chan []*x509.Certificate
33 logger *Logger
34 wg sync.WaitGroup
35
36
37 queued uint32
38
39 done *lockedMap
40
41 alreadyDone uint32
42
43
44 chainsQueued uint32
45
46 alreadyPosted uint32
47
48 chainsSent uint32
49 }
50
51
52
53 func (fl *FixAndLog) QueueAllCertsInChain(chain []*x509.Certificate) {
54 if chain != nil {
55 atomic.AddUint32(&fl.queued, 1)
56 atomic.AddUint32(&fl.chainsQueued, uint32(len(chain)))
57 dchain := newDedupedChain(chain)
58
59 h := hashBag(dchain.certs)
60 if fl.done.get(h) {
61 atomic.AddUint32(&fl.alreadyDone, 1)
62 return
63 }
64 fl.done.set(h, true)
65
66 for _, cert := range dchain.certs {
67 if fl.logger.IsPosted(cert) {
68 atomic.AddUint32(&fl.alreadyPosted, 1)
69 continue
70 }
71 fl.fixer.QueueChain(cert, dchain.certs, fl.logger.RootCerts())
72 atomic.AddUint32(&fl.chainsSent, 1)
73 }
74 }
75 }
76
77
78
79
80
81 func (fl *FixAndLog) QueueChain(chain []*x509.Certificate) {
82 if chain != nil {
83 if fl.logger.IsPosted(chain[0]) {
84 atomic.AddUint32(&fl.alreadyPosted, 1)
85 return
86 }
87 fl.fixer.QueueChain(chain[0], chain, fl.logger.RootCerts())
88 atomic.AddUint32(&fl.chainsSent, 1)
89 }
90 }
91
92
93
94 func (fl *FixAndLog) Wait() {
95 fl.fixer.Wait()
96 close(fl.chains)
97 fl.wg.Wait()
98 fl.logger.Wait()
99 }
100
101
102
103
104
105 func NewFixAndLog(ctx context.Context, fixerWorkerCount int, loggerWorkerCount int, errors chan<- *FixError, client *http.Client, logClient client.AddLogClient, limiter Limiter, logStats bool) *FixAndLog {
106 chains := make(chan []*x509.Certificate)
107 fl := &FixAndLog{
108 fixer: NewFixer(fixerWorkerCount, chains, errors, client, logStats),
109 chains: chains,
110 logger: NewLogger(ctx, loggerWorkerCount, errors, logClient, limiter, logStats),
111 done: newLockedMap(),
112 }
113
114 fl.wg.Add(1)
115 go func() {
116 for chain := range chains {
117 fl.logger.QueueChain(chain)
118 }
119 fl.wg.Done()
120 }()
121
122 if logStats {
123 t := time.NewTicker(time.Second)
124 go func() {
125 for range t.C {
126 log.Printf("fix-then-log: %d whole chains queued, %d whole chains already done, %d total chains queued, %d chains don't need posting (cache hits), %d chains sent to fixer", fl.queued, fl.alreadyDone, fl.chainsQueued, fl.alreadyPosted, fl.chainsSent)
127 }
128 }()
129 }
130
131 return fl
132 }
133
View as plain text