1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package submission
16
17 import (
18 "context"
19 "fmt"
20 "net/http"
21 "net/url"
22 "strconv"
23 "sync"
24 "time"
25
26 "github.com/google/certificate-transparency-go/client"
27 "github.com/google/certificate-transparency-go/ctpolicy"
28 "github.com/google/certificate-transparency-go/jsonclient"
29 "github.com/google/certificate-transparency-go/loglist3"
30 "github.com/google/certificate-transparency-go/trillian/ctfe"
31 "github.com/google/certificate-transparency-go/x509"
32 "github.com/google/certificate-transparency-go/x509util"
33 "github.com/google/trillian/monitoring"
34 "k8s.io/klog/v2"
35
36 ct "github.com/google/certificate-transparency-go"
37 )
38
var (
	// Metrics shared by every Distributor in the process; distOnce makes sure
	// distInitMetrics populates them only once.
	distOnce      sync.Once
	reqsCounter   monitoring.Counter   // labels: logurl, ep
	rspsCounter   monitoring.Counter   // labels: logurl, ep, httpstatus
	errCounter    monitoring.Counter   // labels: logurl, ep, errtype
	logRspLatency monitoring.Histogram // labels: logurl, ep

	// lastGetRootsSuccess holds, per logurl, the Unix timestamp of the last
	// get-roots request that returned roots.
	lastGetRootsSuccess monitoring.Gauge
)
49
50
51 func distInitMetrics(mf monitoring.MetricFactory) {
52 reqsCounter = mf.NewCounter("http_reqs", "Number of requests", "logurl", "ep")
53 rspsCounter = mf.NewCounter("http_rsps", "Number of responses", "logurl", "ep", "httpstatus")
54 errCounter = mf.NewCounter("err_count", "Number of errors", "logurl", "ep", "errtype")
55 logRspLatency = mf.NewHistogram("http_log_latency", "Latency of responses in seconds", "logurl", "ep")
56 lastGetRootsSuccess = mf.NewGauge("last_get_roots_success", "Unix timestamp for last successful get-roots request", "logurl")
57 }
58
const (
	// getRootsTimeout is the deadline RefreshRoots applies to one whole round
	// of get-roots requests (all Logs share the same timeout context).
	getRootsTimeout = time.Second * 10
)
63
64
65
// pendingLogsPolicy is a stateless policy type; NewDistributor installs it as
// the ctpolicy.CTPolicy applied to the Pending/Qualified log list.
type pendingLogsPolicy struct {
}
68
69 func (stubP pendingLogsPolicy) LogsByGroup(cert *x509.Certificate, approved *loglist3.LogList) (ctpolicy.LogPolicyData, error) {
70 baseGroup, err := ctpolicy.BaseGroupFor(approved, 1)
71 groups := ctpolicy.LogPolicyData{baseGroup.Name: baseGroup}
72 return groups, err
73 }
74
75 func (stubP pendingLogsPolicy) Name() string {
76 return "Pending/Qualified Policy"
77 }
78
79
// Distributor submits certificate chains to the Logs of a log list according
// to a CT policy, and keeps per-Log accepted-root data used to pick compatible
// Logs.
type Distributor struct {
	// ll is the full log list handed to NewDistributor.
	ll *loglist3.LogList
	// usableLl is the subset of ll with Usable status.
	usableLl *loglist3.LogList
	// pendingQualifiedLl is the subset of ll with Pending or Qualified status.
	pendingQualifiedLl *loglist3.LogList

	// mu is held for writing by RefreshRoots while it replaces logRoots,
	// rootPool and rootDataFull, and for reading by addSomeChain while it
	// consults them.
	mu sync.RWMutex

	// logClients maps Log URL to its client; it is filled by buildLogClients
	// during NewDistributor.
	logClients map[string]client.AddLogClient
	// logRoots holds the roots collected per Log URL by the last RefreshRoots.
	logRoots loglist3.LogRoots
	// rootPool is the union of all certificates in logRoots.
	rootPool *x509util.PEMCertPool
	// rootDataFull is true when the last RefreshRoots obtained roots for every
	// registered Log client.
	rootDataFull bool

	// policy selects Log groups for regular submissions.
	policy ctpolicy.CTPolicy
	// pendingLogsPolicy selects Log groups for the optional background
	// submission to Pending/Qualified Logs.
	pendingLogsPolicy ctpolicy.CTPolicy
}
97
98
99
100
101
102
103 func (d *Distributor) RefreshRoots(ctx context.Context) map[string]error {
104 type RootsResult struct {
105 LogURL string
106 Roots *x509util.PEMCertPool
107 Err error
108 }
109 ch := make(chan RootsResult, len(d.logClients))
110
111 rctx, cancel := context.WithTimeout(ctx, getRootsTimeout)
112 defer cancel()
113
114 for logURL, lc := range d.logClients {
115 go func(logURL string, lc client.AddLogClient) {
116 res := RootsResult{LogURL: logURL}
117
118 roots, err := lc.GetAcceptedRoots(rctx)
119 if err != nil {
120 res.Err = fmt.Errorf("roots refresh for %s: couldn't collect roots. %s", logURL, err)
121 ch <- res
122 return
123 }
124 res.Roots = x509util.NewPEMCertPool()
125 for _, r := range roots {
126 parsed, err := x509.ParseCertificate(r.Data)
127 if x509.IsFatal(err) {
128 errS := fmt.Errorf("roots refresh for %s: unable to parse root cert: %s", logURL, err)
129 if res.Err != nil {
130 res.Err = fmt.Errorf("%s\n%s", res.Err, errS)
131 } else {
132 res.Err = errS
133 }
134 continue
135 }
136 res.Roots.AddCert(parsed)
137 }
138 ch <- res
139 }(logURL, lc)
140 }
141
142
143 freshRoots := make(loglist3.LogRoots)
144 errors := make(map[string]error)
145 for range d.logClients {
146 r := <-ch
147
148 if r.Err != nil {
149 errors[r.LogURL] = r.Err
150 }
151
152 if r.Roots != nil {
153 freshRoots[r.LogURL] = r.Roots
154 lastGetRootsSuccess.Set(float64(time.Now().Unix()), r.LogURL)
155 }
156 }
157
158 d.mu.Lock()
159 defer d.mu.Unlock()
160
161 d.logRoots = freshRoots
162 d.rootDataFull = len(d.logRoots) == len(d.logClients)
163
164 d.rootPool = x509util.NewPEMCertPool()
165 for _, pool := range d.logRoots {
166 for _, c := range pool.RawCertificates() {
167 d.rootPool.AddCert(c)
168 }
169 }
170
171 return errors
172 }
173
174
175 func incRspsCounter(logURL string, endpoint string, rspErr error) {
176 status := http.StatusOK
177 if rspErr != nil {
178 status = http.StatusBadRequest
179 if err, ok := rspErr.(client.RspError); ok {
180 status = err.StatusCode
181 }
182 }
183 rspsCounter.Inc(logURL, endpoint, strconv.Itoa(status))
184 }
185
186
187
188 func incErrCounter(logURL string, endpoint string, rspErr error) {
189 if rspErr == nil {
190 return
191 }
192 err, ok := rspErr.(client.RspError)
193 switch {
194 case !ok:
195 klog.Errorf("unknown_error (%s, %s) => %v", logURL, endpoint, rspErr)
196 errCounter.Inc(logURL, endpoint, "unknown_error")
197 case err.Err != nil && err.StatusCode == http.StatusOK:
198 klog.Errorf("invalid_sct (%s, %s) => HTTP details: status=%d, body:\n%s", logURL, endpoint, err.StatusCode, err.Body)
199 errCounter.Inc(logURL, endpoint, "invalid_sct")
200 case err.Err != nil:
201 klog.Errorf("connection_error (%s, %s) => HTTP details: status=%d, body:\n%s", logURL, endpoint, err.StatusCode, err.Body)
202 errCounter.Inc(logURL, endpoint, "connection_error")
203 }
204 }
205
206
207 func (d *Distributor) SubmitToLog(ctx context.Context, logURL string, chain []ct.ASN1Cert, asPreChain bool) (*ct.SignedCertificateTimestamp, error) {
208 lc, ok := d.logClients[logURL]
209 if !ok {
210 return nil, fmt.Errorf("no client registered for Log with URL %q", logURL)
211 }
212
213
214 endpoint := string(ctfe.AddChainName)
215 if asPreChain {
216 endpoint = string(ctfe.AddPreChainName)
217 }
218
219 defer func(start time.Time) {
220 logRspLatency.Observe(time.Since(start).Seconds(), logURL, endpoint)
221 }(time.Now())
222 reqsCounter.Inc(logURL, endpoint)
223 addChain := lc.AddChain
224 if asPreChain {
225 addChain = lc.AddPreChain
226 }
227 sct, err := addChain(ctx, chain)
228 incRspsCounter(logURL, endpoint, err)
229 incErrCounter(logURL, endpoint, err)
230 return sct, err
231 }
232
233
234 func parseRawChain(rawChain [][]byte) ([]*x509.Certificate, error) {
235 parsedChain := make([]*x509.Certificate, 0, len(rawChain))
236 for _, certBytes := range rawChain {
237 cert, err := x509.ParseCertificate(certBytes)
238 if x509.IsFatal(err) {
239 return nil, fmt.Errorf("distributor unable to parse cert-chain %v", err)
240 }
241 parsedChain = append(parsedChain, cert)
242 }
243 return parsedChain, nil
244 }
245
246
247
248 func (d *Distributor) addSomeChain(ctx context.Context, rawChain [][]byte, loadPendingLogs bool, asPreChain bool) ([]*AssignedSCT, error) {
249 if len(rawChain) == 0 {
250 return nil, fmt.Errorf("distributor unable to process empty chain")
251 }
252
253
254 compatibleLogsAndChain := func() (loglist3.LogList, []*x509.Certificate, error) {
255 d.mu.RLock()
256 defer d.mu.RUnlock()
257 vOpts := ctfe.NewCertValidationOpts(d.rootPool, time.Time{}, false, false, nil, nil, false, nil)
258 rootedChain, err := ctfe.ValidateChain(rawChain, vOpts)
259 if err == nil {
260 return d.usableLl.Compatible(rootedChain[0], rootedChain[len(rootedChain)-1], d.logRoots), rootedChain, nil
261 }
262 if d.rootDataFull {
263
264 return loglist3.LogList{}, nil, fmt.Errorf("distributor unable to process cert-chain: %v", err)
265 }
266
267
268 parsedChain, err := parseRawChain(rawChain)
269 if err != nil {
270 return loglist3.LogList{}, nil, fmt.Errorf("distributor unable to parse cert-chain: %v", err)
271 }
272 return d.usableLl.Compatible(parsedChain[0], nil, d.logRoots), parsedChain, nil
273 }
274 compatibleLogs, parsedChain, err := compatibleLogsAndChain()
275 if err != nil {
276 return nil, err
277 }
278
279
280 isPrecert, err := ctfe.IsPrecertificate(parsedChain[0])
281 if err != nil {
282 return nil, fmt.Errorf("distributor unable to check certificate %v: \n%v", parsedChain[0], err)
283 }
284 if isPrecert != asPreChain {
285 var methodType, inputType string
286 if asPreChain {
287 methodType = "pre-"
288 }
289 if isPrecert {
290 inputType = "pre-"
291 }
292 return nil, fmt.Errorf("add-%schain method expected %scertificate, got %scertificate", methodType, methodType, inputType)
293 }
294
295
296 groups, err := d.policy.LogsByGroup(parsedChain[0], &compatibleLogs)
297 if err != nil {
298 return nil, fmt.Errorf("distributor does not have enough compatible Logs to comply with the policy: %v", err)
299 }
300 chain := make([]ct.ASN1Cert, len(parsedChain))
301 for i, c := range parsedChain {
302 chain[i] = ct.ASN1Cert{Data: c.Raw}
303 }
304 if loadPendingLogs {
305 go func() {
306 pendingGroup, err := d.pendingLogsPolicy.LogsByGroup(parsedChain[0], d.pendingQualifiedLl)
307 if err != nil {
308 return
309 }
310 if _, err := GetSCTs(ctx, d, chain, asPreChain, pendingGroup); err != nil {
311 klog.Errorf("GetSCTs(): %v", err)
312 }
313 }()
314 }
315 return GetSCTs(ctx, d, chain, asPreChain, groups)
316 }
317
318
319
320
321 func (d *Distributor) AddPreChain(ctx context.Context, rawChain [][]byte, loadPendingLogs bool) ([]*AssignedSCT, error) {
322 return d.addSomeChain(ctx, rawChain, loadPendingLogs, true)
323 }
324
325
326
327
328 func (d *Distributor) AddChain(ctx context.Context, rawChain [][]byte, loadPendingLogs bool) ([]*AssignedSCT, error) {
329 return d.addSomeChain(ctx, rawChain, loadPendingLogs, false)
330 }
331
332
// LogClientBuilder creates the client used to talk to the given Log.
// NewDistributor calls it once for each Log it registers.
type LogClientBuilder func(*loglist3.Log) (client.AddLogClient, error)
334
335
336 func BuildLogClient(log *loglist3.Log) (client.AddLogClient, error) {
337 u, err := url.Parse(log.URL)
338 if err != nil {
339 return nil, err
340 }
341 if u.Scheme == "" {
342 u.Scheme = "https"
343 }
344 hc := &http.Client{Timeout: time.Second * 10}
345 return client.New(u.String(), hc, jsonclient.Options{PublicKeyDER: log.Key})
346 }
347
348
349
350
351
352 func NewDistributor(ll *loglist3.LogList, plc ctpolicy.CTPolicy, lcBuilder LogClientBuilder, mf monitoring.MetricFactory) (*Distributor, error) {
353 var d Distributor
354
355 d.ll = ll
356 usableStat := []loglist3.LogStatus{loglist3.UsableLogStatus}
357 active := ll.SelectByStatus(usableStat)
358 d.usableLl = &active
359 pendingQualifiedStat := []loglist3.LogStatus{
360 loglist3.PendingLogStatus, loglist3.QualifiedLogStatus}
361 pending := ll.SelectByStatus(pendingQualifiedStat)
362 d.pendingQualifiedLl = &pending
363
364 d.policy = plc
365 d.pendingLogsPolicy = pendingLogsPolicy{}
366 d.logClients = make(map[string]client.AddLogClient)
367 d.logRoots = make(loglist3.LogRoots)
368 d.rootPool = x509util.NewPEMCertPool()
369
370
371 if err := d.buildLogClients(lcBuilder, d.usableLl); err != nil {
372 return nil, err
373 }
374 if err := d.buildLogClients(lcBuilder, d.pendingQualifiedLl); err != nil {
375 return nil, err
376 }
377
378 if mf == nil {
379 mf = monitoring.InertMetricFactory{}
380 }
381 distOnce.Do(func() { distInitMetrics(mf) })
382 return &d, nil
383 }
384
385
386
387 func (d *Distributor) buildLogClients(lcBuilder LogClientBuilder, ll *loglist3.LogList) error {
388 for _, op := range ll.Operators {
389 for _, log := range op.Logs {
390 lc, err := lcBuilder(log)
391 if err != nil {
392 return fmt.Errorf("failed to create log client for %s: %v", log.URL, err)
393 }
394 d.logClients[log.URL] = lc
395 }
396 }
397 return nil
398 }
399
View as plain text