1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdmain
16
17 import (
18 "fmt"
19 "net"
20 "net/url"
21 "os"
22 "time"
23
24 "go.etcd.io/etcd/client/pkg/v3/logutil"
25 "go.etcd.io/etcd/server/v3/proxy/tcpproxy"
26
27 "github.com/spf13/cobra"
28 "go.uber.org/zap"
29 )
30
31 var (
32 gatewayListenAddr string
33 gatewayEndpoints []string
34 gatewayDNSCluster string
35 gatewayDNSClusterServiceName string
36 gatewayInsecureDiscovery bool
37 gatewayRetryDelay time.Duration
38 gatewayCA string
39 )
40
41 var (
42 rootCmd = &cobra.Command{
43 Use: "etcd",
44 Short: "etcd server",
45 SuggestFor: []string{"etcd"},
46 }
47 )
48
49 func init() {
50 rootCmd.AddCommand(newGatewayCommand())
51 }
52
53
54 func newGatewayCommand() *cobra.Command {
55 lpc := &cobra.Command{
56 Use: "gateway <subcommand>",
57 Short: "gateway related command",
58 }
59 lpc.AddCommand(newGatewayStartCommand())
60
61 return lpc
62 }
63
64 func newGatewayStartCommand() *cobra.Command {
65 cmd := cobra.Command{
66 Use: "start",
67 Short: "start the gateway",
68 Run: startGateway,
69 }
70
71 cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
72 cmd.Flags().StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
73 cmd.Flags().StringVar(&gatewayDNSClusterServiceName, "discovery-srv-name", "", "service name to query when using DNS discovery")
74 cmd.Flags().BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
75 cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file for verifying the discovered endpoints when discovery-srv is provided.")
76
77 cmd.Flags().StringSliceVar(&gatewayEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
78
79 cmd.Flags().DurationVar(&gatewayRetryDelay, "retry-delay", time.Minute, "duration of delay before retrying failed endpoints")
80
81 return &cmd
82 }
83
84 func stripSchema(eps []string) []string {
85 var endpoints []string
86 for _, ep := range eps {
87 if u, err := url.Parse(ep); err == nil && u.Host != "" {
88 ep = u.Host
89 }
90 endpoints = append(endpoints, ep)
91 }
92 return endpoints
93 }
94
95 func startGateway(cmd *cobra.Command, args []string) {
96 lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
97 if err != nil {
98 fmt.Fprintln(os.Stderr, err)
99 os.Exit(1)
100 }
101
102
103 lg.Info("Running: ", zap.Strings("args", os.Args))
104
105 srvs := discoverEndpoints(lg, gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery, gatewayDNSClusterServiceName)
106 if len(srvs.Endpoints) == 0 {
107
108 srvs.Endpoints = gatewayEndpoints
109 }
110
111 srvs.Endpoints = stripSchema(srvs.Endpoints)
112 if len(srvs.SRVs) == 0 {
113 for _, ep := range srvs.Endpoints {
114 h, p, serr := net.SplitHostPort(ep)
115 if serr != nil {
116 fmt.Printf("error parsing endpoint %q", ep)
117 os.Exit(1)
118 }
119 var port uint16
120 fmt.Sscanf(p, "%d", &port)
121 srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
122 }
123 }
124
125 lhost, lport, err := net.SplitHostPort(gatewayListenAddr)
126 if err != nil {
127 fmt.Println("failed to validate listen address:", gatewayListenAddr)
128 os.Exit(1)
129 }
130
131 laddrs, err := net.LookupHost(lhost)
132 if err != nil {
133 fmt.Println("failed to resolve listen host:", lhost)
134 os.Exit(1)
135 }
136 laddrsMap := make(map[string]bool)
137 for _, addr := range laddrs {
138 laddrsMap[addr] = true
139 }
140
141 for _, srv := range srvs.SRVs {
142 var eaddrs []string
143 eaddrs, err = net.LookupHost(srv.Target)
144 if err != nil {
145 fmt.Println("failed to resolve endpoint host:", srv.Target)
146 os.Exit(1)
147 }
148 if fmt.Sprintf("%d", srv.Port) != lport {
149 continue
150 }
151
152 for _, ea := range eaddrs {
153 if laddrsMap[ea] {
154 fmt.Printf("SRV or endpoint (%s:%d->%s:%d) should not resolve to the gateway listen addr (%s)\n", srv.Target, srv.Port, ea, srv.Port, gatewayListenAddr)
155 os.Exit(1)
156 }
157 }
158 }
159
160 if len(srvs.Endpoints) == 0 {
161 fmt.Println("no endpoints found")
162 os.Exit(1)
163 }
164
165 var l net.Listener
166 l, err = net.Listen("tcp", gatewayListenAddr)
167 if err != nil {
168 fmt.Fprintln(os.Stderr, err)
169 os.Exit(1)
170 }
171
172 tp := tcpproxy.TCPProxy{
173 Logger: lg,
174 Listener: l,
175 Endpoints: srvs.SRVs,
176 MonitorInterval: gatewayRetryDelay,
177 }
178
179
180 notifySystemd(lg)
181
182 tp.Run()
183 }
184
View as plain text