...

Source file src/github.com/google/certificate-transparency-go/internal/witness/cmd/feeder/main.go

Documentation: github.com/google/certificate-transparency-go/internal/witness/cmd/feeder

     1  // Copyright 2021 Google LLC. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
// feeder polls the usable CT logs from the log list and pushes their latest STHs to a generic witness.
    16  package main
    17  
    18  import (
    19  	"context"
    20  	"encoding/base64"
    21  	"encoding/json"
    22  	"encoding/pem"
    23  	"errors"
    24  	"flag"
    25  	"fmt"
    26  	"io"
    27  	"net/http"
    28  	"net/url"
    29  	"os"
    30  	"sync"
    31  	"time"
    32  
    33  	ct "github.com/google/certificate-transparency-go"
    34  	"github.com/google/certificate-transparency-go/client"
    35  	wh "github.com/google/certificate-transparency-go/internal/witness/client/http"
    36  	"github.com/google/certificate-transparency-go/jsonclient"
    37  	"github.com/google/certificate-transparency-go/loglist3"
    38  	"k8s.io/klog/v2"
    39  )
    40  
// Command-line flags.
var (
	// logList is the URL of the log list handed to populateLogs.  It is
	// fetched through readURL, so the http, https and file schemes are
	// accepted.
	logList  = flag.String("log_list_url", "https://www.gstatic.com/ct/log_list/v3/log_list.json", "The location of the log list")
	// witness is the base URL of the witness HTTP API.  main exits if it is
	// left empty.
	witness  = flag.String("witness_url", "", "The endpoint of the witness HTTP API")
	// interval is passed to feed, where it is used both as the ticker period
	// between feed attempts and as the timeout of each individual attempt.
	interval = flag.Duration("poll", 10*time.Second, "How quickly to poll the log to get updates")
)
    46  
// ctLog contains the latest witnessed STH for a log and a log client.
type ctLog struct {
	// id is the base64 (standard encoding) form of the log ID; it is the
	// identifier passed to the witness for this log.
	id     string
	// name is the log's description from the log list, used in log messages.
	name   string
	// wsth is the most recent STH returned by the witness for this log.  It
	// is nil until latestSize or feedOnce has stored one.
	wsth   *ct.SignedTreeHead
	// client is used to fetch STHs and consistency proofs from the log.
	client *client.LogClient
}
    54  
    55  // populateLogs populates a list of ctLogs based on the log list.
    56  func populateLogs(logListURL string) ([]ctLog, error) {
    57  	u, err := url.Parse(logListURL)
    58  	if err != nil {
    59  		return nil, fmt.Errorf("failed to parse URL: %v", err)
    60  	}
    61  	body, err := readURL(u)
    62  	if err != nil {
    63  		return nil, fmt.Errorf("failed to get log list data: %v", err)
    64  	}
    65  	// Get data for all usable logs.
    66  	logList, err := loglist3.NewFromJSON(body)
    67  	if err != nil {
    68  		return nil, fmt.Errorf("failed to parse JSON: %v", err)
    69  	}
    70  	usable := logList.SelectByStatus([]loglist3.LogStatus{loglist3.UsableLogStatus})
    71  	var logs []ctLog
    72  	for _, operator := range usable.Operators {
    73  		for _, log := range operator.Logs {
    74  			logID := base64.StdEncoding.EncodeToString(log.LogID)
    75  			c, err := createLogClient(log.Key, log.URL)
    76  			if err != nil {
    77  				return nil, fmt.Errorf("failed to create log client: %v", err)
    78  			}
    79  			l := ctLog{
    80  				id:     logID,
    81  				name:   log.Description,
    82  				client: c,
    83  			}
    84  			logs = append(logs, l)
    85  		}
    86  	}
    87  	return logs, nil
    88  }
    89  
    90  // createLogClient creates a CT log client from a public key and URL.
    91  func createLogClient(key []byte, url string) (*client.LogClient, error) {
    92  	pemPK := pem.EncodeToMemory(&pem.Block{
    93  		Type:  "PUBLIC KEY",
    94  		Bytes: key,
    95  	})
    96  	opts := jsonclient.Options{PublicKey: string(pemPK)}
    97  	c, err := client.New(url, http.DefaultClient, opts)
    98  	if err != nil {
    99  		return nil, fmt.Errorf("failed to create JSON client: %v", err)
   100  	}
   101  	return c, nil
   102  }
   103  
   104  func main() {
   105  	klog.InitFlags(nil)
   106  	flag.Parse()
   107  	if *witness == "" {
   108  		klog.Exit("--witness_url must not be empty")
   109  	}
   110  	ctx := context.Background()
   111  	// Set up the witness client.
   112  	var w wh.Witness
   113  	if wURL, err := url.Parse(*witness); err != nil {
   114  		klog.Exitf("Failed to parse witness URL: %v", err)
   115  	} else {
   116  		w = wh.Witness{
   117  			URL: wURL,
   118  		}
   119  	}
   120  	// Now set up the log data (with no initial witness STH).
   121  	ctLogs, err := populateLogs(*logList)
   122  	if err != nil {
   123  		klog.Exitf("Failed to set up log data: %v", err)
   124  	}
   125  	// Now feed each log.
   126  	wg := &sync.WaitGroup{}
   127  	for _, log := range ctLogs {
   128  		wg.Add(1)
   129  		go func(witness *wh.Witness, log ctLog) {
   130  			defer wg.Done()
   131  			if err := log.feed(ctx, witness, *interval); err != nil {
   132  				klog.Errorf("feedLog: %v", err)
   133  			}
   134  		}(&w, log)
   135  	}
   136  	wg.Wait()
   137  }
   138  
   139  // feed feeds continuously for a given log, returning only when the context
   140  // is done.
   141  func (l *ctLog) feed(ctx context.Context, witness *wh.Witness, interval time.Duration) error {
   142  	tik := time.NewTicker(interval)
   143  	defer tik.Stop()
   144  	for {
   145  		func() {
   146  			ctx, cancel := context.WithTimeout(ctx, interval)
   147  			defer cancel()
   148  
   149  			klog.V(2).Infof("Start feedOnce for %s", l.name)
   150  			if err := l.feedOnce(ctx, witness); err != nil {
   151  				klog.Warningf("Failed to feed for %s: %v", l.name, err)
   152  			}
   153  			klog.V(2).Infof("feedOnce complete for %s", l.name)
   154  		}()
   155  
   156  		select {
   157  		case <-ctx.Done():
   158  			return ctx.Err()
   159  		case <-tik.C:
   160  		}
   161  	}
   162  }
   163  
   164  // feedOnce attempts to update the STH held by the witness to the latest STH
   165  // provided by the log.
   166  func (l *ctLog) feedOnce(ctx context.Context, w *wh.Witness) error {
   167  	// Get and parse the latest STH from the log.
   168  	var sthResp ct.GetSTHResponse
   169  	_, csthRaw, err := l.client.GetAndParse(ctx, ct.GetSTHPath, nil, &sthResp)
   170  	if err != nil {
   171  		return fmt.Errorf("failed to get latest STH: %v", err)
   172  	}
   173  	csth, err := sthResp.ToSignedTreeHead()
   174  	if err != nil {
   175  		return fmt.Errorf("failed to parse response as STH: %v", err)
   176  	}
   177  	wSize, err := l.latestSize(ctx, w)
   178  	if err != nil {
   179  		return fmt.Errorf("failed to get latest size for %s: %v", l.name, err)
   180  	}
   181  	if wSize >= csth.TreeSize {
   182  		klog.V(1).Infof("Witness size %d >= log size %d for %s - nothing to do", wSize, csth.TreeSize, l.name)
   183  		return nil
   184  	}
   185  
   186  	klog.Infof("Updating witness from size %d to %d for %s", wSize, csth.TreeSize, l.name)
   187  	// If we want to update the witness then let's get a consistency proof.
   188  	var pf [][]byte
   189  	if wSize > 0 {
   190  		pf, err = l.client.GetSTHConsistency(ctx, wSize, csth.TreeSize)
   191  		if err != nil {
   192  			return fmt.Errorf("failed to get consistency proof: %v", err)
   193  		}
   194  	}
   195  	// Now give the new STH and consistency proof to the witness.
   196  	wsthRaw, err := w.Update(ctx, l.id, csthRaw, pf)
   197  	if err != nil && !errors.Is(err, wh.ErrSTHTooOld) {
   198  		return fmt.Errorf("failed to update STH: %v", err)
   199  	}
   200  	if errors.Is(err, wh.ErrSTHTooOld) {
   201  		klog.Infof("STH mismatch at log size %d for %s", wSize, l.name)
   202  		klog.Infof("%s", wsthRaw)
   203  	}
   204  	// Parse the STH it returns.
   205  	var wsthJSON ct.GetSTHResponse
   206  	if err := json.Unmarshal(wsthRaw, &wsthJSON); err != nil {
   207  		return fmt.Errorf("failed to unmarshal json: %v", err)
   208  	}
   209  	wsth, err := wsthJSON.ToSignedTreeHead()
   210  	if err != nil {
   211  		return fmt.Errorf("failed to create STH: %v", err)
   212  	}
   213  	// For now just update our local state with whatever the witness
   214  	// returns, even if we got wh.ErrSTHTooOld.  This is fine if we're the
   215  	// only feeder for this witness.
   216  	l.wsth = wsth
   217  	return nil
   218  }
   219  
   220  // latestSize returns the size of the latest witness STH.  If this is nil then
   221  // it first checks with the witness to see if it has anything stored before
   222  // returning 0.
   223  func (l *ctLog) latestSize(ctx context.Context, w *wh.Witness) (uint64, error) {
   224  	if l.wsth != nil {
   225  		return l.wsth.TreeSize, nil
   226  	}
   227  	wsthRaw, err := w.GetLatestSTH(ctx, l.id)
   228  	if err != nil {
   229  		if errors.Is(err, os.ErrNotExist) {
   230  			// If the witness has no stored STH then 0 is the correct size.
   231  			return 0, nil
   232  		}
   233  		return 0, err
   234  	}
   235  	var wsthJSON ct.GetSTHResponse
   236  	if err := json.Unmarshal(wsthRaw, &wsthJSON); err != nil {
   237  		return 0, fmt.Errorf("failed to unmarshal json: %v", err)
   238  	}
   239  	wsth, err := wsthJSON.ToSignedTreeHead()
   240  	if err != nil {
   241  		return 0, fmt.Errorf("failed to create STH: %v", err)
   242  	}
   243  	l.wsth = wsth
   244  	return wsth.TreeSize, nil
   245  }
   246  
   247  var getByScheme = map[string]func(*url.URL) ([]byte, error){
   248  	"http":  readHTTP,
   249  	"https": readHTTP,
   250  	"file": func(u *url.URL) ([]byte, error) {
   251  		return os.ReadFile(u.Path)
   252  	},
   253  }
   254  
   255  // readHTTP fetches and reads data from an HTTP-based URL.
   256  func readHTTP(u *url.URL) ([]byte, error) {
   257  	resp, err := http.Get(u.String())
   258  	if err != nil {
   259  		return nil, err
   260  	}
   261  	defer resp.Body.Close()
   262  	return io.ReadAll(resp.Body)
   263  }
   264  
   265  // readURL fetches and reads data from an HTTP-based or filesystem URL.
   266  func readURL(u *url.URL) ([]byte, error) {
   267  	s := u.Scheme
   268  	queryFn, ok := getByScheme[s]
   269  	if !ok {
   270  		return nil, fmt.Errorf("failed to identify suitable scheme for the URL %q", u.String())
   271  	}
   272  	return queryFn(u)
   273  }
   274  

View as plain text