...

Source file src/github.com/google/certificate-transparency-go/fixchain/logger.go

Documentation: github.com/google/certificate-transparency-go/fixchain

     1  // Copyright 2016 Google LLC. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package fixchain
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"log"
    21  	"sync"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	ct "github.com/google/certificate-transparency-go"
    26  	"github.com/google/certificate-transparency-go/client"
    27  	"github.com/google/certificate-transparency-go/x509"
    28  )
    29  
    30  // Limiter is an interface to allow different rate limiters to be used with the
    31  // Logger.
    32  type Limiter interface {
    33  	Wait(context.Context) error
    34  }
    35  
    36  // Logger contains methods to asynchronously log certificate chains to a
    37  // Certificate Transparency log and properties to store information about each
    38  // attempt that is made to post a certificate chain to said log.
    39  type Logger struct {
    40  	ctx    context.Context
    41  	client client.AddLogClient
    42  	roots  *x509.CertPool
    43  	toPost chan *toPost
    44  	errors chan<- *FixError
    45  
    46  	active uint32
    47  
    48  	queued        uint32 // How many chains have been queued to be posted.
    49  	posted        uint32 // How many chains have been posted.
    50  	reposted      uint32 // How many chains for an already-posted cert have been queued.
    51  	chainReposted uint32 // How many chains have been queued again.
    52  
    53  	// Note that wg counts the number of active requests, not
    54  	// active servers, because we can't close it to signal the
    55  	// end, because of retries.
    56  	wg      sync.WaitGroup
    57  	limiter Limiter
    58  
    59  	postCertCache  *lockedMap
    60  	postChainCache *lockedMap
    61  }
    62  
    63  // IsPosted tells the caller whether a chain for the given certificate has
    64  // already been successfully posted to the log by this Logger.
    65  func (l *Logger) IsPosted(cert *x509.Certificate) bool {
    66  	return l.postCertCache.get(hash(cert))
    67  }
    68  
    69  // QueueChain adds the given chain to the queue to be posted to the log.
    70  func (l *Logger) QueueChain(chain []*x509.Certificate) {
    71  	if chain == nil {
    72  		return
    73  	}
    74  
    75  	atomic.AddUint32(&l.queued, 1)
    76  	// Has a chain for the cert this chain if for already been successfully
    77  	//posted to the log by this Logger?
    78  	h := hash(chain[0]) // Chains are cert -> root
    79  	if l.postCertCache.get(h) {
    80  		atomic.AddUint32(&l.reposted, 1)
    81  		return // Don't post chain for a cert that has already had a chain posted.
    82  	}
    83  	// If we assume all chains for the same cert are equally
    84  	// likely to succeed, then we could mark the cert as posted
    85  	// here. However, bugs might cause a log to refuse one chain
    86  	// and accept another, so try each unique chain.
    87  
    88  	// Has this Logger already tried to post this chain?
    89  	h = hashChain(chain)
    90  	if l.postChainCache.get(h) {
    91  		atomic.AddUint32(&l.chainReposted, 1)
    92  		return
    93  	}
    94  	l.postChainCache.set(h, true)
    95  
    96  	l.postToLog(&toPost{chain: chain})
    97  }
    98  
    99  // Wait for all of the active requests to finish being processed.
   100  func (l *Logger) Wait() {
   101  	l.wg.Wait()
   102  }
   103  
   104  // RootCerts returns the root certificates that the log accepts.
   105  func (l *Logger) RootCerts() *x509.CertPool {
   106  	if l.roots == nil {
   107  		// Retry if unable to get roots.
   108  		for i := 0; i < 10; i++ {
   109  			roots, err := l.getRoots()
   110  			if err == nil {
   111  				l.roots = roots
   112  				return l.roots
   113  			}
   114  			log.Println(err)
   115  		}
   116  		log.Fatalf("Can't get roots for log")
   117  	}
   118  	return l.roots
   119  }
   120  
   121  func (l *Logger) getRoots() (*x509.CertPool, error) {
   122  	roots, err := l.client.GetAcceptedRoots(l.ctx)
   123  	if err != nil {
   124  		return nil, fmt.Errorf("failed to get roots: %s", err)
   125  	}
   126  	ret := x509.NewCertPool()
   127  	for _, root := range roots {
   128  		r, err := x509.ParseCertificate(root.Data)
   129  		if x509.IsFatal(err) {
   130  			return nil, fmt.Errorf("can't parse certificate: %s %#v", err, root.Data)
   131  		}
   132  		ret.AddCert(r)
   133  	}
   134  	return ret, nil
   135  }
   136  
   137  type toPost struct {
   138  	chain []*x509.Certificate
   139  }
   140  
   141  // postToLog() is used during the initial queueing of chains to avoid spinning
   142  // up an excessive number of goroutines, and unnecessarily using up memory. If
   143  // asyncPostToLog() was called instead, then every time a new chain was queued,
   144  // a new goroutine would be created, each holding their own chain - regardless
   145  // of whether there were postServers available to process them or not.  If a
   146  // large number of chains were queued in a short period of time, this could
   147  // lead to a large number of these additional goroutines being created,
   148  // resulting in excessive memory usage.
   149  func (l *Logger) postToLog(p *toPost) {
   150  	l.wg.Add(1) // Add to the wg as we are adding a new active request to the logger queue.
   151  	l.toPost <- p
   152  }
   153  
   154  func (l *Logger) postChain(p *toPost) {
   155  	h := hash(p.chain[0])
   156  	if l.postCertCache.get(h) {
   157  		atomic.AddUint32(&l.reposted, 1)
   158  		return
   159  	}
   160  
   161  	derChain := make([]ct.ASN1Cert, 0, len(p.chain))
   162  	for _, cert := range p.chain {
   163  		derChain = append(derChain, ct.ASN1Cert{Data: cert.Raw})
   164  	}
   165  
   166  	if err := l.limiter.Wait(l.ctx); err != nil {
   167  		log.Println(err)
   168  	}
   169  	atomic.AddUint32(&l.posted, 1)
   170  	_, err := l.client.AddChain(l.ctx, derChain)
   171  	if err != nil {
   172  		l.errors <- &FixError{
   173  			Type:  LogPostFailed,
   174  			Chain: p.chain,
   175  			Error: fmt.Errorf("add-chain failed: %s", err),
   176  		}
   177  		return
   178  	}
   179  
   180  	// If the post was successful, cache.
   181  	l.postCertCache.set(h, true)
   182  }
   183  
   184  func (l *Logger) postServer() {
   185  	for {
   186  		c := <-l.toPost
   187  		atomic.AddUint32(&l.active, 1)
   188  		l.postChain(c)
   189  		atomic.AddUint32(&l.active, ^uint32(0))
   190  		l.wg.Done()
   191  	}
   192  }
   193  
   194  func (l *Logger) logStats() {
   195  	t := time.NewTicker(time.Second)
   196  	go func() {
   197  		for range t.C {
   198  			log.Printf("posters: %d active, %d posted, %d queued, %d certs requeued, %d chains requeued",
   199  				l.active, l.posted, l.queued, l.reposted, l.chainReposted)
   200  		}
   201  	}()
   202  }
   203  
   204  // NewLogger creates a new asynchronous logger to log chains to the
   205  // Certificate Transparency log at the given url.  It starts up a pool of
   206  // workerCount workers.  Errors are pushed to the errors channel.  client is
   207  // used to post the chains to the log.
   208  func NewLogger(ctx context.Context, workerCount int, errors chan<- *FixError, client client.AddLogClient, limiter Limiter, logStats bool) *Logger {
   209  	l := &Logger{
   210  		ctx:            ctx,
   211  		client:         client,
   212  		errors:         errors,
   213  		toPost:         make(chan *toPost),
   214  		postCertCache:  newLockedMap(),
   215  		postChainCache: newLockedMap(),
   216  		limiter:        limiter,
   217  	}
   218  	l.RootCerts()
   219  
   220  	// Start post server pool.
   221  	for i := 0; i < workerCount; i++ {
   222  		go l.postServer()
   223  	}
   224  
   225  	if logStats {
   226  		l.logStats()
   227  	}
   228  	return l
   229  }
   230  

View as plain text