...

Source file src/go.etcd.io/etcd/server/v3/embed/etcd.go

Documentation: go.etcd.io/etcd/server/v3/embed

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package embed
    16  
    17  import (
    18  	"context"
    19  	"crypto/tls"
    20  	"fmt"
    21  	"io/ioutil"
    22  	defaultLog "log"
    23  	"math"
    24  	"net"
    25  	"net/http"
    26  	"net/url"
    27  	"runtime"
    28  	"sort"
    29  	"strconv"
    30  	"sync"
    31  	"time"
    32  
    33  	"go.etcd.io/etcd/api/v3/version"
    34  	"go.etcd.io/etcd/client/pkg/v3/transport"
    35  	"go.etcd.io/etcd/client/pkg/v3/types"
    36  	"go.etcd.io/etcd/client/v3/credentials"
    37  	"go.etcd.io/etcd/pkg/v3/debugutil"
    38  	runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
    39  	"go.etcd.io/etcd/server/v3/config"
    40  	"go.etcd.io/etcd/server/v3/etcdserver"
    41  	"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
    42  	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
    43  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2http"
    44  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
    45  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
    46  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
    47  	"go.etcd.io/etcd/server/v3/verify"
    48  
    49  	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
    50  	"github.com/soheilhy/cmux"
    51  	"go.uber.org/zap"
    52  	"google.golang.org/grpc"
    53  	"google.golang.org/grpc/credentials/insecure"
    54  	"google.golang.org/grpc/keepalive"
    55  )
    56  
    57  const (
    58  	// internal fd usage includes disk usage and transport usage.
    59  	// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
    60  	// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
    61  	// read all logs after some snapshot index, which locates at the end of
    62  	// the second last and the head of the last. For purging, it needs to read
    63  	// directory, so it needs 1. For fd monitor, it needs 1.
    64  	// For transport, rafthttp builds two long-polling connections and at most
    65  	// four temporary connections with each member. There are at most 9 members
    66  	// in a cluster, so it should reserve 96.
    67  	// For the safety, we set the total reserved number to 150.
    68  	reservedInternalFDNum = 150
    69  )
    70  
    71  // Etcd contains a running etcd server and its listeners.
    72  type Etcd struct {
    73  	Peers   []*peerListener
    74  	Clients []net.Listener
    75  	// a map of contexts for the servers that serves client requests.
    76  	sctxs            map[string]*serveCtx
    77  	metricsListeners []net.Listener
    78  
    79  	tracingExporterShutdown func()
    80  
    81  	Server *etcdserver.EtcdServer
    82  
    83  	cfg   Config
    84  	stopc chan struct{}
    85  	errc  chan error
    86  
    87  	closeOnce sync.Once
    88  }
    89  
    90  type peerListener struct {
    91  	net.Listener
    92  	serve func() error
    93  	close func(context.Context) error
    94  }
    95  
    96  // StartEtcd launches the etcd server and HTTP handlers for client/server communication.
    97  // The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
    98  // on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
    99  func StartEtcd(inCfg *Config) (e *Etcd, err error) {
   100  	if err = inCfg.Validate(); err != nil {
   101  		return nil, err
   102  	}
   103  	serving := false
   104  	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
   105  	cfg := &e.cfg
   106  	defer func() {
   107  		if e == nil || err == nil {
   108  			return
   109  		}
   110  		if !serving {
   111  			// errored before starting gRPC server for serveCtx.serversC
   112  			for _, sctx := range e.sctxs {
   113  				close(sctx.serversC)
   114  			}
   115  		}
   116  		e.Close()
   117  		e = nil
   118  	}()
   119  
   120  	if !cfg.SocketOpts.Empty() {
   121  		cfg.logger.Info(
   122  			"configuring socket options",
   123  			zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
   124  			zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
   125  		)
   126  	}
   127  	e.cfg.logger.Info(
   128  		"configuring peer listeners",
   129  		zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
   130  	)
   131  	if e.Peers, err = configurePeerListeners(cfg); err != nil {
   132  		return e, err
   133  	}
   134  
   135  	e.cfg.logger.Info(
   136  		"configuring client listeners",
   137  		zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
   138  	)
   139  	if e.sctxs, err = configureClientListeners(cfg); err != nil {
   140  		return e, err
   141  	}
   142  
   143  	for _, sctx := range e.sctxs {
   144  		e.Clients = append(e.Clients, sctx.l)
   145  	}
   146  
   147  	var (
   148  		urlsmap types.URLsMap
   149  		token   string
   150  	)
   151  	memberInitialized := true
   152  	if !isMemberInitialized(cfg) {
   153  		memberInitialized = false
   154  		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
   155  		if err != nil {
   156  			return e, fmt.Errorf("error setting up initial cluster: %v", err)
   157  		}
   158  	}
   159  
   160  	// AutoCompactionRetention defaults to "0" if not set.
   161  	if len(cfg.AutoCompactionRetention) == 0 {
   162  		cfg.AutoCompactionRetention = "0"
   163  	}
   164  	autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
   165  	if err != nil {
   166  		return e, err
   167  	}
   168  
   169  	backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
   170  
   171  	srvcfg := config.ServerConfig{
   172  		Name:                                     cfg.Name,
   173  		ClientURLs:                               cfg.AdvertiseClientUrls,
   174  		PeerURLs:                                 cfg.AdvertisePeerUrls,
   175  		DataDir:                                  cfg.Dir,
   176  		DedicatedWALDir:                          cfg.WalDir,
   177  		SnapshotCount:                            cfg.SnapshotCount,
   178  		SnapshotCatchUpEntries:                   cfg.SnapshotCatchUpEntries,
   179  		MaxSnapFiles:                             cfg.MaxSnapFiles,
   180  		MaxWALFiles:                              cfg.MaxWalFiles,
   181  		InitialPeerURLsMap:                       urlsmap,
   182  		InitialClusterToken:                      token,
   183  		DiscoveryURL:                             cfg.Durl,
   184  		DiscoveryProxy:                           cfg.Dproxy,
   185  		NewCluster:                               cfg.IsNewCluster(),
   186  		PeerTLSInfo:                              cfg.PeerTLSInfo,
   187  		TickMs:                                   cfg.TickMs,
   188  		ElectionTicks:                            cfg.ElectionTicks(),
   189  		InitialElectionTickAdvance:               cfg.InitialElectionTickAdvance,
   190  		AutoCompactionRetention:                  autoCompactionRetention,
   191  		AutoCompactionMode:                       cfg.AutoCompactionMode,
   192  		QuotaBackendBytes:                        cfg.QuotaBackendBytes,
   193  		BackendBatchLimit:                        cfg.BackendBatchLimit,
   194  		BackendFreelistType:                      backendFreelistType,
   195  		BackendBatchInterval:                     cfg.BackendBatchInterval,
   196  		MaxTxnOps:                                cfg.MaxTxnOps,
   197  		MaxRequestBytes:                          cfg.MaxRequestBytes,
   198  		MaxConcurrentStreams:                     cfg.MaxConcurrentStreams,
   199  		SocketOpts:                               cfg.SocketOpts,
   200  		StrictReconfigCheck:                      cfg.StrictReconfigCheck,
   201  		ClientCertAuthEnabled:                    cfg.ClientTLSInfo.ClientCertAuth,
   202  		AuthToken:                                cfg.AuthToken,
   203  		BcryptCost:                               cfg.BcryptCost,
   204  		TokenTTL:                                 cfg.AuthTokenTTL,
   205  		CORS:                                     cfg.CORS,
   206  		HostWhitelist:                            cfg.HostWhitelist,
   207  		InitialCorruptCheck:                      cfg.ExperimentalInitialCorruptCheck,
   208  		CorruptCheckTime:                         cfg.ExperimentalCorruptCheckTime,
   209  		CompactHashCheckEnabled:                  cfg.ExperimentalCompactHashCheckEnabled,
   210  		CompactHashCheckTime:                     cfg.ExperimentalCompactHashCheckTime,
   211  		PreVote:                                  cfg.PreVote,
   212  		Logger:                                   cfg.logger,
   213  		ForceNewCluster:                          cfg.ForceNewCluster,
   214  		EnableGRPCGateway:                        cfg.EnableGRPCGateway,
   215  		ExperimentalEnableDistributedTracing:     cfg.ExperimentalEnableDistributedTracing,
   216  		UnsafeNoFsync:                            cfg.UnsafeNoFsync,
   217  		EnableLeaseCheckpoint:                    cfg.ExperimentalEnableLeaseCheckpoint,
   218  		LeaseCheckpointPersist:                   cfg.ExperimentalEnableLeaseCheckpointPersist,
   219  		CompactionBatchLimit:                     cfg.ExperimentalCompactionBatchLimit,
   220  		WatchProgressNotifyInterval:              cfg.ExperimentalWatchProgressNotifyInterval,
   221  		DowngradeCheckTime:                       cfg.ExperimentalDowngradeCheckTime,
   222  		WarningApplyDuration:                     cfg.ExperimentalWarningApplyDuration,
   223  		ExperimentalMemoryMlock:                  cfg.ExperimentalMemoryMlock,
   224  		ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
   225  		ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
   226  		V2Deprecation: cfg.V2DeprecationEffective(),
   227  	}
   228  
   229  	if srvcfg.ExperimentalEnableDistributedTracing {
   230  		tctx := context.Background()
   231  		tracingExporter, err := newTracingExporter(tctx, cfg)
   232  		if err != nil {
   233  			return e, err
   234  		}
   235  		e.tracingExporterShutdown = func() {
   236  			tracingExporter.Close(tctx)
   237  		}
   238  		srvcfg.ExperimentalTracerOptions = tracingExporter.opts
   239  
   240  		e.cfg.logger.Info("distributed tracing setup enabled")
   241  	}
   242  
   243  	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
   244  
   245  	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
   246  		return e, err
   247  	}
   248  
   249  	// buffer channel so goroutines on closed connections won't wait forever
   250  	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
   251  
   252  	// newly started member ("memberInitialized==false")
   253  	// does not need corruption check
   254  	if memberInitialized && srvcfg.InitialCorruptCheck {
   255  		if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
   256  			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
   257  			// (nothing to close since rafthttp transports have not been started)
   258  
   259  			e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
   260  			e.Server.Cleanup()
   261  			e.Server = nil
   262  			return e, err
   263  		}
   264  	}
   265  	e.Server.Start()
   266  
   267  	if err = e.servePeers(); err != nil {
   268  		return e, err
   269  	}
   270  	if err = e.serveClients(); err != nil {
   271  		return e, err
   272  	}
   273  	if err = e.serveMetrics(); err != nil {
   274  		return e, err
   275  	}
   276  
   277  	e.cfg.logger.Info(
   278  		"now serving peer/client/metrics",
   279  		zap.String("local-member-id", e.Server.ID().String()),
   280  		zap.Strings("initial-advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
   281  		zap.Strings("listen-peer-urls", e.cfg.getListenPeerUrls()),
   282  		zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
   283  		zap.Strings("listen-client-urls", e.cfg.getListenClientUrls()),
   284  		zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
   285  	)
   286  	serving = true
   287  	return e, nil
   288  }
   289  
   290  func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized bool) {
   291  	cors := make([]string, 0, len(ec.CORS))
   292  	for v := range ec.CORS {
   293  		cors = append(cors, v)
   294  	}
   295  	sort.Strings(cors)
   296  
   297  	hss := make([]string, 0, len(ec.HostWhitelist))
   298  	for v := range ec.HostWhitelist {
   299  		hss = append(hss, v)
   300  	}
   301  	sort.Strings(hss)
   302  
   303  	quota := ec.QuotaBackendBytes
   304  	if quota == 0 {
   305  		quota = etcdserver.DefaultQuotaBytes
   306  	}
   307  
   308  	lg.Info(
   309  		"starting an etcd server",
   310  		zap.String("etcd-version", version.Version),
   311  		zap.String("git-sha", version.GitSHA),
   312  		zap.String("go-version", runtime.Version()),
   313  		zap.String("go-os", runtime.GOOS),
   314  		zap.String("go-arch", runtime.GOARCH),
   315  		zap.Int("max-cpu-set", runtime.GOMAXPROCS(0)),
   316  		zap.Int("max-cpu-available", runtime.NumCPU()),
   317  		zap.Bool("member-initialized", memberInitialized),
   318  		zap.String("name", sc.Name),
   319  		zap.String("data-dir", sc.DataDir),
   320  		zap.String("wal-dir", ec.WalDir),
   321  		zap.String("wal-dir-dedicated", sc.DedicatedWALDir),
   322  		zap.String("member-dir", sc.MemberDir()),
   323  		zap.Bool("force-new-cluster", sc.ForceNewCluster),
   324  		zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
   325  		zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
   326  		zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
   327  		zap.Uint64("snapshot-count", sc.SnapshotCount),
   328  		zap.Uint("max-wals", sc.MaxWALFiles),
   329  		zap.Uint("max-snapshots", sc.MaxSnapFiles),
   330  		zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
   331  		zap.Strings("initial-advertise-peer-urls", ec.getAdvertisePeerUrls()),
   332  		zap.Strings("listen-peer-urls", ec.getListenPeerUrls()),
   333  		zap.Strings("advertise-client-urls", ec.getAdvertiseClientUrls()),
   334  		zap.Strings("listen-client-urls", ec.getListenClientUrls()),
   335  		zap.Strings("listen-metrics-urls", ec.getMetricsURLs()),
   336  		zap.Strings("cors", cors),
   337  		zap.Strings("host-whitelist", hss),
   338  		zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
   339  		zap.String("initial-cluster-state", ec.ClusterState),
   340  		zap.String("initial-cluster-token", sc.InitialClusterToken),
   341  		zap.Int64("quota-backend-bytes", quota),
   342  		zap.Uint("max-request-bytes", sc.MaxRequestBytes),
   343  		zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),
   344  
   345  		zap.Bool("pre-vote", sc.PreVote),
   346  		zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
   347  		zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
   348  		zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
   349  		zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
   350  		zap.String("auto-compaction-mode", sc.AutoCompactionMode),
   351  		zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
   352  		zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
   353  		zap.String("discovery-url", sc.DiscoveryURL),
   354  		zap.String("discovery-proxy", sc.DiscoveryProxy),
   355  		zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
   356  	)
   357  }
   358  
   359  // Config returns the current configuration.
   360  func (e *Etcd) Config() Config {
   361  	return e.cfg
   362  }
   363  
   364  // Close gracefully shuts down all servers/listeners.
   365  // Client requests will be terminated with request timeout.
   366  // After timeout, enforce remaning requests be closed immediately.
   367  func (e *Etcd) Close() {
   368  	fields := []zap.Field{
   369  		zap.String("name", e.cfg.Name),
   370  		zap.String("data-dir", e.cfg.Dir),
   371  		zap.Strings("advertise-peer-urls", e.cfg.getAdvertisePeerUrls()),
   372  		zap.Strings("advertise-client-urls", e.cfg.getAdvertiseClientUrls()),
   373  	}
   374  	lg := e.GetLogger()
   375  	lg.Info("closing etcd server", fields...)
   376  	defer func() {
   377  		lg.Info("closed etcd server", fields...)
   378  		verify.MustVerifyIfEnabled(verify.Config{
   379  			Logger:     lg,
   380  			DataDir:    e.cfg.Dir,
   381  			ExactIndex: false,
   382  		})
   383  		lg.Sync()
   384  	}()
   385  
   386  	e.closeOnce.Do(func() {
   387  		close(e.stopc)
   388  	})
   389  
   390  	// close client requests with request timeout
   391  	timeout := 2 * time.Second
   392  	if e.Server != nil {
   393  		timeout = e.Server.Cfg.ReqTimeout()
   394  	}
   395  	for _, sctx := range e.sctxs {
   396  		for ss := range sctx.serversC {
   397  			ctx, cancel := context.WithTimeout(context.Background(), timeout)
   398  			stopServers(ctx, ss)
   399  			cancel()
   400  		}
   401  	}
   402  
   403  	for _, sctx := range e.sctxs {
   404  		sctx.cancel()
   405  	}
   406  
   407  	for i := range e.Clients {
   408  		if e.Clients[i] != nil {
   409  			e.Clients[i].Close()
   410  		}
   411  	}
   412  
   413  	for i := range e.metricsListeners {
   414  		e.metricsListeners[i].Close()
   415  	}
   416  
   417  	// shutdown tracing exporter
   418  	if e.tracingExporterShutdown != nil {
   419  		e.tracingExporterShutdown()
   420  	}
   421  
   422  	// close rafthttp transports
   423  	if e.Server != nil {
   424  		e.Server.Stop()
   425  	}
   426  
   427  	// close all idle connections in peer handler (wait up to 1-second)
   428  	for i := range e.Peers {
   429  		if e.Peers[i] != nil && e.Peers[i].close != nil {
   430  			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   431  			e.Peers[i].close(ctx)
   432  			cancel()
   433  		}
   434  	}
   435  	if e.errc != nil {
   436  		close(e.errc)
   437  	}
   438  }
   439  
   440  func stopServers(ctx context.Context, ss *servers) {
   441  	// first, close the http.Server
   442  	if ss.http != nil {
   443  		ss.http.Shutdown(ctx)
   444  	}
   445  	if ss.grpc == nil {
   446  		return
   447  	}
   448  	// do not grpc.Server.GracefulStop when grpc runs under http server
   449  	// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
   450  	// and https://github.com/etcd-io/etcd/issues/8916
   451  	if ss.secure && ss.http != nil {
   452  		ss.grpc.Stop()
   453  		return
   454  	}
   455  
   456  	ch := make(chan struct{})
   457  	go func() {
   458  		defer close(ch)
   459  		// close listeners to stop accepting new connections,
   460  		// will block on any existing transports
   461  		ss.grpc.GracefulStop()
   462  	}()
   463  
   464  	// wait until all pending RPCs are finished
   465  	select {
   466  	case <-ch:
   467  	case <-ctx.Done():
   468  		// took too long, manually close open transports
   469  		// e.g. watch streams
   470  		ss.grpc.Stop()
   471  
   472  		// concurrent GracefulStop should be interrupted
   473  		<-ch
   474  	}
   475  }
   476  
   477  // Err - return channel used to report errors during etcd run/shutdown.
   478  // Since etcd 3.5 the channel is being closed when the etcd is over.
   479  func (e *Etcd) Err() <-chan error {
   480  	return e.errc
   481  }
   482  
   483  func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
   484  	if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
   485  		return nil, err
   486  	}
   487  	if err = cfg.PeerSelfCert(); err != nil {
   488  		cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
   489  	}
   490  
   491  	updateMinMaxVersions(&cfg.PeerTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
   492  
   493  	if !cfg.PeerTLSInfo.Empty() {
   494  		cfg.logger.Info(
   495  			"starting with peer TLS",
   496  			zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
   497  			zap.Strings("cipher-suites", cfg.CipherSuites),
   498  		)
   499  	}
   500  
   501  	peers = make([]*peerListener, len(cfg.ListenPeerUrls))
   502  	defer func() {
   503  		if err == nil {
   504  			return
   505  		}
   506  		for i := range peers {
   507  			if peers[i] != nil && peers[i].close != nil {
   508  				cfg.logger.Warn(
   509  					"closing peer listener",
   510  					zap.String("address", cfg.ListenPeerUrls[i].String()),
   511  					zap.Error(err),
   512  				)
   513  				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   514  				peers[i].close(ctx)
   515  				cancel()
   516  			}
   517  		}
   518  	}()
   519  
   520  	for i, u := range cfg.ListenPeerUrls {
   521  		if u.Scheme == "http" {
   522  			if !cfg.PeerTLSInfo.Empty() {
   523  				cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
   524  			}
   525  			if cfg.PeerTLSInfo.ClientCertAuth {
   526  				cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
   527  			}
   528  		}
   529  		peers[i] = &peerListener{close: func(context.Context) error { return nil }}
   530  		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
   531  			transport.WithTLSInfo(&cfg.PeerTLSInfo),
   532  			transport.WithSocketOpts(&cfg.SocketOpts),
   533  			transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
   534  		)
   535  		if err != nil {
   536  			cfg.logger.Error("creating peer listener failed", zap.Error(err))
   537  			return nil, err
   538  		}
   539  		// once serve, overwrite with 'http.Server.Shutdown'
   540  		peers[i].close = func(context.Context) error {
   541  			return peers[i].Listener.Close()
   542  		}
   543  	}
   544  	return peers, nil
   545  }
   546  
   547  // configure peer handlers after rafthttp.Transport started
   548  func (e *Etcd) servePeers() (err error) {
   549  	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
   550  	var peerTLScfg *tls.Config
   551  	if !e.cfg.PeerTLSInfo.Empty() {
   552  		if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
   553  			return err
   554  		}
   555  	}
   556  
   557  	for _, p := range e.Peers {
   558  		u := p.Listener.Addr().String()
   559  		gs := v3rpc.Server(e.Server, peerTLScfg, nil)
   560  		m := cmux.New(p.Listener)
   561  		go gs.Serve(m.Match(cmux.HTTP2()))
   562  		srv := &http.Server{
   563  			Handler:     grpcHandlerFunc(gs, ph),
   564  			ReadTimeout: 5 * time.Minute,
   565  			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
   566  		}
   567  		go srv.Serve(m.Match(cmux.Any()))
   568  		p.serve = func() error {
   569  			e.cfg.logger.Info(
   570  				"cmux::serve",
   571  				zap.String("address", u),
   572  			)
   573  			return m.Serve()
   574  		}
   575  		p.close = func(ctx context.Context) error {
   576  			// gracefully shutdown http.Server
   577  			// close open listeners, idle connections
   578  			// until context cancel or time-out
   579  			e.cfg.logger.Info(
   580  				"stopping serving peer traffic",
   581  				zap.String("address", u),
   582  			)
   583  			stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
   584  			e.cfg.logger.Info(
   585  				"stopped serving peer traffic",
   586  				zap.String("address", u),
   587  			)
   588  			m.Close()
   589  			return nil
   590  		}
   591  	}
   592  
   593  	// start peer servers in a goroutine
   594  	for _, pl := range e.Peers {
   595  		go func(l *peerListener) {
   596  			u := l.Addr().String()
   597  			e.cfg.logger.Info(
   598  				"serving peer traffic",
   599  				zap.String("address", u),
   600  			)
   601  			e.errHandler(l.serve())
   602  		}(pl)
   603  	}
   604  	return nil
   605  }
   606  
   607  func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
   608  	if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
   609  		return nil, err
   610  	}
   611  	if err = cfg.ClientSelfCert(); err != nil {
   612  		cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err))
   613  	}
   614  
   615  	updateMinMaxVersions(&cfg.ClientTLSInfo, cfg.TlsMinVersion, cfg.TlsMaxVersion)
   616  
   617  	if cfg.EnablePprof {
   618  		cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf))
   619  	}
   620  
   621  	sctxs = make(map[string]*serveCtx)
   622  	for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
   623  		if u.Scheme == "http" || u.Scheme == "unix" {
   624  			if !cfg.ClientTLSInfo.Empty() {
   625  				cfg.logger.Warn("scheme is http or unix while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
   626  			}
   627  			if cfg.ClientTLSInfo.ClientCertAuth {
   628  				cfg.logger.Warn("scheme is http or unix while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String()))
   629  			}
   630  		}
   631  		if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
   632  			return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
   633  		}
   634  	}
   635  
   636  	for _, u := range cfg.ListenClientUrls {
   637  		addr, secure, network := resolveUrl(u)
   638  		sctx := sctxs[addr]
   639  		if sctx == nil {
   640  			sctx = newServeCtx(cfg.logger)
   641  			sctxs[addr] = sctx
   642  		}
   643  		sctx.secure = sctx.secure || secure
   644  		sctx.insecure = sctx.insecure || !secure
   645  		sctx.scheme = u.Scheme
   646  		sctx.addr = addr
   647  		sctx.network = network
   648  	}
   649  	for _, u := range cfg.ListenClientHttpUrls {
   650  		addr, secure, network := resolveUrl(u)
   651  
   652  		sctx := sctxs[addr]
   653  		if sctx == nil {
   654  			sctx = newServeCtx(cfg.logger)
   655  			sctxs[addr] = sctx
   656  		} else if !sctx.httpOnly {
   657  			return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
   658  		}
   659  		sctx.secure = sctx.secure || secure
   660  		sctx.insecure = sctx.insecure || !secure
   661  		sctx.scheme = u.Scheme
   662  		sctx.addr = addr
   663  		sctx.network = network
   664  		sctx.httpOnly = true
   665  	}
   666  
   667  	for _, sctx := range sctxs {
   668  		if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
   669  			transport.WithSocketOpts(&cfg.SocketOpts),
   670  			transport.WithSkipTLSInfoCheck(true),
   671  		); err != nil {
   672  			return nil, err
   673  		}
   674  		// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
   675  		// hosts that disable ipv6. So, use the address given by the user.
   676  
   677  		if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
   678  			if fdLimit <= reservedInternalFDNum {
   679  				cfg.logger.Fatal(
   680  					"file descriptor limit of etcd process is too low; please set higher",
   681  					zap.Uint64("limit", fdLimit),
   682  					zap.Int("recommended-limit", reservedInternalFDNum),
   683  				)
   684  			}
   685  			sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
   686  		}
   687  
   688  		defer func(sctx *serveCtx) {
   689  			if err == nil || sctx.l == nil {
   690  				return
   691  			}
   692  			sctx.l.Close()
   693  			cfg.logger.Warn(
   694  				"closing peer listener",
   695  				zap.String("address", sctx.addr),
   696  				zap.Error(err),
   697  			)
   698  		}(sctx)
   699  		for k := range cfg.UserHandlers {
   700  			sctx.userHandlers[k] = cfg.UserHandlers[k]
   701  		}
   702  		sctx.serviceRegister = cfg.ServiceRegister
   703  		if cfg.EnablePprof || cfg.LogLevel == "debug" {
   704  			sctx.registerPprof()
   705  		}
   706  		if cfg.LogLevel == "debug" {
   707  			sctx.registerTrace()
   708  		}
   709  	}
   710  	return sctxs, nil
   711  }
   712  
   713  func resolveUrl(u url.URL) (addr string, secure bool, network string) {
   714  	addr = u.Host
   715  	network = "tcp"
   716  	if u.Scheme == "unix" || u.Scheme == "unixs" {
   717  		addr = u.Host + u.Path
   718  		network = "unix"
   719  	}
   720  	secure = u.Scheme == "https" || u.Scheme == "unixs"
   721  	return addr, secure, network
   722  }
   723  
   724  func (e *Etcd) serveClients() (err error) {
   725  	if !e.cfg.ClientTLSInfo.Empty() {
   726  		e.cfg.logger.Info(
   727  			"starting with client TLS",
   728  			zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)),
   729  			zap.Strings("cipher-suites", e.cfg.CipherSuites),
   730  		)
   731  	}
   732  
   733  	// Start a client server goroutine for each listen address
   734  	var h http.Handler
   735  	if e.Config().EnableV2 {
   736  		if e.Config().V2DeprecationEffective().IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
   737  			return fmt.Errorf("--enable-v2 and --v2-deprecation=%s are mutually exclusive", e.Config().V2DeprecationEffective())
   738  		}
   739  		e.cfg.logger.Warn("Flag `enable-v2` is deprecated and will get removed in etcd 3.6.")
   740  		if len(e.Config().ExperimentalEnableV2V3) > 0 {
   741  			e.cfg.logger.Warn("Flag `experimental-enable-v2v3` is deprecated and will get removed in etcd 3.6.")
   742  			srv := v2v3.NewServer(e.cfg.logger, v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
   743  			h = v2http.NewClientHandler(e.GetLogger(), srv, e.Server.Cfg.ReqTimeout())
   744  		} else {
   745  			h = v2http.NewClientHandler(e.GetLogger(), e.Server, e.Server.Cfg.ReqTimeout())
   746  		}
   747  	} else {
   748  		mux := http.NewServeMux()
   749  		etcdhttp.HandleBasic(e.cfg.logger, mux, e.Server)
   750  		etcdhttp.HandleMetrics(mux)
   751  		etcdhttp.HandleHealth(e.cfg.logger, mux, e.Server)
   752  		h = mux
   753  	}
   754  
   755  	gopts := []grpc.ServerOption{}
   756  	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
   757  		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
   758  			MinTime:             e.cfg.GRPCKeepAliveMinTime,
   759  			PermitWithoutStream: false,
   760  		}))
   761  	}
   762  	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
   763  		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
   764  		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
   765  			Time:    e.cfg.GRPCKeepAliveInterval,
   766  			Timeout: e.cfg.GRPCKeepAliveTimeout,
   767  		}))
   768  	}
   769  
   770  	splitHttp := false
   771  	for _, sctx := range e.sctxs {
   772  		if sctx.httpOnly {
   773  			splitHttp = true
   774  		}
   775  	}
   776  
   777  	// start client servers in each goroutine
   778  	for _, sctx := range e.sctxs {
   779  		go func(s *serveCtx) {
   780  			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
   781  		}(sctx)
   782  	}
   783  	return nil
   784  }
   785  
   786  func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
   787  	if !e.cfg.EnableGRPCGateway {
   788  		return nil
   789  	}
   790  	sctx := e.pickGrpcGatewayServeContext(splitHttp)
   791  	addr := sctx.addr
   792  	if network := sctx.network; network == "unix" {
   793  		// explicitly define unix network for gRPC socket support
   794  		addr = fmt.Sprintf("%s:%s", network, addr)
   795  	}
   796  
   797  	opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
   798  	if sctx.secure {
   799  		tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
   800  		if tlsErr != nil {
   801  			return func(ctx context.Context) (*grpc.ClientConn, error) {
   802  				return nil, tlsErr
   803  			}
   804  		}
   805  		dtls := tlscfg.Clone()
   806  		// trust local server
   807  		dtls.InsecureSkipVerify = true
   808  		bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
   809  		opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials()))
   810  	} else {
   811  		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
   812  	}
   813  
   814  	return func(ctx context.Context) (*grpc.ClientConn, error) {
   815  		conn, err := grpc.DialContext(ctx, addr, opts...)
   816  		if err != nil {
   817  			sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
   818  			return nil, err
   819  		}
   820  		return conn, err
   821  	}
   822  }
   823  
   824  func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
   825  	for _, sctx := range e.sctxs {
   826  		if !splitHttp || !sctx.httpOnly {
   827  			return sctx
   828  		}
   829  	}
   830  	panic("Expect at least one context able to serve grpc")
   831  }
   832  
   833  func (e *Etcd) serveMetrics() (err error) {
   834  	if e.cfg.Metrics == "extensive" {
   835  		grpc_prometheus.EnableHandlingTimeHistogram()
   836  	}
   837  
   838  	if len(e.cfg.ListenMetricsUrls) > 0 {
   839  		metricsMux := http.NewServeMux()
   840  		etcdhttp.HandleMetrics(metricsMux)
   841  		etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)
   842  
   843  		for _, murl := range e.cfg.ListenMetricsUrls {
   844  			tlsInfo := &e.cfg.ClientTLSInfo
   845  			if murl.Scheme == "http" {
   846  				tlsInfo = nil
   847  			}
   848  			ml, err := transport.NewListenerWithOpts(murl.Host, murl.Scheme,
   849  				transport.WithTLSInfo(tlsInfo),
   850  				transport.WithSocketOpts(&e.cfg.SocketOpts),
   851  			)
   852  			if err != nil {
   853  				return err
   854  			}
   855  			e.metricsListeners = append(e.metricsListeners, ml)
   856  			go func(u url.URL, ln net.Listener) {
   857  				e.cfg.logger.Info(
   858  					"serving metrics",
   859  					zap.String("address", u.String()),
   860  				)
   861  				e.errHandler(http.Serve(ln, metricsMux))
   862  			}(murl, ml)
   863  		}
   864  	}
   865  	return nil
   866  }
   867  
   868  func (e *Etcd) errHandler(err error) {
   869  	select {
   870  	case <-e.stopc:
   871  		return
   872  	default:
   873  	}
   874  	select {
   875  	case <-e.stopc:
   876  	case e.errc <- err:
   877  	}
   878  }
   879  
   880  // GetLogger returns the logger.
   881  func (e *Etcd) GetLogger() *zap.Logger {
   882  	e.cfg.loggerMu.RLock()
   883  	l := e.cfg.logger
   884  	e.cfg.loggerMu.RUnlock()
   885  	return l
   886  }
   887  
   888  func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
   889  	h, err := strconv.Atoi(retention)
   890  	if err == nil && h >= 0 {
   891  		switch mode {
   892  		case CompactorModeRevision:
   893  			ret = time.Duration(int64(h))
   894  		case CompactorModePeriodic:
   895  			ret = time.Duration(int64(h)) * time.Hour
   896  		}
   897  	} else {
   898  		// periodic compaction
   899  		ret, err = time.ParseDuration(retention)
   900  		if err != nil {
   901  			return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
   902  		}
   903  	}
   904  	return ret, nil
   905  }
   906  

View as plain text