1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package fixchain
16
17 import (
18 "bytes"
19 "log"
20 "net/http"
21 "sort"
22 "sync"
23 "sync/atomic"
24 "time"
25
26 "github.com/google/certificate-transparency-go/x509"
27 )
28
29
30
31
32 type Fixer struct {
33 toFix chan *toFix
34 chains chan<- []*x509.Certificate
35 errors chan<- *FixError
36
37 active uint32
38
39 reconstructed uint32
40 notReconstructed uint32
41 fixed uint32
42 notFixed uint32
43 validChainsProduced uint32
44 validChainsOut uint32
45
46 wg sync.WaitGroup
47 cache *urlCache
48 }
49
50
51
52
53 func (f *Fixer) QueueChain(cert *x509.Certificate, chain []*x509.Certificate, roots *x509.CertPool) {
54 f.toFix <- &toFix{
55 cert: cert,
56 chain: newDedupedChain(chain),
57 roots: roots,
58 cache: f.cache,
59 }
60 }
61
62
63 func (f *Fixer) Wait() {
64 close(f.toFix)
65 f.wg.Wait()
66 }
67
68 func (f *Fixer) updateCounters(chains [][]*x509.Certificate, ferrs []*FixError) {
69 atomic.AddUint32(&f.validChainsProduced, uint32(len(chains)))
70
71 var verifyFailed bool
72 var fixFailed bool
73 for _, ferr := range ferrs {
74 switch ferr.Type {
75 case VerifyFailed:
76 verifyFailed = true
77 case FixFailed:
78 fixFailed = true
79 }
80 }
81
82
83
84
85 if verifyFailed {
86 atomic.AddUint32(&f.notReconstructed, 1)
87
88
89 if fixFailed {
90 atomic.AddUint32(&f.notFixed, 1)
91 return
92 }
93 atomic.AddUint32(&f.fixed, 1)
94 return
95 }
96 atomic.AddUint32(&f.reconstructed, 1)
97 }
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119 type chainSlice struct {
120 chains [][]*x509.Certificate
121 }
122
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
129
130
131 func (c chainSlice) Len() int { return len(c.chains) }
132
133
134 func (c chainSlice) Less(i, j int) bool {
135 chi := c.chains[i]
136 chj := c.chains[j]
137 for k := 0; k < min(len(chi), len(chj)); k++ {
138 if !chi[k].Equal(chj[k]) {
139 return bytes.Compare(chi[k].Raw, chj[k].Raw) < 0
140 }
141 }
142 return len(chi) < len(chj)
143 }
144
145
146 func (c chainSlice) Swap(i, j int) {
147 t := c.chains[i]
148 c.chains[i] = c.chains[j]
149 c.chains[j] = t
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 func removeSuperChains(chains [][]*x509.Certificate) [][]*x509.Certificate {
180
181
182
183
184
185 c := chainSlice{chains: chains}
186 sort.Sort(c)
187 var retChains [][]*x509.Certificate
188 NextChain:
189
190 for i := 0; i < len(c.chains); {
191
192 retChains = append(retChains, c.chains[i])
193
194
195
196
197 for j := i + 1; j < len(c.chains); j++ {
198 for k := range c.chains[i] {
199
200
201
202 if !c.chains[i][k].Equal(c.chains[j][k]) {
203 i = j
204 continue NextChain
205 }
206 }
207 }
208 break
209 }
210 return retChains
211 }
212
213 func (f *Fixer) fixServer() {
214 defer f.wg.Done()
215
216 for fix := range f.toFix {
217 atomic.AddUint32(&f.active, 1)
218 chains, ferrs := fix.handleChain()
219 f.updateCounters(chains, ferrs)
220 for _, ferr := range ferrs {
221 f.errors <- ferr
222 }
223
224
225
226
227
228 for _, chain := range removeSuperChains(chains) {
229 f.chains <- chain
230 atomic.AddUint32(&f.validChainsOut, 1)
231 }
232 atomic.AddUint32(&f.active, ^uint32(0))
233 }
234 }
235
236 func (f *Fixer) newFixServerPool(workerCount int) {
237 for i := 0; i < workerCount; i++ {
238 f.wg.Add(1)
239 go f.fixServer()
240 }
241 }
242
243 func (f *Fixer) logStats() {
244 t := time.NewTicker(time.Second)
245 go func() {
246 for range t.C {
247 log.Printf("fixers: %d active, %d reconstructed, "+
248 "%d not reconstructed, %d fixed, %d not fixed, "+
249 "%d valid chains produced, %d valid chains sent",
250 f.active, f.reconstructed, f.notReconstructed,
251 f.fixed, f.notFixed, f.validChainsProduced, f.validChainsOut)
252 }
253 }()
254 }
255
256
257
258
259
260 func NewFixer(workerCount int, chains chan<- []*x509.Certificate, errors chan<- *FixError, client *http.Client, logStats bool) *Fixer {
261 f := &Fixer{
262 toFix: make(chan *toFix),
263 chains: chains,
264 errors: errors,
265 cache: newURLCache(client, logStats),
266 }
267
268 f.newFixServerPool(workerCount)
269 if logStats {
270 f.logStats()
271 }
272 return f
273 }
274
View as plain text