...

Source file src/go.etcd.io/etcd/server/v3/etcdmain/etcd.go

Documentation: go.etcd.io/etcd/server/v3/etcdmain

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdmain
    16  
import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"time"

	"go.etcd.io/etcd/client/pkg/v3/fileutil"
	"go.etcd.io/etcd/client/pkg/v3/logutil"
	"go.etcd.io/etcd/client/pkg/v3/transport"
	"go.etcd.io/etcd/client/pkg/v3/types"
	pkgioutil "go.etcd.io/etcd/pkg/v3/ioutil"
	"go.etcd.io/etcd/pkg/v3/osutil"
	"go.etcd.io/etcd/server/v3/embed"
	"go.etcd.io/etcd/server/v3/etcdserver"
	"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
	"go.etcd.io/etcd/server/v3/proxy/httpproxy"

	"go.uber.org/zap"
	"google.golang.org/grpc"
)
    44  
    45  type dirType string
    46  
    47  var (
    48  	dirMember = dirType("member")
    49  	dirProxy  = dirType("proxy")
    50  	dirEmpty  = dirType("empty")
    51  )
    52  
    53  func startEtcdOrProxyV2(args []string) {
    54  	grpc.EnableTracing = false
    55  
    56  	cfg := newConfig()
    57  	defaultInitialCluster := cfg.ec.InitialCluster
    58  
    59  	err := cfg.parse(args[1:])
    60  	lg := cfg.ec.GetLogger()
    61  	// If we failed to parse the whole configuration, print the error using
    62  	// preferably the resolved logger from the config,
    63  	// but if does not exists, create a new temporary logger.
    64  	if lg == nil {
    65  		var zapError error
    66  		// use this logger
    67  		lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
    68  		if zapError != nil {
    69  			fmt.Printf("error creating zap logger %v", zapError)
    70  			os.Exit(1)
    71  		}
    72  	}
    73  	lg.Info("Running: ", zap.Strings("args", args))
    74  	if err != nil {
    75  		lg.Warn("failed to verify flags", zap.Error(err))
    76  		switch err {
    77  		case embed.ErrUnsetAdvertiseClientURLsFlag:
    78  			lg.Warn("advertise client URLs are not set", zap.Error(err))
    79  		}
    80  		os.Exit(1)
    81  	}
    82  
    83  	cfg.ec.SetupGlobalLoggers()
    84  
    85  	defer func() {
    86  		logger := cfg.ec.GetLogger()
    87  		if logger != nil {
    88  			logger.Sync()
    89  		}
    90  	}()
    91  
    92  	defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
    93  	if defaultHost != "" {
    94  		lg.Info(
    95  			"detected default host for advertise",
    96  			zap.String("host", defaultHost),
    97  		)
    98  	}
    99  	if dhErr != nil {
   100  		lg.Info("failed to detect default host", zap.Error(dhErr))
   101  	}
   102  
   103  	if cfg.ec.Dir == "" {
   104  		cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name)
   105  		lg.Warn(
   106  			"'data-dir' was empty; using default",
   107  			zap.String("data-dir", cfg.ec.Dir),
   108  		)
   109  	}
   110  
   111  	var stopped <-chan struct{}
   112  	var errc <-chan error
   113  
   114  	which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
   115  	if which != dirEmpty {
   116  		lg.Info(
   117  			"server has been already initialized",
   118  			zap.String("data-dir", cfg.ec.Dir),
   119  			zap.String("dir-type", string(which)),
   120  		)
   121  		switch which {
   122  		case dirMember:
   123  			stopped, errc, err = startEtcd(&cfg.ec)
   124  		case dirProxy:
   125  			err = startProxy(cfg)
   126  		default:
   127  			lg.Panic(
   128  				"unknown directory type",
   129  				zap.String("dir-type", string(which)),
   130  			)
   131  		}
   132  	} else {
   133  		shouldProxy := cfg.isProxy()
   134  		if !shouldProxy {
   135  			stopped, errc, err = startEtcd(&cfg.ec)
   136  			if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster {
   137  				if cfg.shouldFallbackToProxy() {
   138  					lg.Warn(
   139  						"discovery cluster is full, falling back to proxy",
   140  						zap.String("fallback-proxy", fallbackFlagProxy),
   141  						zap.Error(err),
   142  					)
   143  					shouldProxy = true
   144  				}
   145  			} else if err != nil {
   146  				lg.Warn("failed to start etcd", zap.Error(err))
   147  			}
   148  		}
   149  		if shouldProxy {
   150  			err = startProxy(cfg)
   151  		}
   152  	}
   153  
   154  	if err != nil {
   155  		if derr, ok := err.(*etcdserver.DiscoveryError); ok {
   156  			switch derr.Err {
   157  			case v2discovery.ErrDuplicateID:
   158  				lg.Warn(
   159  					"member has been registered with discovery service",
   160  					zap.String("name", cfg.ec.Name),
   161  					zap.String("discovery-token", cfg.ec.Durl),
   162  					zap.Error(derr.Err),
   163  				)
   164  				lg.Warn(
   165  					"but could not find valid cluster configuration",
   166  					zap.String("data-dir", cfg.ec.Dir),
   167  				)
   168  				lg.Warn("check data dir if previous bootstrap succeeded")
   169  				lg.Warn("or use a new discovery token if previous bootstrap failed")
   170  
   171  			case v2discovery.ErrDuplicateName:
   172  				lg.Warn(
   173  					"member with duplicated name has already been registered",
   174  					zap.String("discovery-token", cfg.ec.Durl),
   175  					zap.Error(derr.Err),
   176  				)
   177  				lg.Warn("cURL the discovery token URL for details")
   178  				lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
   179  
   180  			default:
   181  				lg.Warn(
   182  					"failed to bootstrap; discovery token was already used",
   183  					zap.String("discovery-token", cfg.ec.Durl),
   184  					zap.Error(err),
   185  				)
   186  				lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
   187  			}
   188  			os.Exit(1)
   189  		}
   190  
   191  		if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
   192  			lg.Warn("failed to start", zap.Error(err))
   193  			if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) {
   194  				lg.Warn("forgot to set --initial-cluster?")
   195  			}
   196  			if types.URLs(cfg.ec.AdvertisePeerUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
   197  				lg.Warn("forgot to set --initial-advertise-peer-urls?")
   198  			}
   199  			if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 {
   200  				lg.Warn("--discovery flag is not set")
   201  			}
   202  			os.Exit(1)
   203  		}
   204  		lg.Fatal("discovery failed", zap.Error(err))
   205  	}
   206  
   207  	osutil.HandleInterrupts(lg)
   208  
   209  	// At this point, the initialization of etcd is done.
   210  	// The listeners are listening on the TCP ports and ready
   211  	// for accepting connections. The etcd instance should be
   212  	// joined with the cluster and ready to serve incoming
   213  	// connections.
   214  	notifySystemd(lg)
   215  
   216  	select {
   217  	case lerr := <-errc:
   218  		// fatal out on listener errors
   219  		lg.Fatal("listener failed", zap.Error(lerr))
   220  	case <-stopped:
   221  	}
   222  
   223  	osutil.Exit(0)
   224  }
   225  
   226  // startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
   227  func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
   228  	e, err := embed.StartEtcd(cfg)
   229  	if err != nil {
   230  		return nil, nil, err
   231  	}
   232  	osutil.RegisterInterruptHandler(e.Close)
   233  	select {
   234  	case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
   235  	case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
   236  	}
   237  	return e.Server.StopNotify(), e.Err(), nil
   238  }
   239  
   240  // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
   241  func startProxy(cfg *config) error {
   242  	lg := cfg.ec.GetLogger()
   243  	lg.Info("v2 API proxy starting")
   244  
   245  	clientTLSInfo := cfg.ec.ClientTLSInfo
   246  	if clientTLSInfo.Empty() {
   247  		// Support old proxy behavior of defaulting to PeerTLSInfo
   248  		// for both client and peer connections.
   249  		clientTLSInfo = cfg.ec.PeerTLSInfo
   250  	}
   251  	clientTLSInfo.InsecureSkipVerify = cfg.ec.ClientAutoTLS
   252  	cfg.ec.PeerTLSInfo.InsecureSkipVerify = cfg.ec.PeerAutoTLS
   253  
   254  	pt, err := transport.NewTimeoutTransport(
   255  		clientTLSInfo,
   256  		time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
   257  		time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
   258  		time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
   259  	)
   260  	if err != nil {
   261  		return err
   262  	}
   263  	pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost
   264  
   265  	if err = cfg.ec.PeerSelfCert(); err != nil {
   266  		lg.Fatal("failed to get self-signed certs for peer", zap.Error(err))
   267  	}
   268  	tr, err := transport.NewTimeoutTransport(
   269  		cfg.ec.PeerTLSInfo,
   270  		time.Duration(cfg.cp.ProxyDialTimeoutMs)*time.Millisecond,
   271  		time.Duration(cfg.cp.ProxyReadTimeoutMs)*time.Millisecond,
   272  		time.Duration(cfg.cp.ProxyWriteTimeoutMs)*time.Millisecond,
   273  	)
   274  	if err != nil {
   275  		return err
   276  	}
   277  
   278  	cfg.ec.Dir = filepath.Join(cfg.ec.Dir, "proxy")
   279  	err = fileutil.TouchDirAll(lg, cfg.ec.Dir)
   280  	if err != nil {
   281  		return err
   282  	}
   283  
   284  	var peerURLs []string
   285  	clusterfile := filepath.Join(cfg.ec.Dir, "cluster")
   286  
   287  	b, err := ioutil.ReadFile(clusterfile)
   288  	switch {
   289  	case err == nil:
   290  		if cfg.ec.Durl != "" {
   291  			lg.Warn(
   292  				"discovery token ignored since the proxy has already been initialized; valid cluster file found",
   293  				zap.String("cluster-file", clusterfile),
   294  			)
   295  		}
   296  		if cfg.ec.DNSCluster != "" {
   297  			lg.Warn(
   298  				"DNS SRV discovery ignored since the proxy has already been initialized; valid cluster file found",
   299  				zap.String("cluster-file", clusterfile),
   300  			)
   301  		}
   302  		urls := struct{ PeerURLs []string }{}
   303  		err = json.Unmarshal(b, &urls)
   304  		if err != nil {
   305  			return err
   306  		}
   307  		peerURLs = urls.PeerURLs
   308  		lg.Info(
   309  			"proxy using peer URLS from cluster file",
   310  			zap.Strings("peer-urls", peerURLs),
   311  			zap.String("cluster-file", clusterfile),
   312  		)
   313  
   314  	case os.IsNotExist(err):
   315  		var urlsmap types.URLsMap
   316  		urlsmap, _, err = cfg.ec.PeerURLsMapAndToken("proxy")
   317  		if err != nil {
   318  			return fmt.Errorf("error setting up initial cluster: %v", err)
   319  		}
   320  
   321  		if cfg.ec.Durl != "" {
   322  			var s string
   323  			s, err = v2discovery.GetCluster(lg, cfg.ec.Durl, cfg.ec.Dproxy)
   324  			if err != nil {
   325  				return err
   326  			}
   327  			if urlsmap, err = types.NewURLsMap(s); err != nil {
   328  				return err
   329  			}
   330  		}
   331  		peerURLs = urlsmap.URLs()
   332  		lg.Info("proxy using peer URLS", zap.Strings("peer-urls", peerURLs))
   333  
   334  	default:
   335  		return err
   336  	}
   337  
   338  	clientURLs := []string{}
   339  	uf := func() []string {
   340  		gcls, gerr := etcdserver.GetClusterFromRemotePeers(lg, peerURLs, tr)
   341  		if gerr != nil {
   342  			lg.Warn(
   343  				"failed to get cluster from remote peers",
   344  				zap.Strings("peer-urls", peerURLs),
   345  				zap.Error(gerr),
   346  			)
   347  			return []string{}
   348  		}
   349  
   350  		clientURLs = gcls.ClientURLs()
   351  		urls := struct{ PeerURLs []string }{gcls.PeerURLs()}
   352  		b, jerr := json.Marshal(urls)
   353  		if jerr != nil {
   354  			lg.Warn("proxy failed to marshal peer URLs", zap.Error(jerr))
   355  			return clientURLs
   356  		}
   357  
   358  		err = pkgioutil.WriteAndSyncFile(clusterfile+".bak", b, 0600)
   359  		if err != nil {
   360  			lg.Warn("proxy failed to write cluster file", zap.Error(err))
   361  			return clientURLs
   362  		}
   363  		err = os.Rename(clusterfile+".bak", clusterfile)
   364  		if err != nil {
   365  			lg.Warn(
   366  				"proxy failed to rename cluster file",
   367  				zap.String("path", clusterfile),
   368  				zap.Error(err),
   369  			)
   370  			return clientURLs
   371  		}
   372  		if !reflect.DeepEqual(gcls.PeerURLs(), peerURLs) {
   373  			lg.Info(
   374  				"proxy updated peer URLs",
   375  				zap.Strings("from", peerURLs),
   376  				zap.Strings("to", gcls.PeerURLs()),
   377  			)
   378  		}
   379  		peerURLs = gcls.PeerURLs()
   380  
   381  		return clientURLs
   382  	}
   383  	ph := httpproxy.NewHandler(lg, pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
   384  	ph = embed.WrapCORS(cfg.ec.CORS, ph)
   385  
   386  	if cfg.isReadonlyProxy() {
   387  		ph = httpproxy.NewReadonlyHandler(ph)
   388  	}
   389  
   390  	// setup self signed certs when serving https
   391  	cHosts, cTLS := []string{}, false
   392  	for _, u := range cfg.ec.ListenClientUrls {
   393  		cHosts = append(cHosts, u.Host)
   394  		cTLS = cTLS || u.Scheme == "https"
   395  	}
   396  	for _, u := range cfg.ec.AdvertiseClientUrls {
   397  		cHosts = append(cHosts, u.Host)
   398  		cTLS = cTLS || u.Scheme == "https"
   399  	}
   400  	listenerTLS := cfg.ec.ClientTLSInfo
   401  	if cfg.ec.ClientAutoTLS && cTLS {
   402  		listenerTLS, err = transport.SelfCert(cfg.ec.GetLogger(), filepath.Join(cfg.ec.Dir, "clientCerts"), cHosts, cfg.ec.SelfSignedCertValidity)
   403  		if err != nil {
   404  			lg.Fatal("failed to initialize self-signed client cert", zap.Error(err))
   405  		}
   406  	}
   407  
   408  	// Start a proxy server goroutine for each listen address
   409  	for _, u := range cfg.ec.ListenClientUrls {
   410  		l, err := transport.NewListener(u.Host, u.Scheme, &listenerTLS)
   411  		if err != nil {
   412  			return err
   413  		}
   414  
   415  		host := u.String()
   416  		go func() {
   417  			lg.Info("v2 proxy started listening on client requests", zap.String("host", host))
   418  			mux := http.NewServeMux()
   419  			etcdhttp.HandleMetrics(mux) // v2 proxy just uses the same port
   420  			mux.Handle("/", ph)
   421  			lg.Fatal("done serving", zap.Error(http.Serve(l, mux)))
   422  		}()
   423  	}
   424  	return nil
   425  }
   426  
   427  // identifyDataDirOrDie returns the type of the data dir.
   428  // Dies if the datadir is invalid.
   429  func identifyDataDirOrDie(lg *zap.Logger, dir string) dirType {
   430  	names, err := fileutil.ReadDir(dir)
   431  	if err != nil {
   432  		if os.IsNotExist(err) {
   433  			return dirEmpty
   434  		}
   435  		lg.Fatal("failed to list data directory", zap.String("dir", dir), zap.Error(err))
   436  	}
   437  
   438  	var m, p bool
   439  	for _, name := range names {
   440  		switch dirType(name) {
   441  		case dirMember:
   442  			m = true
   443  		case dirProxy:
   444  			p = true
   445  		default:
   446  			lg.Warn(
   447  				"found invalid file under data directory",
   448  				zap.String("filename", name),
   449  				zap.String("data-dir", dir),
   450  			)
   451  		}
   452  	}
   453  
   454  	if m && p {
   455  		lg.Fatal("invalid datadir; both member and proxy directories exist")
   456  	}
   457  	if m {
   458  		return dirMember
   459  	}
   460  	if p {
   461  		return dirProxy
   462  	}
   463  	return dirEmpty
   464  }
   465  
   466  func checkSupportArch() {
   467  	lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
   468  	if err != nil {
   469  		panic(err)
   470  	}
   471  	// to add a new platform, check https://github.com/etcd-io/website/blob/main/content/en/docs/next/op-guide/supported-platform.md
   472  	if runtime.GOARCH == "amd64" ||
   473  		runtime.GOARCH == "arm64" ||
   474  		runtime.GOARCH == "ppc64le" ||
   475  		runtime.GOARCH == "s390x" {
   476  		return
   477  	}
   478  	// unsupported arch only configured via environment variable
   479  	// so unset here to not parse through flag
   480  	defer os.Unsetenv("ETCD_UNSUPPORTED_ARCH")
   481  	if env, ok := os.LookupEnv("ETCD_UNSUPPORTED_ARCH"); ok && env == runtime.GOARCH {
   482  		lg.Info("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", env))
   483  		return
   484  	}
   485  
   486  	lg.Error("running etcd on unsupported architecture since ETCD_UNSUPPORTED_ARCH is set", zap.String("arch", runtime.GOARCH))
   487  	os.Exit(1)
   488  }
   489  

View as plain text