1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package main
18
19 import (
20 "context"
21 "errors"
22 "flag"
23 "fmt"
24 "net/http"
25 "os"
26 "strings"
27 "time"
28
29 clientv3 "go.etcd.io/etcd/client/v3"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/credentials/insecure"
32 "k8s.io/klog/v2"
33
34 "github.com/google/certificate-transparency-go/client"
35 "github.com/google/certificate-transparency-go/jsonclient"
36 "github.com/google/certificate-transparency-go/trillian/migrillian/configpb"
37 "github.com/google/certificate-transparency-go/trillian/migrillian/core"
38 "github.com/google/trillian"
39 "github.com/google/trillian/monitoring"
40 "github.com/google/trillian/monitoring/prometheus"
41 "github.com/google/trillian/util"
42 "github.com/google/trillian/util/election2"
43 etcdelect "github.com/google/trillian/util/election2/etcd"
44 "github.com/prometheus/client_golang/prometheus/promhttp"
45 )
46
// Command-line flags for the binary, grouped by the concern they configure.
var (
	// Migration configuration.
	cfgPath = flag.String("config", "", "Path to migration config file")

	// Trillian backend connection.
	backend = flag.String("backend", "", "GRPC endpoint to connect to Trillian logservers")

	// Master election.
	forceMaster   = flag.Bool("force_master", false, "If true, assume master for all logs")
	etcdServers   = flag.String("etcd_servers", "", "A comma-separated list of etcd servers; no etcd registration if empty")
	lockDir       = flag.String("lock_file_path", "/migrillian/master", "etcd lock file directory path")
	electionDelay = flag.Duration("election_delay", 0, "Max random pause before participating in master election")

	// Metrics serving.
	metricsEndpoint = flag.String("metrics_endpoint", "localhost:8099", "Endpoint for serving metrics")

	// Outgoing HTTP connection pooling.
	maxIdleConns        = flag.Int("max_idle_conns", 100, "Max number of idle HTTP connections across all hosts (0 = unlimited)")
	maxIdleConnsPerHost = flag.Int("max_idle_conns_per_host", 10, "Max idle HTTP connections per host (0 = DefaultMaxIdleConnsPerHost)")
)
61
62 func main() {
63 klog.InitFlags(nil)
64 flag.Parse()
65 klog.CopyStandardLogTo("WARNING")
66 defer klog.Flush()
67
68 if *backend == "" {
69 klog.Exit("--backend flag must be specified")
70 }
71 cfg, err := getConfig()
72 if err != nil {
73 klog.Exitf("Failed to load MigrillianConfig: %v", err)
74 }
75 if err := core.ValidateConfig(cfg); err != nil {
76 klog.Exitf("Failed to validate MigrillianConfig: %v", err)
77 }
78
79 klog.Infof("Dialling Trillian backend: %v", *backend)
80 conn, err := grpc.Dial(*backend, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
81 if err != nil {
82 klog.Exitf("Could not dial Trillian server: %v: %v", *backend, err)
83 }
84 defer conn.Close()
85
86 httpClient := getHTTPClient()
87 mf := prometheus.MetricFactory{}
88 ef, closeFn := getElectionFactory()
89 defer closeFn()
90
91 ctx := context.Background()
92 var ctrls []*core.Controller
93 for _, mc := range cfg.MigrationConfigs.Config {
94 ctrl, err := getController(ctx, mc, httpClient, mf, ef, conn)
95 if err != nil {
96 klog.Exitf("Failed to create Controller for %q: %v", mc.SourceUri, err)
97 }
98 ctrls = append(ctrls, ctrl)
99 }
100
101
102 http.Handle("/metrics", promhttp.Handler())
103 go func() {
104 err := http.ListenAndServe(*metricsEndpoint, nil)
105 klog.Fatalf("http.ListenAndServe(): %v", err)
106 }()
107
108 cctx, cancel := context.WithCancel(ctx)
109 defer cancel()
110 go util.AwaitSignal(cctx, cancel)
111
112 core.RunMigration(cctx, ctrls)
113 }
114
115
116 func getController(
117 ctx context.Context,
118 cfg *configpb.MigrationConfig,
119 httpClient *http.Client,
120 mf monitoring.MetricFactory,
121 ef election2.Factory,
122 conn *grpc.ClientConn,
123 ) (*core.Controller, error) {
124 ctOpts := jsonclient.Options{PublicKeyDER: cfg.PublicKey.Der, UserAgent: "ct-go-migrillian/1.0"}
125 ctClient, err := client.New(cfg.SourceUri, httpClient, ctOpts)
126 if err != nil {
127 return nil, fmt.Errorf("failed to create CT client: %v", err)
128 }
129 plClient, err := newPreorderedLogClient(ctx, conn, cfg)
130 if err != nil {
131 return nil, fmt.Errorf("failed to create PreorderedLogClient: %v", err)
132 }
133
134 opts := core.OptionsFromConfig(cfg)
135 opts.StartDelay = *electionDelay
136 return core.NewController(opts, ctClient, plClient, ef, mf), nil
137 }
138
139
140 func getConfig() (*configpb.MigrillianConfig, error) {
141 if len(*cfgPath) == 0 {
142 return nil, errors.New("config file not specified")
143 }
144 cfg, err := core.LoadConfigFromFile(*cfgPath)
145 if err != nil {
146 return nil, err
147 }
148 return cfg, nil
149 }
150
151
152 func getHTTPClient() *http.Client {
153 transport := &http.Transport{
154 TLSHandshakeTimeout: 30 * time.Second,
155 DisableKeepAlives: false,
156 MaxIdleConns: *maxIdleConns,
157 MaxIdleConnsPerHost: *maxIdleConnsPerHost,
158 IdleConnTimeout: 90 * time.Second,
159 ResponseHeaderTimeout: 30 * time.Second,
160 ExpectContinueTimeout: 1 * time.Second,
161 }
162
163 return &http.Client{
164 Timeout: 10 * time.Second,
165 Transport: transport,
166 }
167 }
168
169
170 func newPreorderedLogClient(
171 ctx context.Context,
172 conn *grpc.ClientConn,
173 cfg *configpb.MigrationConfig,
174 ) (*core.PreorderedLogClient, error) {
175 admin := trillian.NewTrillianAdminClient(conn)
176 gt := trillian.GetTreeRequest{TreeId: cfg.LogId}
177 tree, err := admin.GetTree(ctx, >)
178 if err != nil {
179 return nil, err
180 }
181 log := trillian.NewTrillianLogClient(conn)
182 pref := fmt.Sprintf("%d", cfg.LogId)
183 return core.NewPreorderedLogClient(log, tree, cfg.IdentityFunction, pref)
184 }
185
186
187
188 func getElectionFactory() (election2.Factory, func()) {
189 if *forceMaster {
190 klog.Warning("Acting as master for all logs")
191 return election2.NoopFactory{}, func() {}
192 }
193 if len(*etcdServers) == 0 {
194 klog.Exit("Either --force_master or --etcd_servers must be supplied")
195 }
196
197 cli, err := clientv3.New(clientv3.Config{
198 Endpoints: strings.Split(*etcdServers, ","),
199 DialTimeout: 5 * time.Second,
200 })
201 if err != nil || cli == nil {
202 klog.Exitf("Failed to create etcd client: %v", err)
203 }
204 closeFn := func() {
205 if err := cli.Close(); err != nil {
206 klog.Warningf("etcd client Close(): %v", err)
207 }
208 }
209
210 hostname, _ := os.Hostname()
211 instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid())
212 factory := etcdelect.NewFactory(instanceID, cli, *lockDir)
213
214 return factory, closeFn
215 }
216
View as plain text