1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package main
21
22 import (
23 "errors"
24 "flag"
25 "fmt"
26 "io/ioutil"
27 "net/http"
28 "os"
29 "os/signal"
30 "path/filepath"
31 "strings"
32 "sync"
33 "syscall"
34 "time"
35
36 "github.com/GoogleCloudPlatform/cloudsql-proxy/cmd/cloud_sql_proxy/internal/healthcheck"
37 "github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
38 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/certs"
39 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/fuse"
40 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/limits"
41 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
42 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/util"
43
44 "cloud.google.com/go/compute/metadata"
45 "github.com/coreos/go-systemd/v22/daemon"
46 "golang.org/x/net/context"
47 "golang.org/x/oauth2"
48 goauth "golang.org/x/oauth2/google"
49 sqladmin "google.golang.org/api/sqladmin/v1beta4"
50 )
51
// Command-line flags for the proxy. Every flag except `instances` is
// registered with the default flag.CommandLine set when this block is
// initialised; `instances` is registered later, inside runProxy.
var (
	// General behaviour and logging.
	version = flag.Bool("version", false, "Print the version of the proxy and exit")
	verbose = flag.Bool("verbose", true,
		`If false, verbose output such as information about when connections are
created/closed without error are suppressed`,
	)
	quiet          = flag.Bool("quiet", false, "Disable log messages")
	logDebugStdout = flag.Bool("log_debug_stdout", false, "If true, log messages that are not errors will output to stdout instead of stderr")
	structuredLogs = flag.Bool("structured_logs", false, "Configures all log messages to be emitted as JSON.")

	// refreshCfgThrottle is clamped to minimumRefreshCfgThrottle in runProxy.
	refreshCfgThrottle = flag.Duration("refresh_config_throttle", proxy.DefaultRefreshCfgThrottle,
		`If set, this flag specifies the amount of forced sleep between successive
API calls in order to protect client API quota. Minimum allowed value is
`+minimumRefreshCfgThrottle.String(),
	)
	checkRegion = flag.Bool("check_region", false, `If specified, the 'region' portion of the connection string is required for
Unix socket-based connections.`)

	// Settings for how to choose which instances to connect to.
	dir      = flag.String("dir", "", "Directory to use for placing Unix sockets representing database instances")
	projects = flag.String("projects", "",
		`Open sockets for each Cloud SQL Instance in the projects specified
(comma-separated list)`,
	)
	// instances accumulates the values of the -instances flag; it is bound
	// to the flag in runProxy via flag.Var.
	instances   stringListValue
	instanceSrc = flag.String("instances_metadata", "", `If provided, it is treated as a path to a metadata value which
is polled for a comma-separated list of instances to connect to. For example,
to use the instance metadata value named 'cloud-sql-instances' you would
provide 'instance/attributes/cloud-sql-instances'. Not compatible with -fuse`)
	useFuse = flag.Bool("fuse", false, `Mount a directory at 'dir' using FUSE for accessing instances. Note that the
directory at 'dir' must be empty before this program is started.`)
	fuseTmp = flag.String("fuse_tmp", defaultTmp, `Used as a temporary directory if -fuse is set. Note that files in this directory
can be removed automatically by this program.`)

	// Settings for limits and shutdown behaviour.
	maxConnections = flag.Uint64("max_connections", 0,
		`If provided, the maximum number of connections to establish before refusing
new connections. Defaults to 0 (no limit)`,
	)
	fdRlimit = flag.Uint64("fd_rlimit", limits.ExpectedFDs,
		`Sets the rlimit on the number of open file descriptors for the proxy to
the provided value. If set to zero, disables attempts to set the rlimit.
Defaults to a value which can support 4K connections to one instance`,
	)
	termTimeout = flag.Duration("term_timeout", 0,
		`When set, the proxy will wait for existing connections to close before
terminating. Any connections that haven't closed after the timeout will be
dropped`,
	)

	// Settings for authentication.
	token     = flag.String("token", "", "When set, the proxy uses this Bearer token for authorization.")
	tokenFile = flag.String("credential_file", "",
		`If provided, this json file will be used to retrieve Service Account
credentials. You may set the GOOGLE_APPLICATION_CREDENTIALS environment
variable for the same effect.`,
	)
	// ipAddressTypes is split on "," in runProxy and handed to the cert source.
	ipAddressTypes = flag.String("ip_address_types", "PUBLIC,PRIVATE",
		`Default to be 'PUBLIC,PRIVATE'. Options: a list of strings separated by
',', e.g. 'PUBLIC,PRIVATE' `,
	)

	enableIAMLogin = flag.Bool("enable_iam_login", false, "Enables database user authentication using Cloud SQL's IAM DB Authentication (Postgres only).")

	skipInvalidInstanceConfigs = flag.Bool("skip_failed_instance_config", false,
		`Setting this flag will allow you to prevent the proxy from terminating
when some instance configurations could not be parsed and/or are
unavailable.`,
	)

	// Settings for which API endpoint to talk to and how quota is attributed.
	// runProxy rejects a non-empty -host value that does not end in "/".
	host = flag.String("host", "",
		`When set, the proxy uses this host as the base API path. Example:
https://sqladmin.googleapis.com`,
	)
	quotaProject = flag.String("quota_project", "",
		`Specifies the project to use for Cloud SQL Admin API quota tracking.`)

	// Settings for the optional HTTP health check server.
	useHTTPHealthCheck = flag.Bool("use_http_health_check", false, "When set, creates an HTTP server that checks and communicates the health of the proxy client.")
	healthCheckPort    = flag.String("health_check_port", "8090", "When applicable, health checks take place on this port number. Defaults to 8090.")
)
134
// minimumRefreshCfgThrottle is the smallest value runProxy accepts for the
// -refresh_config_throttle flag; smaller values are raised to this bound.
const minimumRefreshCfgThrottle = time.Second

// port is the value runProxy assigns to proxy.Client.Port.
// NOTE(review): presumably the server-side port of the Cloud SQL instance
// proxy endpoint — confirm against the proxy package.
const port = 3307
140
// init installs a custom flag.Usage that prints a long-form overview of the
// proxy followed by the name and help text of every registered flag.
//
// The message is written to flag.CommandLine.Output(), which is os.Stderr
// unless a caller redirected it with flag.CommandLine.SetOutput.
func init() {
	flag.Usage = func() {
		out := flag.CommandLine.Output()
		// The overview is static text, so it is printed verbatim rather than
		// being interpreted as a format string.
		fmt.Fprint(out, `
The Cloud SQL Auth proxy allows simple, secure connectivity to Google Cloud SQL. It
is a long-running process that opens local sockets (either TCP or Unix sockets)
according to the parameters passed to it. A local application connects to a
Cloud SQL instance by using the corresponding socket.


Authorization:
* On Google Compute Engine, the default service account is used.
The Cloud SQL API must be enabled for the VM.

* When the gcloud command-line tool is installed on the local machine, the
"active account" is used for authentication. Run 'gcloud auth list' to see
which accounts are installed on your local machine and
'gcloud config list account' to view the active account.

* To configure the proxy using a service account, pass the -credential_file
parameter or set the GOOGLE_APPLICATION_CREDENTIALS environment variable.
This will override gcloud or GCE (Google Compute Engine) credentials,
if they exist.

* To configure the proxy using IAM authentication, pass the -enable_iam_login
flag. This will cause the proxy to use IAM account credentials for
database user authentication.

General:
-quiet
Disable log messages (e.g. when new connections are established).
WARNING: this option disables ALL logging output (including connection
errors), which will likely make debugging difficult. The -quiet flag takes
precedence over the -verbose flag.

-log_debug_stdout
When explicitly set to true, verbose and info log messages will be directed
to stdout as opposed to the default stderr.

-verbose
When explicitly set to false, disable log messages that are not errors nor
first-time startup messages (e.g. when new connections are established).

-structured_logs
When set to true, all log messages are written out as JSON.

Connection:
-instances
To connect to a specific list of instances, set the instances parameter
to a comma-separated list of instance connection strings. For example:

-instances=my-project:my-region:my-instance

For convenience, this flag may be specified multiple times.

For connectivity over TCP, you must specify a tcp port as part of the
instance string. For example, the following example opens a loopback TCP
socket on port 3306, which will be proxied to connect to the instance
'my-instance' in project 'my-project'. To listen on other interfaces than
localhost, a custom bind address (e.g., 0.0.0.0) may be provided. For
example:

-instances=my-project:my-region:my-instance=tcp:3306
or
-instances=my-project:my-region:my-instance=tcp:0.0.0.0:3306

When connecting over TCP, the -instances parameter is required.

To set a custom socket name, you can specify it as part of the instance
string. The following example opens a unix socket in the directory
specified by -dir, which will be proxied to connect to the instance
'my-instance' in project 'my-project':

-instances=my-project:my-region:my-instance=unix:custom-socket-name

Note: The directory specified by -dir must exist and the socket file path
(i.e., dir plus INSTANCE_CONNECTION_NAME) must be under your platform's
limit (typically 108 characters on many Unix systems, but varies by platform).

To override the -dir parameter, specify an absolute path as shown in the
following example:

-instances=my-project:my-region:my-instance=unix:/my/custom/sql-socket

Supplying INSTANCES environment variable achieves the same effect. One can
use that to keep k8s manifest files constant across multiple environments

-instances_metadata
When running on GCE (Google Compute Engine) you can avoid the need to
specify the list of instances on the command line by using the Metadata
server. This parameter specifies a path to a metadata value which is then
interpreted as a list of instances in the exact same way as the -instances
parameter. Updates to the metadata value will be observed and acted on by
the Proxy.

-projects
To direct the proxy to allow connections to all instances in specific
projects, set the projects parameter:

-projects=my-project

-fuse
If your local environment has FUSE installed, you can specify the -fuse
flag to avoid the requirement to specify instances in advance. With FUSE,
any attempts to open a Unix socket in the directory specified by -dir
automatically creates that socket and connects to the corresponding
instance.

-dir
When using Unix sockets (the default for systems which support them), the
Proxy places the sockets in the directory specified by the -dir parameter.

Automatic instance discovery:
If the Google Cloud SQL is installed on the local machine and no instance
connection flags are specified, the proxy connects to all instances in the
gcloud tool's active project. Run 'gcloud config list project' to
display the active project.


Information for all flags:
`)
		flag.VisitAll(func(f *flag.Flag) {
			// Indent continuation lines of multi-line help text so they line
			// up with the first line printed below the flag name.
			usage := strings.ReplaceAll(f.Usage, "\n", "\n ")
			fmt.Fprintf(out, " -%s\n %s\n\n", f.Name, usage)
		})
	}
}
267
var (
	// defaultTmp is the default value of the -fuse_tmp flag: a
	// "cloudsql-proxy-tmp" directory under the OS temporary directory.
	defaultTmp = filepath.Join(os.TempDir(), "cloudsql-proxy-tmp")

	// versionString is the version of the proxy reported by semanticVersion.
	versionString = "1.28.0"

	// metadataString holds optional extra metadata; when non-empty,
	// semanticVersion appends it to versionString after a "+".
	metadataString = ""
)
275
276
277 func semanticVersion() string {
278 v := versionString
279 if metadataString != "" {
280 v += "+" + metadataString
281 }
282 return v
283 }
284
285
286 func userAgentFromVersionString() string {
287 return "cloud_sql_proxy/" + semanticVersion()
288 }
289
// accountErrorSuffix is the remediation hint that checkFlags appends to its
// errors about a missing or insufficiently scoped Compute Engine service
// account.
const accountErrorSuffix = `Please create a new VM with Cloud SQL access (scope) enabled under "Identity and API access". Alternatively, create a new "service account key" and specify it using the -credential_file parameter`
291
292 type stringListValue []string
293
294 func (i *stringListValue) String() string {
295 return strings.Join(*i, ",")
296 }
297
298 func (i *stringListValue) Set(s string) error {
299 *i = append(*i, stringList(s)...)
300 return nil
301 }
302
303 func checkFlags(onGCE bool) error {
304 if !onGCE {
305 if *instanceSrc != "" {
306 return errors.New("-instances_metadata unsupported outside of Google Compute Engine")
307 }
308 return nil
309 }
310
311 if *token != "" || *tokenFile != "" || os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") != "" {
312 return nil
313 }
314
315
316 _, err := util.GcloudConfig()
317 if err == nil {
318 return nil
319 }
320
321 scopes, err := metadata.Scopes("default")
322 if err != nil {
323 if _, ok := err.(metadata.NotDefinedError); ok {
324 return errors.New("no service account found for this Compute Engine VM. " + accountErrorSuffix)
325 }
326 return fmt.Errorf("error checking scopes: %T %v | %+v", err, err, err)
327 }
328
329 ok := false
330 for _, sc := range scopes {
331 if sc == proxy.SQLScope || sc == "https://www.googleapis.com/auth/cloud-platform" {
332 ok = true
333 break
334 }
335 }
336 if !ok {
337 return errors.New(`the default Compute Engine service account is not configured with sufficient permissions to access the Cloud SQL API from this VM. ` + accountErrorSuffix)
338 }
339 return nil
340 }
341
342 func authenticatedClientFromPath(ctx context.Context, f string) (*http.Client, oauth2.TokenSource, error) {
343 all, err := ioutil.ReadFile(f)
344 if err != nil {
345 return nil, nil, fmt.Errorf("invalid json file %q: %v", f, err)
346 }
347
348 if cfg, err := goauth.JWTConfigFromJSON(all, proxy.SQLScope); err == nil {
349 logging.Infof("using credential file for authentication; email=%s", cfg.Email)
350 return cfg.Client(ctx), cfg.TokenSource(ctx), nil
351 }
352
353 cred, err := goauth.CredentialsFromJSON(ctx, all, proxy.SQLScope)
354 if err != nil {
355 return nil, nil, fmt.Errorf("invalid json file %q: %v", f, err)
356 }
357 logging.Infof("using credential file for authentication; path=%q", f)
358 return oauth2.NewClient(ctx, cred.TokenSource), cred.TokenSource, nil
359 }
360
361 func authenticatedClient(ctx context.Context) (*http.Client, oauth2.TokenSource, error) {
362 if *tokenFile != "" {
363 return authenticatedClientFromPath(ctx, *tokenFile)
364 } else if tok := *token; tok != "" {
365 src := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: tok})
366 return oauth2.NewClient(ctx, src), src, nil
367 } else if f := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"); f != "" {
368 return authenticatedClientFromPath(ctx, f)
369 }
370
371
372
373 src, err := util.GcloudTokenSource(ctx)
374 if err != nil {
375 src, err = goauth.DefaultTokenSource(ctx, proxy.SQLScope)
376 }
377 if err != nil {
378 return nil, nil, err
379 }
380
381 return oauth2.NewClient(ctx, src), src, nil
382 }
383
384
385
386
387
388
// quotaProjectTransport is an http.RoundTripper that adds an
// X-Goog-User-Project header carrying project to every request before
// handing it to base.
type quotaProjectTransport struct {
	base    http.RoundTripper
	project string
}

var _ http.RoundTripper = quotaProjectTransport{}

// RoundTrip sends a copy of req, with the X-Goog-User-Project header added,
// through the base transport and returns its result.
//
// The http.RoundTripper contract forbids modifying the caller's request, so
// the header is added to a clone. Mutating req directly would also stack a
// duplicate header value each time the same request is sent again.
func (t quotaProjectTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	out := req.Clone(req.Context())
	// Clone leaves Header nil when the original request had none.
	if out.Header == nil {
		out.Header = make(http.Header)
	}
	out.Header.Add("X-Goog-User-Project", t.project)
	return t.base.RoundTrip(out)
}
404
405
406
407 func configureQuotaProject(c *http.Client, project string) {
408
409
410 base := c.Transport
411 if base == nil {
412 base = http.DefaultTransport
413 }
414 c.Transport = quotaProjectTransport{
415 base: base,
416 project: project,
417 }
418 }
419
// stringList splits s on commas. The empty string yields nil; any other
// input yields every field produced by the split, including empty ones.
func stringList(s string) []string {
	// strings.Split returns a single empty element only for empty input.
	if s == "" {
		return nil
	}
	return strings.Split(s, ",")
}
427
428 func listInstances(ctx context.Context, cl *http.Client, projects []string) ([]string, error) {
429 if len(projects) == 0 {
430
431 return nil, nil
432 }
433
434 sql, err := sqladmin.New(cl)
435 if err != nil {
436 return nil, err
437 }
438 if *host != "" {
439 sql.BasePath = *host
440 }
441
442 ch := make(chan string)
443 var wg sync.WaitGroup
444 wg.Add(len(projects))
445 for _, proj := range projects {
446 proj := proj
447 go func() {
448 err := sql.Instances.List(proj).Pages(ctx, func(r *sqladmin.InstancesListResponse) error {
449 for _, in := range r.Items {
450
451 if in.BackendType == "SECOND_GEN" {
452 ch <- in.ConnectionName
453 }
454 }
455 return nil
456 })
457 if err != nil {
458 logging.Errorf("Error listing instances in %v: %v", proj, err)
459 }
460 wg.Done()
461 }()
462 }
463 go func() {
464 wg.Wait()
465 close(ch)
466 }()
467 var ret []string
468 for x := range ch {
469 ret = append(ret, x)
470 }
471 if len(ret) == 0 {
472 return nil, fmt.Errorf("no Cloud SQL Instances found in these projects: %v", projects)
473 }
474 return ret, nil
475 }
476
477 func gcloudProject() ([]string, error) {
478 cfg, err := util.GcloudConfig()
479 if err != nil {
480 return nil, err
481 }
482 if cfg.Configuration.Properties.Core.Project == "" {
483 return nil, fmt.Errorf("gcloud has no active project, you can set it by running `gcloud config set project <project>`")
484 }
485 return []string{cfg.Configuration.Properties.Core.Project}, nil
486 }
487
488 func runProxy() int {
489 flag.Var(&instances, "instances",
490 `Comma-separated list of fully qualified instances (project:region:name)
491 to connect to. If the name has the suffix '=tcp:port', a TCP server is opened
492 on the specified port on localhost to proxy to that instance. It is also possible
493 to listen on a custom address by providing a host, e.g., '=tcp:0.0.0.0:port'. If
494 no value is provided for 'tcp', one socket file per instance is opened in 'dir'.
495 For convenience, this flag may be specified multiple times.
496 You may use the INSTANCES environment variable for the same effect. Using both will
497 use the value from the flag, Not compatible with -fuse.`,
498 )
499
500 flag.Parse()
501
502 if *version {
503 fmt.Println("Cloud SQL Auth proxy:", semanticVersion())
504 return 0
505 }
506
507 if *logDebugStdout {
508 logging.LogDebugToStdout()
509 }
510
511 if !*verbose {
512 logging.LogVerboseToNowhere()
513 }
514
515 if *structuredLogs {
516 cleanup, err := logging.EnableStructuredLogs(*logDebugStdout, *verbose)
517 if err != nil {
518 logging.Errorf("failed to enable structured logs: %v", err)
519 return 1
520 }
521 defer cleanup()
522 }
523
524 if *quiet {
525 logging.Infof("Cloud SQL Auth proxy logging has been disabled by the -quiet flag. All messages (including errors) will be suppressed.")
526 logging.DisableLogging()
527 }
528
529
530 ipAddrTypeOptsInput := strings.Split(*ipAddressTypes, ",")
531
532 if *fdRlimit != 0 {
533 if err := limits.SetupFDLimits(*fdRlimit); err != nil {
534 logging.Infof("failed to setup file descriptor limits: %v", err)
535 }
536 }
537
538 if *host != "" && !strings.HasSuffix(*host, "/") {
539 logging.Errorf("Flag host should always end with /")
540 flag.PrintDefaults()
541 return 0
542 }
543
544
545
546 if envInstances := os.Getenv("INSTANCES"); len(instances) == 0 && envInstances != "" {
547 instances.Set(envInstances)
548 }
549
550 projList := stringList(*projects)
551
552 if len(instances) == 0 && *instanceSrc == "" && len(projList) == 0 && !*useFuse {
553 var err error
554 projList, err = gcloudProject()
555 if err == nil {
556 logging.Infof("Using gcloud's active project: %v", projList)
557 } else if gErr, ok := err.(*util.GcloudError); ok && gErr.Status == util.GcloudNotFound {
558 logging.Errorf("gcloud is not in the path and -instances and -projects are empty")
559 return 1
560 } else {
561 logging.Errorf("unable to retrieve the active gcloud project and -instances and -projects are empty: %v", err)
562 return 1
563 }
564 }
565
566 onGCE := metadata.OnGCE()
567 if err := checkFlags(onGCE); err != nil {
568 logging.Errorf(err.Error())
569 return 1
570 }
571
572 ctx, cancel := context.WithCancel(context.Background())
573 client, tokSrc, err := authenticatedClient(ctx)
574 if err != nil {
575 logging.Errorf(err.Error())
576 return 1
577 }
578
579 if *quotaProject != "" {
580 logging.Infof("Using the project %q for SQL Admin API quota", *quotaProject)
581 configureQuotaProject(client, *quotaProject)
582 }
583
584 ins, err := listInstances(ctx, client, projList)
585 if err != nil {
586 logging.Errorf(err.Error())
587 return 1
588 }
589 instances = append(instances, ins...)
590 cfgs, err := CreateInstanceConfigs(*dir, *useFuse, instances, *instanceSrc, client, *skipInvalidInstanceConfigs)
591 if err != nil {
592 logging.Errorf(err.Error())
593 return 1
594 }
595
596
597
598 var connset *proxy.ConnSet
599 if *useFuse {
600 connset = proxy.NewConnSet()
601 }
602
603
604 refreshCfgThrottle := *refreshCfgThrottle
605 if refreshCfgThrottle < minimumRefreshCfgThrottle {
606 refreshCfgThrottle = minimumRefreshCfgThrottle
607 }
608 refreshCfgBuffer := proxy.DefaultRefreshCfgBuffer
609 if *enableIAMLogin {
610 refreshCfgThrottle = proxy.IAMLoginRefreshThrottle
611 refreshCfgBuffer = proxy.IAMLoginRefreshCfgBuffer
612 }
613 proxyClient := &proxy.Client{
614 Port: port,
615 MaxConnections: *maxConnections,
616 Certs: certs.NewCertSourceOpts(client, certs.RemoteOpts{
617 APIBasePath: *host,
618 IgnoreRegion: !*checkRegion,
619 UserAgent: userAgentFromVersionString(),
620 IPAddrTypeOpts: ipAddrTypeOptsInput,
621 EnableIAMLogin: *enableIAMLogin,
622 TokenSource: tokSrc,
623 }),
624 Conns: connset,
625 RefreshCfgThrottle: refreshCfgThrottle,
626 RefreshCfgBuffer: refreshCfgBuffer,
627 }
628
629 var hc *healthcheck.Server
630 if *useHTTPHealthCheck {
631
632 var insts []string
633 for _, cfg := range cfgs {
634 insts = append(insts, cfg.Instance)
635 }
636 hc, err = healthcheck.NewServer(proxyClient, *healthCheckPort, insts)
637 if err != nil {
638 logging.Errorf("[Health Check] Could not initialize health check server: %v", err)
639 return 1
640 }
641 defer hc.Close(ctx)
642 }
643
644
645 var connSrc <-chan proxy.Conn
646 if *useFuse {
647 c, fuse, err := fuse.NewConnSrc(*dir, *fuseTmp, proxyClient, connset)
648 if err != nil {
649 logging.Errorf("Could not start fuse directory at %q: %v", *dir, err)
650 return 1
651 }
652 connSrc = c
653 defer fuse.Close()
654 } else {
655 updates := make(chan string)
656 if *instanceSrc != "" {
657 go func() {
658 for {
659 err := metadata.Subscribe(*instanceSrc, func(v string, ok bool) error {
660 if ok {
661 updates <- v
662 }
663 return nil
664 })
665 if err != nil {
666 logging.Errorf("Error on receiving new instances from metadata: %v", err)
667 }
668 time.Sleep(5 * time.Second)
669 }
670 }()
671 }
672
673 c, err := WatchInstances(*dir, cfgs, updates, client)
674 if err != nil {
675 logging.Errorf(err.Error())
676 return 1
677 }
678 connSrc = c
679 }
680
681 logging.Infof("Ready for new connections")
682
683 if hc != nil {
684 hc.NotifyStarted()
685 }
686
687 signals := make(chan os.Signal, 1)
688 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
689
690 shutdown := make(chan int, 1)
691 go func() {
692 defer func() { cancel(); close(shutdown) }()
693 <-signals
694 logging.Infof("Received TERM signal. Waiting up to %s before terminating.", *termTimeout)
695 go func() {
696 if _, err := daemon.SdNotify(false, daemon.SdNotifyStopping); err != nil {
697 logging.Errorf("Failed to notify systemd of termination: %v", err)
698 }
699 }()
700
701 err := proxyClient.Shutdown(*termTimeout)
702 if err != nil {
703 logging.Errorf("Error during SIGTERM shutdown: %v", err)
704 shutdown <- 2
705 return
706 }
707 }()
708
709
710
711
712 go func() {
713 if _, err := daemon.SdNotify(false, daemon.SdNotifyReady); err != nil {
714 logging.Errorf("Failed to notify systemd of readiness: %v", err)
715 }
716 }()
717 proxyClient.RunContext(ctx, connSrc)
718 if code, ok := <-shutdown; ok {
719 return code
720 }
721 return 0
722 }
723
724 func main() {
725 code := runProxy()
726 os.Exit(code)
727 }
728
View as plain text