...

Source file src/go.etcd.io/etcd/server/v3/etcdmain/gateway.go

Documentation: go.etcd.io/etcd/server/v3/etcdmain

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdmain
    16  
    17  import (
    18  	"fmt"
    19  	"net"
    20  	"net/url"
    21  	"os"
    22  	"time"
    23  
    24  	"go.etcd.io/etcd/client/pkg/v3/logutil"
    25  	"go.etcd.io/etcd/server/v3/proxy/tcpproxy"
    26  
    27  	"github.com/spf13/cobra"
    28  	"go.uber.org/zap"
    29  )
    30  
    31  var (
    32  	gatewayListenAddr            string
    33  	gatewayEndpoints             []string
    34  	gatewayDNSCluster            string
    35  	gatewayDNSClusterServiceName string
    36  	gatewayInsecureDiscovery     bool
    37  	gatewayRetryDelay            time.Duration
    38  	gatewayCA                    string
    39  )
    40  
    41  var (
    42  	rootCmd = &cobra.Command{
    43  		Use:        "etcd",
    44  		Short:      "etcd server",
    45  		SuggestFor: []string{"etcd"},
    46  	}
    47  )
    48  
    49  func init() {
    50  	rootCmd.AddCommand(newGatewayCommand())
    51  }
    52  
    53  // newGatewayCommand returns the cobra command for "gateway".
    54  func newGatewayCommand() *cobra.Command {
    55  	lpc := &cobra.Command{
    56  		Use:   "gateway <subcommand>",
    57  		Short: "gateway related command",
    58  	}
    59  	lpc.AddCommand(newGatewayStartCommand())
    60  
    61  	return lpc
    62  }
    63  
    64  func newGatewayStartCommand() *cobra.Command {
    65  	cmd := cobra.Command{
    66  		Use:   "start",
    67  		Short: "start the gateway",
    68  		Run:   startGateway,
    69  	}
    70  
    71  	cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
    72  	cmd.Flags().StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
    73  	cmd.Flags().StringVar(&gatewayDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
    74  	cmd.Flags().BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
    75  	cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file for verifying the discovered endpoints when discovery-srv is provided.")
    76  
    77  	cmd.Flags().StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
    78  
    79  	cmd.Flags().DurationVar(&gatewayRetryDelay, "retry-delay", time.Minute, "duration of delay before retrying failed endpoints")
    80  
    81  	return &cmd
    82  }
    83  
    84  func stripSchema(eps []string) []string {
    85  	var endpoints []string
    86  	for _, ep := range eps {
    87  		if u, err := url.Parse(ep); err == nil && u.Host != "" {
    88  			ep = u.Host
    89  		}
    90  		endpoints = append(endpoints, ep)
    91  	}
    92  	return endpoints
    93  }
    94  
    95  func startGateway(cmd *cobra.Command, args []string) {
    96  	lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
    97  	if err != nil {
    98  		fmt.Fprintln(os.Stderr, err)
    99  		os.Exit(1)
   100  	}
   101  
   102  	// We use os.Args to show all the arguments (not only passed-through Cobra).
   103  	lg.Info("Running: ", zap.Strings("args", os.Args))
   104  
   105  	srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)
   106  	if len(srvs.Endpoints) == 0 {
   107  		// no endpoints discovered, fall back to provided endpoints
   108  		srvs.Endpoints = gatewayEndpoints
   109  	}
   110  	// Strip the schema from the endpoints because we start just a TCP proxy
   111  	srvs.Endpoints = stripSchema(srvs.Endpoints)
   112  	if len(srvs.SRVs) == 0 {
   113  		for _, ep := range srvs.Endpoints {
   114  			h, p, serr := net.SplitHostPort(ep)
   115  			if serr != nil {
   116  				fmt.Printf("error parsing endpoint %q", ep)
   117  				os.Exit(1)
   118  			}
   119  			var port uint16
   120  			fmt.Sscanf(p, "%d", &port)
   121  			srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
   122  		}
   123  	}
   124  
   125  	lhost, lport, err := net.SplitHostPort(gatewayListenAddr)
   126  	if err != nil {
   127  		fmt.Println("failed to validate listen address:", gatewayListenAddr)
   128  		os.Exit(1)
   129  	}
   130  
   131  	laddrs, err := net.LookupHost(lhost)
   132  	if err != nil {
   133  		fmt.Println("failed to resolve listen host:", lhost)
   134  		os.Exit(1)
   135  	}
   136  	laddrsMap := make(map[string]bool)
   137  	for _, addr := range laddrs {
   138  		laddrsMap[addr] = true
   139  	}
   140  
   141  	for _, srv := range srvs.SRVs {
   142  		var eaddrs []string
   143  		eaddrs, err = net.LookupHost(srv.Target)
   144  		if err != nil {
   145  			fmt.Println("failed to resolve endpoint host:", srv.Target)
   146  			os.Exit(1)
   147  		}
   148  		if fmt.Sprintf("%d", srv.Port) != lport {
   149  			continue
   150  		}
   151  
   152  		for _, ea := range eaddrs {
   153  			if laddrsMap[ea] {
   154  				fmt.Printf("SRV or endpoint (%s:%d->%s:%d) should not resolve to the gateway listen addr (%s)\n", srv.Target, srv.Port, ea, srv.Port, gatewayListenAddr)
   155  				os.Exit(1)
   156  			}
   157  		}
   158  	}
   159  
   160  	if len(srvs.Endpoints) == 0 {
   161  		fmt.Println("no endpoints found")
   162  		os.Exit(1)
   163  	}
   164  
   165  	var l net.Listener
   166  	l, err = net.Listen("tcp", gatewayListenAddr)
   167  	if err != nil {
   168  		fmt.Fprintln(os.Stderr, err)
   169  		os.Exit(1)
   170  	}
   171  
   172  	tp := tcpproxy.TCPProxy{
   173  		Logger:          lg,
   174  		Listener:        l,
   175  		Endpoints:       srvs.SRVs,
   176  		MonitorInterval: gatewayRetryDelay,
   177  	}
   178  
   179  	// At this point, etcd gateway listener is initialized
   180  	notifySystemd(lg)
   181  
   182  	tp.Run()
   183  }
   184  

View as plain text