1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdmain
16
17 import (
18 "encoding/json"
19 "fmt"
20 "io/ioutil"
21 "net/http"
22 "os"
23 "path/filepath"
24 "reflect"
25 "runtime"
26 "strings"
27 "time"
28
29 "go.etcd.io/etcd/client/pkg/v3/fileutil"
30 "go.etcd.io/etcd/client/pkg/v3/logutil"
31 "go.etcd.io/etcd/client/pkg/v3/transport"
32 "go.etcd.io/etcd/client/pkg/v3/types"
33 pkgioutil "go.etcd.io/etcd/pkg/v3/ioutil"
34 "go.etcd.io/etcd/pkg/v3/osutil"
35 "go.etcd.io/etcd/server/v3/embed"
36 "go.etcd.io/etcd/server/v3/etcdserver"
37 "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
38 "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
39 "go.etcd.io/etcd/server/v3/proxy/httpproxy"
40
41 "go.uber.org/zap"
42 "google.golang.org/grpc"
43 )
44
// dirType names the kind of content found in an etcd data directory, as
// decided by which well-known subdirectory is present inside it.
type dirType string

// Recognized data directory kinds. dirMember and dirProxy equal the names of
// the subdirectories looked for under the data directory; dirEmpty means
// neither was found.
var (
	dirMember dirType = "member"
	dirProxy  dirType = "proxy"
	dirEmpty  dirType = "empty"
)
52
53 func startEtcdOrProxyV2(args []string) {
54 grpc.EnableTracing = false
55
56 cfg := newConfig()
57 defaultInitialCluster := cfg.ec.InitialCluster
58
59 err := cfg.parse(args[1:])
60 lg := cfg.ec.GetLogger()
61
62
63
64 if lg == nil {
65 var zapError error
66
67 lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
68 if zapError != nil {
69 fmt.Printf("error creating zap logger %v", zapError)
70 os.Exit(1)
71 }
72 }
73 lg.Info("Running: ", zap.Strings("args", args))
74 if err != nil {
75 lg.Warn("failed to verify flags", zap.Error(err))
76 switch err {
77 case embed.ErrUnsetAdvertiseClientURLsFlag:
78 lg.Warn("advertise client URLs are not set", zap.Error(err))
79 }
80 os.Exit(1)
81 }
82
83 cfg.ec.SetupGlobalLoggers()
84
85 defer func() {
86 logger := cfg.ec.GetLogger()
87 if logger != nil {
88 logger.Sync()
89 }
90 }()
91
92 defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
93 if defaultHost != "" {
94 lg.Info(
95 "detected default host for advertise",
96 zap.String("host", defaultHost),
97 )
98 }
99 if dhErr != nil {
100 lg.Info("failed to detect default host", zap.Error(dhErr))
101 }
102
103 if cfg.ec.Dir == "" {
104 cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name)
105 lg.Warn(
106 "'data-dir' was empty; using default",
107 zap.String("data-dir", cfg.ec.Dir),
108 )
109 }
110
111 var stopped <-chan struct{}
112 var errc <-chan error
113
114 which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
115 if which != dirEmpty {
116 lg.Info(
117 "server has been already initialized",
118 zap.String("data-dir", cfg.ec.Dir),
119 zap.String("dir-type", string(which)),
120 )
121 switch which {
122 case dirMember:
123 stopped, errc, err = startEtcd(&cfg.ec)
124 case dirProxy:
125 err = startProxy(cfg)
126 default:
127 lg.Panic(
128 "unknown directory type",
129 zap.String("dir-type", string(which)),
130 )
131 }
132 } else {
133 shouldProxy := cfg.isProxy()
134 if !shouldProxy {
135 stopped, errc, err = startEtcd(&cfg.ec)
136 if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster {
137 if cfg.shouldFallbackToProxy() {
138 lg.Warn(
139 "discovery cluster is full, falling back to proxy",
140 zap.String("fallback-proxy", fallbackFlagProxy),
141 zap.Error(err),
142 )
143 shouldProxy = true
144 }
145 } else if err != nil {
146 lg.Warn("failed to start etcd", zap.Error(err))
147 }
148 }
149 if shouldProxy {
150 err = startProxy(cfg)
151 }
152 }
153
154 if err != nil {
155 if derr, ok := err.(*etcdserver.DiscoveryError); ok {
156 switch derr.Err {
157 case v2discovery.ErrDuplicateID:
158 lg.Warn(
159 "member has been registered with discovery service",
160 zap.String("name", cfg.ec.Name),
161 zap.String("discovery-token", cfg.ec.Durl),
162 zap.Error(derr.Err),
163 )
164 lg.Warn(
165 "but could not find valid cluster configuration",
166 zap.String("data-dir", cfg.ec.Dir),
167 )
168 lg.Warn("check data dir if previous bootstrap succeeded")
169 lg.Warn("or use a new discovery token if previous bootstrap failed")
170
171 case v2discovery.ErrDuplicateName:
172 lg.Warn(
173 "member with duplicated name has already been registered",
174 zap.String("discovery-token", cfg.ec.Durl),
175 zap.Error(derr.Err),
176 )
177 lg.Warn("cURL the discovery token URL for details")
178 lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
179
180 default:
181 lg.Warn(
182 "failed to bootstrap; discovery token was already used",
183 zap.String("discovery-token", cfg.ec.Durl),
184 zap.Error(err),
185 )
186 lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
187 }
188 os.Exit(1)
189 }
190
191 if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
192 lg.Warn("failed to start", zap.Error(err))
193 if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) {
194 lg.Warn("forgot to set --initial-cluster?")
195 }
196 if types.URLs(cfg.ec.AdvertisePeerUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
197 lg.Warn("forgot to set --initial-advertise-peer-urls?")
198 }
199 if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 {
200 lg.Warn("--discovery flag is not set")
201 }
202 os.Exit(1)
203 }
204 lg.Fatal("discovery failed", zap.Error(err))
205 }
206
207 osutil.HandleInterrupts(lg)
208
209
210
211
212
213
214 notifySystemd(lg)
215
216 select {
217 case lerr := <-errc:
218
219 lg.Fatal("listener failed", zap.Error(lerr))
220 case <-stopped:
221 }
222
223 osutil.Exit(0)
224 }
225
226
227 func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
228 e, err := embed.StartEtcd(cfg)
229 if err != nil {
230 return nil, nil, err
231 }
232 osutil.RegisterInterruptHandler(e.Close)
233 select {
234 case <-e.Server.ReadyNotify():
235 case <-e.Server.StopNotify():
236 }
237 return e.Server.StopNotify(), e.Err(), nil
238 }
239
240
241 func startProxy(cfg *config) error {
242 lg := cfg.ec.GetLogger()
243 lg.Info("v2 API proxy starting")
244
245 clientTLSInfo := cfg.ec.ClientTLSInfo
246 if clientTLSInfo.Empty() {
247
248
249 clientTLSInfo = cfg.ec.PeerTLSInfo
250 }
251 clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS
252 cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS
253
254 pt, err := transport.NewTimeoutTransport(
255 clientTLSInfo,
256 time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
257 time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
258 time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
259 )
260 if err != nil {
261 return err
262 }
263 pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
264
265 if err = cfg.ec.PeerSelfCert(); err != nil {
266 lg.Fatal("failed to get self-signed certs for peer", zap.Error(err))
267 }
268 tr, err := transport.NewTimeoutTransport(
269 cfg.ec.PeerTLSInfo,
270 time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
271 time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
272 time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
273 )
274 if err != nil {
275 return err
276 }
277
278 cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy")
279 err = fileutil.TouchDirAll(lg, cfg.ec.Dir)
280 if err != nil {
281 return err
282 }
283
284 var peerURLs []string
285 clusterfile := filepath.Join(cfg.ec.Dir, "cluster")
286
287 b, err := ioutil.ReadFile(clusterfile)
288 switch {
289 case err == nil:
290 if cfg.ec.Durl != "" {
291 lg.Warn(
292 "discovery token ignored since the proxy has already been initialized; valid cluster file found",
293 zap.String("cluster-file", clusterfile),
294 )
295 }
296 if cfg.ec.DNSCluster != "" {
297 lg.Warn(
298 "DNS SRV discovery ignored since the proxy has already been initialized; valid cluster file found",
299 zap.String("cluster-file", clusterfile),
300 )
301 }
302 urls := struct{ PeerURLs []string }{}
303 err = json.Unmarshal(b, &urls)
304 if err != nil {
305 return err
306 }
307 peerURLs = urls.PeerURLs
308 lg.Info(
309 "proxy using peer URLS from cluster file",
310 zap.Strings("peer-urls", peerURLs),
311 zap.String("cluster-file", clusterfile),
312 )
313
314 case os.IsNotExist(err):
315 var urlsmap types.URLsMap
316 urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy")
317 if err != nil {
318 return fmt.Errorf("error setting up initial cluster: %v", err)
319 }
320
321 if cfg.ec.Durl != "" {
322 var s string
323 s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
324 if err != nil {
325 return err
326 }
327 if urlsmap, err = types.NewURLsMap(s); err != nil {
328 return err
329 }
330 }
331 peerURLs = urlsmap.URLs()
332 lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs))
333
334 default:
335 return err
336 }
337
338 clientURLs := []string{}
339 uf := func() []string {
340 gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr)
341 if gerr != nil {
342 lg.Warn(
343 "failed to get cluster from remote peers",
344 zap.Strings("peer-urls", peerURLs),
345 zap.Error(gerr),
346 )
347 return []string{}
348 }
349
350 clientURLs = gcls.ClientURLs()
351 urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
352 b, jerr := json.Marshal(urls)
353 if jerr != nil {
354 lg.Warn("proxy failed to marshal peer URLs", zap.Error(jerr))
355 return clientURLs
356 }
357
358 err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
359 if err != nil {
360 lg.Warn("proxy failed to write cluster file", zap.Error(err))
361 return clientURLs
362 }
363 err = os.Rename(clusterfile+".bak", clusterfile)
364 if err != nil {
365 lg.Warn(
366 "proxy failed to rename cluster file",
367 zap.String("path", clusterfile),
368 zap.Error(err),
369 )
370 return clientURLs
371 }
372 if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) {
373 lg.Info(
374 "proxy updated peer URLs",
375 zap.Strings("from", peerURLs),
376 zap.Strings("to", gcls.PeerURLs()),
377 )
378 }
379 peerURLs = gcls.PeerURLs()
380
381 return clientURLs
382 }
383 ph := httpproxy.NewHandler(lg, pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
384 ph = embed.WrapCORS(cfg.ec.CORS, ph)
385
386 if cfg.isReadonlyProxy() {
387 ph = httpproxy.NewReadonlyHandler(ph)
388 }
389
390
391 cHosts, cTLS := []string{}, false
392 for _, u := range cfg.ec.ListenClientUrls {
393 cHosts = append(cHosts, u.Host)
394 cTLS = cTLS || u.Scheme == "https"
395 }
396 for _, u := range cfg.ec.AdvertiseClientUrls {
397 cHosts = append(cHosts, u.Host)
398 cTLS = cTLS || u.Scheme == "https"
399 }
400 listenerTLS := cfg.ec.ClientTLSInfo
401 if cfg.ec.ClientAutoTLS && cTLS {
402 listenerTLS, err = transport.SelfCert(cfg.ec.GetLogger(), filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts, cfg.ec.SelfSignedCertValidity)
403 if err != nil {
404 lg.Fatal("failed to initialize self-signed client cert", zap.Error(err))
405 }
406 }
407
408
409 for _, u := range cfg.ec.ListenClientUrls {
410 l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS)
411 if err != nil {
412 return err
413 }
414
415 host := u.String()
416 go func() {
417 lg.Info("v2 proxy started listening on client requests", zap.String("host", host))
418 mux := http.NewServeMux()
419 etcdhttp.HandleMetrics(mux)
420 mux.Handle("/", ph)
421 lg.Fatal("done serving", zap.Error(http.Serve(l, mux)))
422 }()
423 }
424 return nil
425 }
426
427
428
429 func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType {
430 names, err := fileutil.ReadDir(dir)
431 if err != nil {
432 if os.IsNotExist(err) {
433 return dirEmpty
434 }
435 lg.Fatal("failed to list data directory", zap.String("dir", dir), zap.Error(err))
436 }
437
438 var m, p bool
439 for _, name := range names {
440 switch dirType(name) {
441 case dirMember:
442 m = true
443 case dirProxy:
444 p = true
445 default:
446 lg.Warn(
447 "found invalid file under data directory",
448 zap.String("filename", name),
449 zap.String("data-dir", dir),
450 )
451 }
452 }
453
454 if m && p {
455 lg.Fatal("invalid datadir; both member and proxy directories exist")
456 }
457 if m {
458 return dirMember
459 }
460 if p {
461 return dirProxy
462 }
463 return dirEmpty
464 }
465
466 func checkSupportArch() {
467 lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
468 if err != nil {
469 panic(err)
470 }
471
472 if runtime.GOARCH == "amd64" ||
473 runtime.GOARCH == "arm64" ||
474 runtime.GOARCH == "ppc64le" ||
475 runtime.GOARCH == "s390x" {
476 return
477 }
478
479
480 defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
481 if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
482 lg.Info("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", env))
483 return
484 }
485
486 lg.Error("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", runtime.GOARCH))
487 os.Exit(1)
488 }
489
View as plain text