1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package embed
16
17 import (
18 "context"
19 "crypto/tls"
20 "fmt"
21 "io/ioutil"
22 defaultLog "log"
23 "math"
24 "net"
25 "net/http"
26 "net/url"
27 "runtime"
28 "sort"
29 "strconv"
30 "sync"
31 "time"
32
33 "go.etcd.io/etcd/api/v3/version"
34 "go.etcd.io/etcd/client/pkg/v3/transport"
35 "go.etcd.io/etcd/client/pkg/v3/types"
36 "go.etcd.io/etcd/client/v3/credentials"
37 "go.etcd.io/etcd/pkg/v3/debugutil"
38 runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
39 "go.etcd.io/etcd/server/v3/config"
40 "go.etcd.io/etcd/server/v3/etcdserver"
41 "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
42 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
43 "go.etcd.io/etcd/server/v3/etcdserver/api/v2http"
44 "go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
45 "go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
46 "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
47 "go.etcd.io/etcd/server/v3/verify"
48
49 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
50 "github.com/soheilhy/cmux"
51 "go.uber.org/zap"
52 "google.golang.org/grpc"
53 "google.golang.org/grpc/credentials/insecure"
54 "google.golang.org/grpc/keepalive"
55 )
56
const (
	// reservedInternalFDNum is the number of file descriptors held back from
	// the process file descriptor limit when client listeners are set up.
	//
	// configureClientListeners logs a fatal error when the limit reported by
	// runtimeutil.FDLimit is not greater than this value, and otherwise wraps
	// each client listener with transport.LimitListener using
	// (limit - reservedInternalFDNum).
	reservedInternalFDNum = 150
)
70
71
// Etcd bundles the etcd server created by StartEtcd with the listeners it
// serves on and the channels used to signal shutdown and report errors.
type Etcd struct {
	// Peers holds the peer listeners returned by configurePeerListeners.
	Peers []*peerListener
	// Clients holds the listener of every client serve context in sctxs.
	Clients []net.Listener

	// sctxs holds the client serve contexts returned by
	// configureClientListeners, keyed by listen address.
	sctxs map[string]*serveCtx
	// metricsListeners holds the listeners opened by serveMetrics.
	metricsListeners []net.Listener

	// tracingExporterShutdown, when non-nil, is invoked by Close to close the
	// tracing exporter created in StartEtcd.
	tracingExporterShutdown func()

	// Server is the etcd server; StartEtcd resets it to nil when the initial
	// corruption check fails.
	Server *etcdserver.EtcdServer

	// cfg is a copy of the configuration passed to StartEtcd.
	cfg Config
	// stopc is closed by Close (at most once, guarded by closeOnce).
	stopc chan struct{}
	// errc carries errors reported through errHandler and is closed by Close.
	errc chan error

	// closeOnce guards the closing of stopc.
	closeOnce sync.Once
}
89
// peerListener pairs a peer net.Listener with the functions used to serve
// traffic on it and to shut it down.
type peerListener struct {
	net.Listener
	// serve is assigned by servePeers and runs the connection multiplexer for
	// this listener.
	serve func() error
	// close is first set by configurePeerListeners (closing the raw listener)
	// and later replaced by servePeers with a function that stops the servers
	// running on top of the listener.
	close func(context.Context) error
}
95
96
97
98
// StartEtcd validates inCfg, opens the peer and client listeners, creates and
// starts the etcd server, and then starts serving peer, client and metrics
// traffic.
//
// The configuration is copied into the returned Etcd, so later changes to
// inCfg are not observed. When any step after validation fails, the deferred
// function tears everything down through Close and resets the returned *Etcd
// to nil, so callers always receive (nil, err) on failure.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
	if err = inCfg.Validate(); err != nil {
		return nil, err
	}
	serving := false
	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
	cfg := &e.cfg
	defer func() {
		if e == nil || err == nil {
			return
		}
		if !serving {
			// Startup did not run to completion, so close every serversC here;
			// Close ranges over these channels and needs them closed to return.
			for _, sctx := range e.sctxs {
				close(sctx.serversC)
			}
		}
		e.Close()
		e = nil
	}()

	if !cfg.SocketOpts.Empty() {
		cfg.logger.Info(
			"configuring socket options",
			zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
			zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
		)
	}
	e.cfg.logger.Info(
		"configuring peer listeners",
		zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
	)
	if e.Peers, err = configurePeerListeners(cfg); err != nil {
		return e, err
	}

	e.cfg.logger.Info(
		"configuring client listeners",
		zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
	)
	if e.sctxs, err = configureClientListeners(cfg); err != nil {
		return e, err
	}

	for _, sctx := range e.sctxs {
		e.Clients = append(e.Clients, sctx.l)
	}

	var (
		urlsmap types.URLsMap
		token   string
	)
	// The initial cluster map and token are only computed for a member that
	// has not been initialized yet.
	memberInitialized := true
	if !isMemberInitialized(cfg) {
		memberInitialized = false
		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
		if err != nil {
			return e, fmt.Errorf("error setting up initial cluster: %v", err)
		}
	}

	// An empty retention setting is treated as "0".
	if len(cfg.AutoCompactionRetention) == 0 {
		cfg.AutoCompactionRetention = "0"
	}
	autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
	if err != nil {
		return e, err
	}

	backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

	// Translate the embed configuration into the server configuration.
	srvcfg := config.ServerConfig{
		Name:                                     cfg.Name,
		ClientURLs:                               cfg.AdvertiseClientUrls,
		PeerURLs:                                 cfg.AdvertisePeerUrls,
		DataDir:                                  cfg.Dir,
		DedicatedWALDir:                          cfg.WalDir,
		SnapshotCount:                            cfg.SnapshotCount,
		SnapshotCatchUpEntries:                   cfg.SnapshotCatchUpEntries,
		MaxSnapFiles:                             cfg.MaxSnapFiles,
		MaxWALFiles:                              cfg.MaxWalFiles,
		InitialPeerURLsMap:                       urlsmap,
		InitialClusterToken:                      token,
		DiscoveryURL:                             cfg.Durl,
		DiscoveryProxy:                           cfg.Dproxy,
		NewCluster:                               cfg.IsNewCluster(),
		PeerTLSInfo:                              cfg.PeerTLSInfo,
		TickMs:                                   cfg.TickMs,
		ElectionTicks:                            cfg.ElectionTicks(),
		InitialElectionTickAdvance:               cfg.InitialElectionTickAdvance,
		AutoCompactionRetention:                  autoCompactionRetention,
		AutoCompactionMode:                       cfg.AutoCompactionMode,
		QuotaBackendBytes:                        cfg.QuotaBackendBytes,
		BackendBatchLimit:                        cfg.BackendBatchLimit,
		BackendFreelistType:                      backendFreelistType,
		BackendBatchInterval:                     cfg.BackendBatchInterval,
		MaxTxnOps:                                cfg.MaxTxnOps,
		MaxRequestBytes:                          cfg.MaxRequestBytes,
		MaxConcurrentStreams:                     cfg.MaxConcurrentStreams,
		SocketOpts:                               cfg.SocketOpts,
		StrictReconfigCheck:                      cfg.StrictReconfigCheck,
		ClientCertAuthEnabled:                    cfg.ClientTLSInfo.ClientCertAuth,
		AuthToken:                                cfg.AuthToken,
		BcryptCost:                               cfg.BcryptCost,
		TokenTTL:                                 cfg.AuthTokenTTL,
		CORS:                                     cfg.CORS,
		HostWhitelist:                            cfg.HostWhitelist,
		InitialCorruptCheck:                      cfg.ExperimentalInitialCorruptCheck,
		CorruptCheckTime:                         cfg.ExperimentalCorruptCheckTime,
		CompactHashCheckEnabled:                  cfg.ExperimentalCompactHashCheckEnabled,
		CompactHashCheckTime:                     cfg.ExperimentalCompactHashCheckTime,
		PreVote:                                  cfg.PreVote,
		Logger:                                   cfg.logger,
		ForceNewCluster:                          cfg.ForceNewCluster,
		EnableGRPCGateway:                        cfg.EnableGRPCGateway,
		ExperimentalEnableDistributedTracing:     cfg.ExperimentalEnableDistributedTracing,
		UnsafeNoFsync:                            cfg.UnsafeNoFsync,
		EnableLeaseCheckpoint:                    cfg.ExperimentalEnableLeaseCheckpoint,
		LeaseCheckpointPersist:                   cfg.ExperimentalEnableLeaseCheckpointPersist,
		CompactionBatchLimit:                     cfg.ExperimentalCompactionBatchLimit,
		WatchProgressNotifyInterval:              cfg.ExperimentalWatchProgressNotifyInterval,
		DowngradeCheckTime:                       cfg.ExperimentalDowngradeCheckTime,
		WarningApplyDuration:                     cfg.ExperimentalWarningApplyDuration,
		ExperimentalMemoryMlock:                  cfg.ExperimentalMemoryMlock,
		ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
		ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
		V2Deprecation: cfg.V2DeprecationEffective(),
	}

	if srvcfg.ExperimentalEnableDistributedTracing {
		tctx := context.Background()
		// err is shadowed inside this block; the explicit "return e, err"
		// still stores it in the named result before the deferred cleanup runs.
		tracingExporter, err := newTracingExporter(tctx, cfg)
		if err != nil {
			return e, err
		}
		e.tracingExporterShutdown = func() {
			tracingExporter.Close(tctx)
		}
		srvcfg.ExperimentalTracerOptions = tracingExporter.opts

		e.cfg.logger.Info("distributed tracing setup enabled")
	}

	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)

	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
		return e, err
	}

	// errc is buffered: one slot per peer listener and per client listener,
	// plus two per client serve context.
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

	// The initial corruption check only runs for a member that was already
	// initialized, and only when it is enabled in the configuration.
	if memberInitialized && srvcfg.InitialCorruptCheck {
		if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
			// The server has not been started yet: clean it up and clear
			// e.Server so that the deferred Close does not call Stop on it.
			e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
			e.Server.Cleanup()
			e.Server = nil
			return e, err
		}
	}
	e.Server.Start()

	if err = e.servePeers(); err != nil {
		return e, err
	}
	if err = e.serveClients(); err != nil {
		return e, err
	}
	if err = e.serveMetrics(); err != nil {
		return e, err
	}

	e.cfg.logger.Info(
		"now serving peer/client/metrics",
		zap.String("local-member-id", e.Server.ID().String()),
		zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
		zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
		zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
		zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
		zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
	)
	serving = true
	return e, nil
}
289
290 func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) {
291 cors := make([]string, 0, len(ec.CORS))
292 for v := range ec.CORS {
293 cors = append(cors, v)
294 }
295 sort.Strings(cors)
296
297 hss := make([]string, 0, len(ec.HostWhitelist))
298 for v := range ec.HostWhitelist {
299 hss = append(hss, v)
300 }
301 sort.Strings(hss)
302
303 quota := ec.QuotaBackendBytes
304 if quota == 0 {
305 quota = etcdserver.DefaultQuotaBytes
306 }
307
308 lg.Info(
309 "starting an etcd server",
310 zap.String("etcd-version", version.Version),
311 zap.String("git-sha", version.GitSHA),
312 zap.String("go-version", runtime.Version()),
313 zap.String("go-os", runtime.GOOS),
314 zap.String("go-arch", runtime.GOARCH),
315 zap.Int("max-cpu-set", runtime.GOMAXPROCS(0)),
316 zap.Int("max-cpu-available", runtime.NumCPU()),
317 zap.Bool("member-initialized", memberInitialized),
318 zap.String("name", sc.Name),
319 zap.String("data-dir", sc.DataDir),
320 zap.String("wal-dir", ec.WalDir),
321 zap.String("wal-dir-dedicated", sc.DedicatedWALDir),
322 zap.String("member-dir", sc.MemberDir()),
323 zap.Bool("force-new-cluster", sc.ForceNewCluster),
324 zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
325 zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
326 zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
327 zap.Uint64("snapshot-count", sc.SnapshotCount),
328 zap.Uint("max-wals", sc.MaxWALFiles),
329 zap.Uint("max-snapshots", sc.MaxSnapFiles),
330 zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
331 zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()),
332 zap.Strings("listen-peer-urls", ec.getListenPeerUrls()),
333 zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()),
334 zap.Strings("listen-client-urls", ec.getListenClientUrls()),
335 zap.Strings("listen-metrics-urls", ec.getMetricsURLs()),
336 zap.Strings("cors", cors),
337 zap.Strings("host-whitelist", hss),
338 zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
339 zap.String("initial-cluster-state", ec.ClusterState),
340 zap.String("initial-cluster-token", sc.InitialClusterToken),
341 zap.Int64("quota-backend-bytes", quota),
342 zap.Uint("max-request-bytes", sc.MaxRequestBytes),
343 zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),
344
345 zap.Bool("pre-vote", sc.PreVote),
346 zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
347 zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
348 zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
349 zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
350 zap.String("auto-compaction-mode", sc.AutoCompactionMode),
351 zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
352 zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
353 zap.String("discovery-url", sc.DiscoveryURL),
354 zap.String("discovery-proxy", sc.DiscoveryProxy),
355 zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
356 )
357 }
358
359
// Config returns the configuration held by this Etcd. The value is returned
// by value, so the caller receives a copy rather than a reference to e.cfg.
func (e *Etcd) Config() Config {
	return e.cfg
}
363
364
365
366
// Close shuts the embedded etcd down: it signals stopc, stops the client
// servers, cancels the client serve contexts, closes client and metrics
// listeners, shuts down the tracing exporter, stops the etcd server, closes
// the peer listeners and finally closes the error channel.
//
// NOTE(review): only the closing of stopc is guarded by closeOnce; errc is
// closed on every call when non-nil, so calling Close a second time would
// panic. Confirm that callers invoke Close at most once.
func (e *Etcd) Close() {
	fields := []zap.Field{
		zap.String("name", e.cfg.Name),
		zap.String("data-dir", e.cfg.Dir),
		zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
		zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
	}
	lg := e.GetLogger()
	lg.Info("closing etcd server", fields...)
	defer func() {
		// Runs after everything below: log completion, run the optional
		// data-directory verification and flush the logger.
		lg.Info("closed etcd server", fields...)
		verify.MustVerifyIfEnabled(verify.Config{
			Logger:     lg,
			DataDir:    e.cfg.Dir,
			ExactIndex: false,
		})
		lg.Sync()
	}()

	e.closeOnce.Do(func() {
		close(e.stopc)
	})

	// Each client server gets its own shutdown deadline: the server's request
	// timeout when a server exists, otherwise two seconds.
	timeout := 2 * time.Second
	if e.Server != nil {
		timeout = e.Server.Cfg.ReqTimeout()
	}
	for _, sctx := range e.sctxs {
		// Ranging ends only once serversC has been closed.
		for ss := range sctx.serversC {
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			stopServers(ctx, ss)
			cancel()
		}
	}

	for _, sctx := range e.sctxs {
		sctx.cancel()
	}

	for i := range e.Clients {
		if e.Clients[i] != nil {
			e.Clients[i].Close()
		}
	}

	for i := range e.metricsListeners {
		e.metricsListeners[i].Close()
	}

	// Shut down the tracing exporter, if StartEtcd created one.
	if e.tracingExporterShutdown != nil {
		e.tracingExporterShutdown()
	}

	// Stop the etcd server before the peer listeners are closed.
	if e.Server != nil {
		e.Server.Stop()
	}

	// Close the peer listeners, allowing each one second to finish.
	for i := range e.Peers {
		if e.Peers[i] != nil && e.Peers[i].close != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			e.Peers[i].close(ctx)
			cancel()
		}
	}
	if e.errc != nil {
		close(e.errc)
	}
}
439
440 func stopServers(ctx context.Context, ss *servers) {
441
442 if ss.http != nil {
443 ss.http.Shutdown(ctx)
444 }
445 if ss.grpc == nil {
446 return
447 }
448
449
450
451 if ss.secure && ss.http != nil {
452 ss.grpc.Stop()
453 return
454 }
455
456 ch := make(chan struct{})
457 go func() {
458 defer close(ch)
459
460
461 ss.grpc.GracefulStop()
462 }()
463
464
465 select {
466 case <-ch:
467 case <-ctx.Done():
468
469
470 ss.grpc.Stop()
471
472
473 <-ch
474 }
475 }
476
477
478
// Err returns the receive-only view of the channel on which errors reported
// through errHandler are delivered. Close closes this channel when it is
// non-nil.
func (e *Etcd) Err() <-chan error {
	return e.errc
}
482
483 func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
484 if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
485 return nil, err
486 }
487 if err = cfg.PeerSelfCert(); err != nil {
488 cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
489 }
490
491 updateMinMaxVersions(&cfg.PeerTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
492
493 if !cfg.PeerTLSInfo.Empty() {
494 cfg.logger.Info(
495 "starting with peer TLS",
496 zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
497 zap.Strings("cipher-suites", cfg.CipherSuites),
498 )
499 }
500
501 peers = make([]*peerListener, len(cfg.ListenPeerUrls))
502 defer func() {
503 if err == nil {
504 return
505 }
506 for i := range peers {
507 if peers[i] != nil && peers[i].close != nil {
508 cfg.logger.Warn(
509 "closing peer listener",
510 zap.String("address", cfg.ListenPeerUrls[i].String()),
511 zap.Error(err),
512 )
513 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
514 peers[i].close(ctx)
515 cancel()
516 }
517 }
518 }()
519
520 for i, u := range cfg.ListenPeerUrls {
521 if u.Scheme == "http" {
522 if !cfg.PeerTLSInfo.Empty() {
523 cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
524 }
525 if cfg.PeerTLSInfo.ClientCertAuth {
526 cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
527 }
528 }
529 peers[i] = &peerListener{close: func(context.Context) error { return nil }}
530 peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
531 transport.WithTLSInfo(&cfg.PeerTLSInfo),
532 transport.WithSocketOpts(&cfg.SocketOpts),
533 transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
534 )
535 if err != nil {
536 cfg.logger.Error("creating peer listener failed", zap.Error(err))
537 return nil, err
538 }
539
540 peers[i].close = func(context.Context) error {
541 return peers[i].Listener.Close()
542 }
543 }
544 return peers, nil
545 }
546
547
// servePeers starts serving on every peer listener in e.Peers.
//
// For each listener it creates a gRPC server and an http.Server and
// multiplexes both over the listener with cmux: connections matched by
// cmux.HTTP2 go to the gRPC server, everything else to the http.Server. The
// listener's serve and close functions are replaced accordingly, and one
// goroutine per listener runs serve and reports its result via errHandler.
//
// An error is returned only when the peer TLS server configuration cannot be
// built.
func (e *Etcd) servePeers() (err error) {
	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
	// peerTLScfg stays nil when no peer TLS material is configured.
	var peerTLScfg *tls.Config
	if !e.cfg.PeerTLSInfo.Empty() {
		if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
			return err
		}
	}

	for _, p := range e.Peers {
		u := p.Listener.Addr().String()
		gs := v3rpc.Server(e.Server, peerTLScfg, nil)
		m := cmux.New(p.Listener)
		go gs.Serve(m.Match(cmux.HTTP2()))
		srv := &http.Server{
			Handler:     grpcHandlerFunc(gs, ph),
			ReadTimeout: 5 * time.Minute,
			// Output of the http.Server's own error logger is discarded.
			ErrorLog: defaultLog.New(ioutil.Discard, "", 0),
		}
		go srv.Serve(m.Match(cmux.Any()))
		p.serve = func() error {
			e.cfg.logger.Info(
				"cmux::serve",
				zap.String("address", u),
			)
			return m.Serve()
		}
		p.close = func(ctx context.Context) error {
			// Replaces the close function installed by configurePeerListeners:
			// stop the HTTP and gRPC servers within ctx, then close the
			// multiplexer.
			e.cfg.logger.Info(
				"stopping serving peer traffic",
				zap.String("address", u),
			)
			stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
			e.cfg.logger.Info(
				"stopped serving peer traffic",
				zap.String("address", u),
			)
			m.Close()
			return nil
		}
	}

	// Run every peer listener's serve function in its own goroutine; the
	// listener is passed as an argument so each goroutine has its own copy.
	for _, pl := range e.Peers {
		go func(l *peerListener) {
			u := l.Addr().String()
			e.cfg.logger.Info(
				"serving peer traffic",
				zap.String("address", u),
			)
			e.errHandler(l.serve())
		}(pl)
	}
	return nil
}
606
607 func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
608 if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
609 return nil, err
610 }
611 if err = cfg.ClientSelfCert(); err != nil {
612 cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err))
613 }
614
615 updateMinMaxVersions(&cfg.ClientTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
616
617 if cfg.EnablePprof {
618 cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf))
619 }
620
621 sctxs = make(map[string]*serveCtx)
622 for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
623 if u.Scheme == "http" || u.Scheme == "unix" {
624 if !cfg.ClientTLSInfo.Empty() {
625 cfg.logger.Warn("scheme is http or unix while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
626 }
627 if cfg.ClientTLSInfo.ClientCertAuth {
628 cfg.logger.Warn("scheme is http or unix while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String()))
629 }
630 }
631 if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
632 return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
633 }
634 }
635
636 for _, u := range cfg.ListenClientUrls {
637 addr, secure, network := resolveUrl(u)
638 sctx := sctxs[addr]
639 if sctx == nil {
640 sctx = newServeCtx(cfg.logger)
641 sctxs[addr] = sctx
642 }
643 sctx.secure = sctx.secure || secure
644 sctx.insecure = sctx.insecure || !secure
645 sctx.scheme = u.Scheme
646 sctx.addr = addr
647 sctx.network = network
648 }
649 for _, u := range cfg.ListenClientHttpUrls {
650 addr, secure, network := resolveUrl(u)
651
652 sctx := sctxs[addr]
653 if sctx == nil {
654 sctx = newServeCtx(cfg.logger)
655 sctxs[addr] = sctx
656 } else if !sctx.httpOnly {
657 return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
658 }
659 sctx.secure = sctx.secure || secure
660 sctx.insecure = sctx.insecure || !secure
661 sctx.scheme = u.Scheme
662 sctx.addr = addr
663 sctx.network = network
664 sctx.httpOnly = true
665 }
666
667 for _, sctx := range sctxs {
668 if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
669 transport.WithSocketOpts(&cfg.SocketOpts),
670 transport.WithSkipTLSInfoCheck(true),
671 ); err != nil {
672 return nil, err
673 }
674
675
676
677 if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
678 if fdLimit <= reservedInternalFDNum {
679 cfg.logger.Fatal(
680 "file descriptor limit of etcd process is too low; please set higher",
681 zap.Uint64("limit", fdLimit),
682 zap.Int("recommended-limit", reservedInternalFDNum),
683 )
684 }
685 sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
686 }
687
688 defer func(sctx *serveCtx) {
689 if err == nil || sctx.l == nil {
690 return
691 }
692 sctx.l.Close()
693 cfg.logger.Warn(
694 "closing peer listener",
695 zap.String("address", sctx.addr),
696 zap.Error(err),
697 )
698 }(sctx)
699 for k := range cfg.UserHandlers {
700 sctx.userHandlers[k] = cfg.UserHandlers[k]
701 }
702 sctx.serviceRegister = cfg.ServiceRegister
703 if cfg.EnablePprof || cfg.LogLevel == "debug" {
704 sctx.registerPprof()
705 }
706 if cfg.LogLevel == "debug" {
707 sctx.registerTrace()
708 }
709 }
710 return sctxs, nil
711 }
712
// resolveUrl derives the listen parameters for a client URL.
//
// For the "unix" and "unixs" schemes the address is the URL host followed by
// its path and the network is "unix"; for every other scheme the address is
// the URL host and the network is "tcp". secure is true for the "https" and
// "unixs" schemes.
func resolveUrl(u url.URL) (addr string, secure bool, network string) {
	switch u.Scheme {
	case "unix", "unixs":
		addr, network = u.Host+u.Path, "unix"
	default:
		addr, network = u.Host, "tcp"
	}

	switch u.Scheme {
	case "https", "unixs":
		secure = true
	}
	return addr, secure, network
}
723
// serveClients builds the HTTP handler and gRPC server options for client
// traffic and starts one serving goroutine per client serve context.
//
// It returns an error only when the v2 API is enabled together with a
// v2-deprecation level of at least V2_DEPR_1_WRITE_ONLY. Errors from the
// serving goroutines are reported through errHandler.
func (e *Etcd) serveClients() (err error) {
	if !e.cfg.ClientTLSInfo.Empty() {
		e.cfg.logger.Info(
			"starting with client TLS",
			zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)),
			zap.Strings("cipher-suites", e.cfg.CipherSuites),
		)
	}

	// Pick the HTTP handler: the v2 client handler when the v2 API is enabled
	// (optionally backed by the v2v3 server), otherwise a mux exposing the
	// basic, metrics and health endpoints.
	var h http.Handler
	if e.Config().EnableV2 {
		if e.Config().V2DeprecationEffective().IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
			return fmt.Errorf("--enable-v2 and --v2-deprecation=%s are mutually exclusive", e.Config().V2DeprecationEffective())
		}
		e.cfg.logger.Warn("Flag `enable-v2` is deprecated and will get removed in etcd 3.6.")
		if len(e.Config().ExperimentalEnableV2V3) > 0 {
			e.cfg.logger.Warn("Flag `experimental-enable-v2v3` is deprecated and will get removed in etcd 3.6.")
			srv := v2v3.NewServer(e.cfg.logger, v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
			h = v2http.NewClientHandler(e.GetLogger(), srv, e.Server.Cfg.ReqTimeout())
		} else {
			h = v2http.NewClientHandler(e.GetLogger(), e.Server, e.Server.Cfg.ReqTimeout())
		}
	} else {
		mux := http.NewServeMux()
		etcdhttp.HandleBasic(e.cfg.logger, mux, e.Server)
		etcdhttp.HandleMetrics(mux)
		etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
		h = mux
	}

	// Keepalive options are only added when the corresponding settings are
	// positive; the server parameters need both interval and timeout.
	gopts := []grpc.ServerOption{}
	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             e.cfg.GRPCKeepAliveMinTime,
			PermitWithoutStream: false,
		}))
	}
	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    e.cfg.GRPCKeepAliveInterval,
			Timeout: e.cfg.GRPCKeepAliveTimeout,
		}))
	}

	// splitHttp is true as soon as any serve context is HTTP-only.
	splitHttp := false
	for _, sctx := range e.sctxs {
		if sctx.httpOnly {
			splitHttp = true
		}
	}

	// Start one serving goroutine per serve context. Note that
	// grpcGatewayDial is evaluated inside each goroutine.
	for _, sctx := range e.sctxs {
		go func(s *serveCtx) {
			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
		}(sctx)
	}
	return nil
}
785
786 func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
787 if !e.cfg.EnableGRPCGateway {
788 return nil
789 }
790 sctx := e.pickGrpcGatewayServeContext(splitHttp)
791 addr := sctx.addr
792 if network := sctx.network; network == "unix" {
793
794 addr = fmt.Sprintf("%s:%s", network, addr)
795 }
796
797 opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
798 if sctx.secure {
799 tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
800 if tlsErr != nil {
801 return func(ctx context.Context) (*grpc.ClientConn, error) {
802 return nil, tlsErr
803 }
804 }
805 dtls := tlscfg.Clone()
806
807 dtls.InsecureSkipVerify = true
808 bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
809 opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials()))
810 } else {
811 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
812 }
813
814 return func(ctx context.Context) (*grpc.ClientConn, error) {
815 conn, err := grpc.DialContext(ctx, addr, opts...)
816 if err != nil {
817 sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
818 return nil, err
819 }
820 return conn, err
821 }
822 }
823
824 func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
825 for _, sctx := range e.sctxs {
826 if !splitHttp || !sctx.httpOnly {
827 return sctx
828 }
829 }
830 panic("Expect at least one context able to serve grpc")
831 }
832
833 func (e *Etcd) serveMetrics() (err error) {
834 if e.cfg.Metrics == "extensive" {
835 grpc_prometheus.EnableHandlingTimeHistogram()
836 }
837
838 if len(e.cfg.ListenMetricsUrls) > 0 {
839 metricsMux := http.NewServeMux()
840 etcdhttp.HandleMetrics(metricsMux)
841 etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)
842
843 for _, murl := range e.cfg.ListenMetricsUrls {
844 tlsInfo := &e.cfg.ClientTLSInfo
845 if murl.Scheme == "http" {
846 tlsInfo = nil
847 }
848 ml, err := transport.NewListenerWithOpts(murl.Host, murl.Scheme,
849 transport.WithTLSInfo(tlsInfo),
850 transport.WithSocketOpts(&e.cfg.SocketOpts),
851 )
852 if err != nil {
853 return err
854 }
855 e.metricsListeners = append(e.metricsListeners, ml)
856 go func(u url.URL, ln net.Listener) {
857 e.cfg.logger.Info(
858 "serving metrics",
859 zap.String("address", u.String()),
860 )
861 e.errHandler(http.Serve(ln, metricsMux))
862 }(murl, ml)
863 }
864 }
865 return nil
866 }
867
// errHandler forwards err to the error channel unless the Etcd is stopping.
//
// If stopc is already closed, err is dropped immediately. Otherwise the call
// blocks until err has been sent on errc or stopc is closed, whichever
// happens first.
func (e *Etcd) errHandler(err error) {
	// Non-blocking check: once stopc is closed nothing is sent any more.
	select {
	case <-e.stopc:
		return
	default:
	}
	// Blocking send that is abandoned as soon as stopc is closed.
	select {
	case <-e.stopc:
	case e.errc <- err:
	}
}
879
880
881 func (e *Etcd) GetLogger() *zap.Logger {
882 e.cfg.loggerMu.RLock()
883 l := e.cfg.logger
884 e.cfg.loggerMu.RUnlock()
885 return l
886 }
887
888 func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
889 h, err := strconv.Atoi(retention)
890 if err == nil && h >= 0 {
891 switch mode {
892 case CompactorModeRevision:
893 ret = time.Duration(int64(h))
894 case CompactorModePeriodic:
895 ret = time.Duration(int64(h)) * time.Hour
896 }
897 } else {
898
899 ret, err = time.ParseDuration(retention)
900 if err != nil {
901 return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
902 }
903 }
904 return ret, nil
905 }
906
View as plain text