1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package main
16
17
18
19 import (
20 "bytes"
21 "errors"
22 "fmt"
23 "net"
24 "net/http"
25 "os"
26 "path/filepath"
27 "runtime"
28 "strings"
29 "time"
30
31 "github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
32 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/fuse"
33 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
34 sqladmin "google.golang.org/api/sqladmin/v1beta4"
35 )
36
37
38
39
40
41 func WatchInstances(dir string, cfgs []instanceConfig, updates <-chan string, cl *http.Client) (<-chan proxy.Conn, error) {
42 ch := make(chan proxy.Conn, 1)
43
44
45
46
47 staticInstances := make(map[string]net.Listener, len(cfgs))
48 for _, v := range cfgs {
49 l, err := listenInstance(ch, v)
50 if err != nil {
51 return nil, err
52 }
53 staticInstances[v.Instance] = l
54 }
55
56 if updates != nil {
57 go watchInstancesLoop(dir, ch, updates, staticInstances, cl)
58 }
59 return ch, nil
60 }
61
62 func watchInstancesLoop(dir string, dst chan<- proxy.Conn, updates <-chan string, static map[string]net.Listener, cl *http.Client) {
63 dynamicInstances := make(map[string]net.Listener)
64 for instances := range updates {
65
66
67 list, err := parseInstanceConfigs(dir, strings.Split(instances, ","), cl, false)
68 if err != nil {
69 logging.Errorf("%v", err)
70
71 continue
72 }
73
74 stillOpen := make(map[string]net.Listener)
75 for _, cfg := range list {
76 instance := cfg.Instance
77
78
79
80 if _, ok := static[instance]; ok {
81 continue
82 }
83
84 if l, ok := dynamicInstances[instance]; ok {
85 delete(dynamicInstances, instance)
86 stillOpen[instance] = l
87 continue
88 }
89
90 l, err := listenInstance(dst, cfg)
91 if err != nil {
92 logging.Errorf("Couldn't open socket for %q: %v", instance, err)
93 continue
94 }
95 stillOpen[instance] = l
96 }
97
98
99
100
101 for instance, listener := range dynamicInstances {
102 logging.Infof("Closing socket for instance %v", instance)
103 listener.Close()
104 }
105
106 dynamicInstances = stillOpen
107 }
108
109 for _, v := range static {
110 if err := v.Close(); err != nil {
111 logging.Errorf("Error closing %q: %v", v.Addr(), err)
112 }
113 }
114 for _, v := range dynamicInstances {
115 if err := v.Close(); err != nil {
116 logging.Errorf("Error closing %q: %v", v.Addr(), err)
117 }
118 }
119 }
120
121 func remove(path string) {
122 if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
123 logging.Infof("Remove(%q) error: %v", path, err)
124 }
125 }
126
127
128
129 func listenInstance(dst chan<- proxy.Conn, cfg instanceConfig) (net.Listener, error) {
130 unix := cfg.Network == "unix"
131 if unix {
132 remove(cfg.Address)
133 }
134 l, err := net.Listen(cfg.Network, cfg.Address)
135 if err != nil {
136 return nil, err
137 }
138 if unix {
139 if err := os.Chmod(cfg.Address, 0777|os.ModeSocket); err != nil {
140 logging.Errorf("couldn't update permissions for socket file %q: %v; other users may not be unable to connect", cfg.Address, err)
141 }
142 }
143
144 go func() {
145 for {
146 start := time.Now()
147 c, err := l.Accept()
148 if err != nil {
149 logging.Errorf("Error in accept for %q on %v: %v", cfg, cfg.Address, err)
150 if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
151 d := 10*time.Millisecond - time.Since(start)
152 if d > 0 {
153 time.Sleep(d)
154 }
155 continue
156 }
157 l.Close()
158 return
159 }
160 logging.Verbosef("New connection for %q", cfg.Instance)
161
162 switch clientConn := c.(type) {
163 case *net.TCPConn:
164 clientConn.SetKeepAlive(true)
165 clientConn.SetKeepAlivePeriod(1 * time.Minute)
166
167 }
168 dst <- proxy.Conn{cfg.Instance, c}
169 }
170 }()
171
172 logging.Infof("Listening on %s for %s", cfg.Address, cfg.Instance)
173 return l, nil
174 }
175
// instanceConfig describes where the proxy should listen for one instance.
type instanceConfig struct {
	// Instance is the name under which accepted connections are reported.
	Instance string
	// Network is the network passed to net.Listen, e.g. "unix" or "tcp".
	Network string
	// Address is the address passed to net.Listen: a socket path for "unix",
	// host:port otherwise.
	Address string
}
180
181
182
183
// loopbackForNet maps a network name to the loopback host used when an
// instance option gives only a port. An entry for "tcp" is added during
// initialization of validNets, once it is known which IP family works.
var loopbackForNet = map[string]string{
	"tcp6": "::1",
	"tcp4": "127.0.0.1",
}
188
189
190 var validNets = func() map[string]bool {
191 m := map[string]bool{
192 "unix": runtime.GOOS != "windows",
193 }
194
195 anyTCP := false
196 for _, n := range []string{"tcp4", "tcp6"} {
197 host, ok := loopbackForNet[n]
198 if !ok {
199
200 panic(fmt.Sprintf("no loopback address found for %v", n))
201 }
202
203 x, err := net.Listen(n, net.JoinHostPort(host, "0"))
204 if err != nil {
205
206 continue
207 }
208 x.Close()
209 m[n] = true
210
211 if !anyTCP {
212 anyTCP = true
213
214
215
216 loopbackForNet["tcp"] = host
217 }
218 }
219 if anyTCP {
220 m["tcp"] = true
221 }
222 return m
223 }()
224
225 func parseInstanceConfig(dir, instance string, cl *http.Client) (instanceConfig, error) {
226 var ret instanceConfig
227 proj, region, name, args, err := proxy.ParseInstanceConnectionName(instance)
228 if err != nil {
229 return instanceConfig{}, err
230 }
231 ret.Instance = args[0]
232 regionName := fmt.Sprintf("%s~%s", region, name)
233 if len(args) == 1 {
234
235 ret.Network = "unix"
236 ret.Address = filepath.Join(dir, instance)
237 } else {
238
239 opts := strings.SplitN(args[1], ":", 2)
240 if len(opts) != 2 {
241 return instanceConfig{}, fmt.Errorf("invalid instance options: must be in the form `unix:/path/to/socket`, `tcp:port`, `tcp:host:port`; invalid option was %q", strings.Join(opts, ":"))
242 }
243 ret.Network = opts[0]
244 var err error
245 if ret.Network == "unix" {
246 if strings.HasPrefix(opts[1], "/") {
247 ret.Address = opts[1]
248 } else {
249 ret.Address = filepath.Join(dir, opts[1])
250 }
251 } else {
252 ret.Address, err = parseTCPOpts(opts[0], opts[1])
253 }
254 if err != nil {
255 return instanceConfig{}, err
256 }
257 }
258
259
260 sql, err := sqladmin.New(cl)
261 if err != nil {
262 return instanceConfig{}, err
263 }
264 if *host != "" {
265 sql.BasePath = *host
266 }
267 inst, err := sql.Connect.Get(proj, regionName).Do()
268 if err != nil {
269 return instanceConfig{}, err
270 }
271 if inst.BackendType == "FIRST_GEN" {
272 logging.Errorf("WARNING: proxy client does not support first generation Cloud SQL instances.")
273 return instanceConfig{}, fmt.Errorf("%q is a first generation instance", instance)
274 }
275
276
277 if ret.Network == "unix" && strings.HasPrefix(strings.ToLower(inst.DatabaseVersion), "postgres") {
278
279 if err := os.MkdirAll(ret.Address, 0755); err != nil {
280 return instanceConfig{}, err
281 }
282 ret.Address = filepath.Join(ret.Address, ".s.PGSQL.5432")
283 }
284
285 if !validNets[ret.Network] {
286 return ret, fmt.Errorf("invalid %q: unsupported network: %v", instance, ret.Network)
287 }
288 return ret, nil
289 }
290
291
292 func parseTCPOpts(ntwk, addrOpt string) (string, error) {
293 if strings.Contains(addrOpt, ":") {
294 return addrOpt, nil
295 }
296
297 addr, ok := loopbackForNet[ntwk]
298 if !ok {
299 return "", fmt.Errorf("invalid %q:%q: unrecognized network %v", ntwk, addrOpt, ntwk)
300 }
301 return net.JoinHostPort(addr, addrOpt), nil
302 }
303
304
305
306
307 func parseInstanceConfigs(dir string, instances []string, cl *http.Client, skipFailedInstanceConfigs bool) ([]instanceConfig, error) {
308 errs := new(bytes.Buffer)
309 var cfg []instanceConfig
310 for _, v := range instances {
311 if v == "" {
312 continue
313 }
314 if c, err := parseInstanceConfig(dir, v, cl); err != nil {
315 if skipFailedInstanceConfigs {
316 logging.Infof("There was a problem when parsing an instance configuration but ignoring due to the configuration. Error: %v", err)
317 } else {
318 fmt.Fprintf(errs, "\n\t%v", err)
319 }
320
321 } else {
322 cfg = append(cfg, c)
323 }
324 }
325
326 var err error
327 if errs.Len() > 0 {
328 err = fmt.Errorf("errors parsing config:%s", errs)
329 }
330 return cfg, err
331 }
332
333
334
335
336
337 func CreateInstanceConfigs(dir string, useFuse bool, instances []string, instancesSrc string, cl *http.Client, skipFailedInstanceConfigs bool) ([]instanceConfig, error) {
338 if useFuse && !fuse.Supported() {
339 return nil, errors.New("FUSE not supported on this system")
340 }
341
342 cfgs, err := parseInstanceConfigs(dir, instances, cl, skipFailedInstanceConfigs)
343 if err != nil {
344 return nil, err
345 }
346
347 if dir == "" {
348
349
350
351
352 if useFuse {
353 return nil, errors.New("must set -dir because -fuse was set")
354 } else if instancesSrc != "" {
355 return nil, errors.New("must set -dir because -instances_metadata was set")
356 } else {
357 for _, v := range cfgs {
358 if v.Network == "unix" {
359 return nil, fmt.Errorf("must set -dir: using a unix socket for %v", v.Instance)
360 }
361 }
362 }
363
364 }
365
366 if useFuse {
367 if len(instances) != 0 || instancesSrc != "" {
368 return nil, errors.New("-fuse is not compatible with -projects, -instances, or -instances_metadata")
369 }
370 return nil, nil
371 }
372
373 if len(instances) == 0 && instancesSrc == "" {
374
375
376
377 var flags string
378 if fuse.Supported() {
379 flags = "-projects, -fuse, -instances or -instances_metadata"
380 } else {
381 flags = "-projects, -instances or -instances_metadata"
382 }
383
384 errStr := fmt.Sprintf("no instance selected because none of %s is specified", flags)
385 return nil, errors.New(errStr)
386 }
387 return cfgs, nil
388 }
389
View as plain text