1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package fixchain
16
17 import (
18 "context"
19 "fmt"
20 "log"
21 "sync"
22 "sync/atomic"
23 "time"
24
25 ct "github.com/google/certificate-transparency-go"
26 "github.com/google/certificate-transparency-go/client"
27 "github.com/google/certificate-transparency-go/x509"
28 )
29
30
31
32 type Limiter interface {
33 Wait(context.Context) error
34 }
35
36
37
38
39 type Logger struct {
40 ctx context.Context
41 client client.AddLogClient
42 roots *x509.CertPool
43 toPost chan *toPost
44 errors chan<- *FixError
45
46 active uint32
47
48 queued uint32
49 posted uint32
50 reposted uint32
51 chainReposted uint32
52
53
54
55
56 wg sync.WaitGroup
57 limiter Limiter
58
59 postCertCache *lockedMap
60 postChainCache *lockedMap
61 }
62
63
64
65 func (l *Logger) IsPosted(cert *x509.Certificate) bool {
66 return l.postCertCache.get(hash(cert))
67 }
68
69
70 func (l *Logger) QueueChain(chain []*x509.Certificate) {
71 if chain == nil {
72 return
73 }
74
75 atomic.AddUint32(&l.queued, 1)
76
77
78 h := hash(chain[0])
79 if l.postCertCache.get(h) {
80 atomic.AddUint32(&l.reposted, 1)
81 return
82 }
83
84
85
86
87
88
89 h = hashChain(chain)
90 if l.postChainCache.get(h) {
91 atomic.AddUint32(&l.chainReposted, 1)
92 return
93 }
94 l.postChainCache.set(h, true)
95
96 l.postToLog(&toPost{chain: chain})
97 }
98
99
100 func (l *Logger) Wait() {
101 l.wg.Wait()
102 }
103
104
105 func (l *Logger) RootCerts() *x509.CertPool {
106 if l.roots == nil {
107
108 for i := 0; i < 10; i++ {
109 roots, err := l.getRoots()
110 if err == nil {
111 l.roots = roots
112 return l.roots
113 }
114 log.Println(err)
115 }
116 log.Fatalf("Can't get roots for log")
117 }
118 return l.roots
119 }
120
121 func (l *Logger) getRoots() (*x509.CertPool, error) {
122 roots, err := l.client.GetAcceptedRoots(l.ctx)
123 if err != nil {
124 return nil, fmt.Errorf("failed to get roots: %s", err)
125 }
126 ret := x509.NewCertPool()
127 for _, root := range roots {
128 r, err := x509.ParseCertificate(root.Data)
129 if x509.IsFatal(err) {
130 return nil, fmt.Errorf("can't parse certificate: %s %#v", err, root.Data)
131 }
132 ret.AddCert(r)
133 }
134 return ret, nil
135 }
136
137 type toPost struct {
138 chain []*x509.Certificate
139 }
140
141
142
143
144
145
146
147
148
149 func (l *Logger) postToLog(p *toPost) {
150 l.wg.Add(1)
151 l.toPost <- p
152 }
153
154 func (l *Logger) postChain(p *toPost) {
155 h := hash(p.chain[0])
156 if l.postCertCache.get(h) {
157 atomic.AddUint32(&l.reposted, 1)
158 return
159 }
160
161 derChain := make([]ct.ASN1Cert, 0, len(p.chain))
162 for _, cert := range p.chain {
163 derChain = append(derChain, ct.ASN1Cert{Data: cert.Raw})
164 }
165
166 if err := l.limiter.Wait(l.ctx); err != nil {
167 log.Println(err)
168 }
169 atomic.AddUint32(&l.posted, 1)
170 _, err := l.client.AddChain(l.ctx, derChain)
171 if err != nil {
172 l.errors <- &FixError{
173 Type: LogPostFailed,
174 Chain: p.chain,
175 Error: fmt.Errorf("add-chain failed: %s", err),
176 }
177 return
178 }
179
180
181 l.postCertCache.set(h, true)
182 }
183
184 func (l *Logger) postServer() {
185 for {
186 c := <-l.toPost
187 atomic.AddUint32(&l.active, 1)
188 l.postChain(c)
189 atomic.AddUint32(&l.active, ^uint32(0))
190 l.wg.Done()
191 }
192 }
193
194 func (l *Logger) logStats() {
195 t := time.NewTicker(time.Second)
196 go func() {
197 for range t.C {
198 log.Printf("posters: %d active, %d posted, %d queued, %d certs requeued, %d chains requeued",
199 l.active, l.posted, l.queued, l.reposted, l.chainReposted)
200 }
201 }()
202 }
203
204
205
206
207
208 func NewLogger(ctx context.Context, workerCount int, errors chan<- *FixError, client client.AddLogClient, limiter Limiter, logStats bool) *Logger {
209 l := &Logger{
210 ctx: ctx,
211 client: client,
212 errors: errors,
213 toPost: make(chan *toPost),
214 postCertCache: newLockedMap(),
215 postChainCache: newLockedMap(),
216 limiter: limiter,
217 }
218 l.RootCerts()
219
220
221 for i := 0; i < workerCount; i++ {
222 go l.postServer()
223 }
224
225 if logStats {
226 l.logStats()
227 }
228 return l
229 }
230
View as plain text