...

Source file src/github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/proxy.go

Documentation: github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy

     1  // Copyright 2015 Google Inc. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package main
    16  
    17  // This file contains code for supporting local sockets for the Cloud SQL Auth proxy.
    18  
    19  import (
    20  	"bytes"
    21  	"errors"
    22  	"fmt"
    23  	"net"
    24  	"net/http"
    25  	"os"
    26  	"path/filepath"
    27  	"runtime"
    28  	"strings"
    29  	"time"
    30  
    31  	"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
    32  	"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/fuse"
    33  	"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
    34  	sqladmin "google.golang.org/api/sqladmin/v1beta4"
    35  )
    36  
    37  // WatchInstances handles the lifecycle of local sockets used for proxying
    38  // local connections.  Values received from the updates channel are
    39  // interpretted as a comma-separated list of instances.  The set of sockets in
    40  // 'dir' is the union of 'instances' and the most recent list from 'updates'.
    41  func WatchInstances(dir string, cfgs []instanceConfig, updates <-chan string, cl *http.Client) (<-chan proxy.Conn, error) {
    42  	ch := make(chan proxy.Conn, 1)
    43  
    44  	// Instances specified statically (e.g. as flags to the binary) will always
    45  	// be available. They are ignored if also returned by the GCE metadata since
    46  	// the socket will already be open.
    47  	staticInstances := make(map[string]net.Listener, len(cfgs))
    48  	for _, v := range cfgs {
    49  		l, err := listenInstance(ch, v)
    50  		if err != nil {
    51  			return nil, err
    52  		}
    53  		staticInstances[v.Instance] = l
    54  	}
    55  
    56  	if updates != nil {
    57  		go watchInstancesLoop(dir, ch, updates, staticInstances, cl)
    58  	}
    59  	return ch, nil
    60  }
    61  
    62  func watchInstancesLoop(dir string, dst chan<- proxy.Conn, updates <-chan string, static map[string]net.Listener, cl *http.Client) {
    63  	dynamicInstances := make(map[string]net.Listener)
    64  	for instances := range updates {
    65  		// All instances were legal when we started, so we pass false below to ensure we don't skip them
    66  		// later if they became unhealthy for some reason; this would be a serious enough problem.
    67  		list, err := parseInstanceConfigs(dir, strings.Split(instances, ","), cl, false)
    68  		if err != nil {
    69  			logging.Errorf("%v", err)
    70  			// If we do not have a valid list of instances, skip this update
    71  			continue
    72  		}
    73  
    74  		stillOpen := make(map[string]net.Listener)
    75  		for _, cfg := range list {
    76  			instance := cfg.Instance
    77  
    78  			// If the instance is specified in the static list don't do anything:
    79  			// it's already open and should stay open forever.
    80  			if _, ok := static[instance]; ok {
    81  				continue
    82  			}
    83  
    84  			if l, ok := dynamicInstances[instance]; ok {
    85  				delete(dynamicInstances, instance)
    86  				stillOpen[instance] = l
    87  				continue
    88  			}
    89  
    90  			l, err := listenInstance(dst, cfg)
    91  			if err != nil {
    92  				logging.Errorf("Couldn't open socket for %q: %v", instance, err)
    93  				continue
    94  			}
    95  			stillOpen[instance] = l
    96  		}
    97  
    98  		// Any instance in dynamicInstances was not in the most recent metadata
    99  		// update. Clean up those instances' sockets by closing them; note that
   100  		// this does not affect any existing connections instance.
   101  		for instance, listener := range dynamicInstances {
   102  			logging.Infof("Closing socket for instance %v", instance)
   103  			listener.Close()
   104  		}
   105  
   106  		dynamicInstances = stillOpen
   107  	}
   108  
   109  	for _, v := range static {
   110  		if err := v.Close(); err != nil {
   111  			logging.Errorf("Error closing %q: %v", v.Addr(), err)
   112  		}
   113  	}
   114  	for _, v := range dynamicInstances {
   115  		if err := v.Close(); err != nil {
   116  			logging.Errorf("Error closing %q: %v", v.Addr(), err)
   117  		}
   118  	}
   119  }
   120  
   121  func remove(path string) {
   122  	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
   123  		logging.Infof("Remove(%q) error: %v", path, err)
   124  	}
   125  }
   126  
   127  // listenInstance starts listening on a new unix socket in dir to connect to the
   128  // specified instance. New connections to this socket are sent to dst.
   129  func listenInstance(dst chan<- proxy.Conn, cfg instanceConfig) (net.Listener, error) {
   130  	unix := cfg.Network == "unix"
   131  	if unix {
   132  		remove(cfg.Address)
   133  	}
   134  	l, err := net.Listen(cfg.Network, cfg.Address)
   135  	if err != nil {
   136  		return nil, err
   137  	}
   138  	if unix {
   139  		if err := os.Chmod(cfg.Address, 0777|os.ModeSocket); err != nil {
   140  			logging.Errorf("couldn't update permissions for socket file %q: %v; other users may not be unable to connect", cfg.Address, err)
   141  		}
   142  	}
   143  
   144  	go func() {
   145  		for {
   146  			start := time.Now()
   147  			c, err := l.Accept()
   148  			if err != nil {
   149  				logging.Errorf("Error in accept for %q on %v: %v", cfg, cfg.Address, err)
   150  				if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
   151  					d := 10*time.Millisecond - time.Since(start)
   152  					if d > 0 {
   153  						time.Sleep(d)
   154  					}
   155  					continue
   156  				}
   157  				l.Close()
   158  				return
   159  			}
   160  			logging.Verbosef("New connection for %q", cfg.Instance)
   161  
   162  			switch clientConn := c.(type) {
   163  			case *net.TCPConn:
   164  				clientConn.SetKeepAlive(true)
   165  				clientConn.SetKeepAlivePeriod(1 * time.Minute)
   166  
   167  			}
   168  			dst <- proxy.Conn{cfg.Instance, c}
   169  		}
   170  	}()
   171  
   172  	logging.Infof("Listening on %s for %s", cfg.Address, cfg.Instance)
   173  	return l, nil
   174  }
   175  
   176  type instanceConfig struct {
   177  	Instance         string
   178  	Network, Address string
   179  }
   180  
   181  // loopbackForNet maps a network (e.g. tcp6) to the loopback address for that
   182  // network. It is updated during the initialization of validNets to include a
   183  // valid loopback address for "tcp".
   184  var loopbackForNet = map[string]string{
   185  	"tcp4": "127.0.0.1",
   186  	"tcp6": "::1",
   187  }
   188  
   189  // validNets tracks the networks that are valid for this platform and machine.
   190  var validNets = func() map[string]bool {
   191  	m := map[string]bool{
   192  		"unix": runtime.GOOS != "windows",
   193  	}
   194  
   195  	anyTCP := false
   196  	for _, n := range []string{"tcp4", "tcp6"} {
   197  		host, ok := loopbackForNet[n]
   198  		if !ok {
   199  			// This is effectively a compile-time error.
   200  			panic(fmt.Sprintf("no loopback address found for %v", n))
   201  		}
   202  		// Open any port to see if the net is valid.
   203  		x, err := net.Listen(n, net.JoinHostPort(host, "0"))
   204  		if err != nil {
   205  			// Error is too verbose to be useful.
   206  			continue
   207  		}
   208  		x.Close()
   209  		m[n] = true
   210  
   211  		if !anyTCP {
   212  			anyTCP = true
   213  			// Set the loopback value for generic tcp if it hasn't already been
   214  			// set. (If both tcp4/tcp6 are supported the first one in the list
   215  			// (tcp4's 127.0.0.1) is used.
   216  			loopbackForNet["tcp"] = host
   217  		}
   218  	}
   219  	if anyTCP {
   220  		m["tcp"] = true
   221  	}
   222  	return m
   223  }()
   224  
   225  func parseInstanceConfig(dir, instance string, cl *http.Client) (instanceConfig, error) {
   226  	var ret instanceConfig
   227  	proj, region, name, args, err := proxy.ParseInstanceConnectionName(instance)
   228  	if err != nil {
   229  		return instanceConfig{}, err
   230  	}
   231  	ret.Instance = args[0]
   232  	regionName := fmt.Sprintf("%s~%s", region, name)
   233  	if len(args) == 1 {
   234  		// Default to listening via unix socket in specified directory
   235  		ret.Network = "unix"
   236  		ret.Address = filepath.Join(dir, instance)
   237  	} else {
   238  		// Parse the instance options if present.
   239  		opts := strings.SplitN(args[1], ":", 2)
   240  		if len(opts) != 2 {
   241  			return instanceConfig{}, fmt.Errorf("invalid instance options: must be in the form `unix:/path/to/socket`, `tcp:port`, `tcp:host:port`; invalid option was %q", strings.Join(opts, ":"))
   242  		}
   243  		ret.Network = opts[0]
   244  		var err error
   245  		if ret.Network == "unix" {
   246  			if strings.HasPrefix(opts[1], "/") {
   247  				ret.Address = opts[1] // Root path.
   248  			} else {
   249  				ret.Address = filepath.Join(dir, opts[1])
   250  			}
   251  		} else {
   252  			ret.Address, err = parseTCPOpts(opts[0], opts[1])
   253  		}
   254  		if err != nil {
   255  			return instanceConfig{}, err
   256  		}
   257  	}
   258  
   259  	// Use the SQL Admin API to verify compatibility with the instance.
   260  	sql, err := sqladmin.New(cl)
   261  	if err != nil {
   262  		return instanceConfig{}, err
   263  	}
   264  	if *host != "" {
   265  		sql.BasePath = *host
   266  	}
   267  	inst, err := sql.Connect.Get(proj, regionName).Do()
   268  	if err != nil {
   269  		return instanceConfig{}, err
   270  	}
   271  	if inst.BackendType == "FIRST_GEN" {
   272  		logging.Errorf("WARNING: proxy client does not support first generation Cloud SQL instances.")
   273  		return instanceConfig{}, fmt.Errorf("%q is a first generation instance", instance)
   274  	}
   275  	// Postgres instances use a special suffix on the unix socket.
   276  	// See https://www.postgresql.org/docs/11/runtime-config-connection.html
   277  	if ret.Network == "unix" && strings.HasPrefix(strings.ToLower(inst.DatabaseVersion), "postgres") {
   278  		// Verify the directory exists.
   279  		if err := os.MkdirAll(ret.Address, 0755); err != nil {
   280  			return instanceConfig{}, err
   281  		}
   282  		ret.Address = filepath.Join(ret.Address, ".s.PGSQL.5432")
   283  	}
   284  
   285  	if !validNets[ret.Network] {
   286  		return ret, fmt.Errorf("invalid %q: unsupported network: %v", instance, ret.Network)
   287  	}
   288  	return ret, nil
   289  }
   290  
   291  // parseTCPOpts parses the instance options when specifying tcp port options.
   292  func parseTCPOpts(ntwk, addrOpt string) (string, error) {
   293  	if strings.Contains(addrOpt, ":") {
   294  		return addrOpt, nil // User provided a host and port; use that.
   295  	}
   296  	// No "host" part of the address. Be safe and assume that they want a loopback address.
   297  	addr, ok := loopbackForNet[ntwk]
   298  	if !ok {
   299  		return "", fmt.Errorf("invalid %q:%q: unrecognized network %v", ntwk, addrOpt, ntwk)
   300  	}
   301  	return net.JoinHostPort(addr, addrOpt), nil
   302  }
   303  
   304  // parseInstanceConfigs calls parseInstanceConfig for each instance in the
   305  // provided slice, collecting errors along the way. There may be valid
   306  // instanceConfigs returned even if there's an error.
   307  func parseInstanceConfigs(dir string, instances []string, cl *http.Client, skipFailedInstanceConfigs bool) ([]instanceConfig, error) {
   308  	errs := new(bytes.Buffer)
   309  	var cfg []instanceConfig
   310  	for _, v := range instances {
   311  		if v == "" {
   312  			continue
   313  		}
   314  		if c, err := parseInstanceConfig(dir, v, cl); err != nil {
   315  			if skipFailedInstanceConfigs {
   316  				logging.Infof("There was a problem when parsing an instance configuration but ignoring due to the configuration. Error: %v", err)
   317  			} else {
   318  				fmt.Fprintf(errs, "\n\t%v", err)
   319  			}
   320  
   321  		} else {
   322  			cfg = append(cfg, c)
   323  		}
   324  	}
   325  
   326  	var err error
   327  	if errs.Len() > 0 {
   328  		err = fmt.Errorf("errors parsing config:%s", errs)
   329  	}
   330  	return cfg, err
   331  }
   332  
   333  // CreateInstanceConfigs verifies that the parameters passed to it are valid
   334  // for the proxy for the platform and system and then returns a slice of valid
   335  // instanceConfig. It is possible for the instanceConfig to be empty if no valid
   336  // configurations were specified, however `err` will be set.
   337  func CreateInstanceConfigs(dir string, useFuse bool, instances []string, instancesSrc string, cl *http.Client, skipFailedInstanceConfigs bool) ([]instanceConfig, error) {
   338  	if useFuse && !fuse.Supported() {
   339  		return nil, errors.New("FUSE not supported on this system")
   340  	}
   341  
   342  	cfgs, err := parseInstanceConfigs(dir, instances, cl, skipFailedInstanceConfigs)
   343  	if err != nil {
   344  		return nil, err
   345  	}
   346  
   347  	if dir == "" {
   348  		// Reasons to set '-dir':
   349  		//    - Using -fuse
   350  		//    - Using the metadata to get a list of instances
   351  		//    - Having an instance that uses a 'unix' network
   352  		if useFuse {
   353  			return nil, errors.New("must set -dir because -fuse was set")
   354  		} else if instancesSrc != "" {
   355  			return nil, errors.New("must set -dir because -instances_metadata was set")
   356  		} else {
   357  			for _, v := range cfgs {
   358  				if v.Network == "unix" {
   359  					return nil, fmt.Errorf("must set -dir: using a unix socket for %v", v.Instance)
   360  				}
   361  			}
   362  		}
   363  		// Otherwise it's safe to not set -dir
   364  	}
   365  
   366  	if useFuse {
   367  		if len(instances) != 0 || instancesSrc != "" {
   368  			return nil, errors.New("-fuse is not compatible with -projects, -instances, or -instances_metadata")
   369  		}
   370  		return nil, nil
   371  	}
   372  	// FUSE disabled.
   373  	if len(instances) == 0 && instancesSrc == "" {
   374  		// Failure to specifying instance can be caused by following reasons.
   375  		// 1. not enough information is provided by flags
   376  		// 2. failed to invoke gcloud
   377  		var flags string
   378  		if fuse.Supported() {
   379  			flags = "-projects, -fuse, -instances or -instances_metadata"
   380  		} else {
   381  			flags = "-projects, -instances or -instances_metadata"
   382  		}
   383  
   384  		errStr := fmt.Sprintf("no instance selected because none of %s is specified", flags)
   385  		return nil, errors.New(errStr)
   386  	}
   387  	return cfgs, nil
   388  }
   389  

View as plain text