...

Source file src/github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/fuse/fuse.go

Documentation: github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/fuse

     1  // Copyright 2015 Google Inc. All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  //go:build !windows && !openbsd
    16  // +build !windows,!openbsd
    17  
    18  // Package fuse provides a connection source wherein the user does not need to
    19  // specify which instance they are connecting to before they start the
    20  // executable. Instead, simply attempting to access a file in the provided
    21  // directory will transparently create a proxied connection to an instance
    22  // which has that name.
    23  //
    24  // Specifically, given that NewConnSrc was called with the mounting directory
    25  // as /cloudsql:
    26  //
    27  // 1) Execute `mysql -S /cloudsql/speckle:instance`
    28  // 2) The 'mysql' executable looks up the file "speckle:instance" inside "/cloudsql"
    29  // 3) This lookup is intercepted by the code in this package. A local unix socket
    30  //    located in a temporary directory is opened for listening and the lookup for
    31  //    "speckle:instance" returns to mysql saying that it is a symbolic link
    32  //    pointing to this new local socket.
    33  // 4) mysql dials the local unix socket, creating a new connection to the
    34  //    specified instance.
    35  package fuse
    36  
    37  import (
    38  	"bytes"
    39  	"errors"
    40  	"fmt"
    41  	"io"
    42  	"net"
    43  	"os"
    44  	"path/filepath"
    45  	"strings"
    46  	"sync"
    47  	"syscall"
    48  	"time"
    49  
    50  	"github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
    51  	"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
    52  	"github.com/hanwen/go-fuse/v2/fs"
    53  	"github.com/hanwen/go-fuse/v2/fuse"
    54  	"github.com/hanwen/go-fuse/v2/fuse/nodefs"
    55  	"golang.org/x/net/context"
    56  )
    57  
    58  // NewConnSrc returns a source of new connections based on Lookups in the
    59  // provided mount directory. If there isn't a directory located at tmpdir one
    60  // is created. The second return parameter can be used to shutdown and release
    61  // any resources. As a result of this shutdown, or during any other fatal
    62  // error, the returned chan will be closed.
    63  //
    64  // The connset parameter is optional.
    65  func NewConnSrc(mountdir, tmpdir string, client *proxy.Client, connset *proxy.ConnSet) (<-chan proxy.Conn, io.Closer, error) {
    66  	if err := os.MkdirAll(tmpdir, 0777); err != nil {
    67  		return nil, nil, err
    68  	}
    69  	if connset == nil {
    70  		// Make a dummy one.
    71  		connset = proxy.NewConnSet()
    72  	}
    73  	conns := make(chan proxy.Conn, 1)
    74  	root := &fsRoot{
    75  		tmpDir:  tmpdir,
    76  		linkDir: mountdir,
    77  		dst:     conns,
    78  		links:   make(map[string]*symlink),
    79  		connset: connset,
    80  		client:  client,
    81  	}
    82  
    83  	srv, err := fs.Mount(mountdir, root, &fs.Options{
    84  		MountOptions: fuse.MountOptions{AllowOther: true},
    85  	})
    86  	if err != nil {
    87  		return nil, nil, fmt.Errorf("FUSE mount failed: %q: %v", mountdir, err)
    88  	}
    89  
    90  	closer := fuseCloser(func() error {
    91  		err := srv.Unmount() // Best effort unmount
    92  		if err != nil {
    93  			logging.Errorf("Unmount failed: %v", err)
    94  		}
    95  		return root.Close()
    96  	})
    97  	return conns, closer, nil
    98  }
    99  
   100  type fuseCloser func() error
   101  
   102  func (fc fuseCloser) Close() error {
   103  	return fc()
   104  }
   105  
   106  // symlink implements a symbolic link, returning the underlying path when
   107  // Readlink is called.
   108  type symlink struct {
   109  	fs.Inode
   110  	path string
   111  }
   112  
   113  var _ fs.NodeReadlinker = &symlink{}
   114  
   115  func (s *symlink) Readlink(ctx context.Context) ([]byte, syscall.Errno) {
   116  	return []byte(s.path), fs.OK
   117  }
   118  
// fsRoot provides the in-memory file system that supports lazy connections to
// Cloud SQL instances. It is the root directory node of the FUSE mount.
type fsRoot struct {
	fs.Inode

	// tmpDir defines a temporary directory where all the sockets are placed
	// facilitating connections to Cloud SQL instances.
	tmpDir string
	// linkDir is the directory that holds symbolic links to the tmp dir for
	// each Cloud SQL instance connection. After shutdown, this directory is
	// cleaned out.
	linkDir string

	// client, when non-nil, is used by Lookup to query an instance's database
	// version; connset supplies the IDs listed by Readdir.
	client  *proxy.Client
	connset *proxy.ConnSet

	// sockLock protects fields in this struct related to sockets; specifically
	// 'links' and 'closers'.
	sockLock sync.Mutex
	// links maps an instance name to the symlink node previously handed out
	// for it by Lookup.
	links map[string]*symlink
	// closers includes a reference to all open Unix socket listeners. When
	// fsRoot.Close is called, all of these listeners are also closed.
	closers []io.Closer

	// The embedded RWMutex guards dst: newConn sends on dst while holding the
	// read lock, and Close closes dst and sets it to nil under the write lock.
	sync.RWMutex
	// dst receives every accepted connection; it is nil once Close has run.
	dst chan<- proxy.Conn
}

// Compile-time check that *fsRoot implements the node interfaces it is
// expected to provide.
var _ interface {
	fs.InodeEmbedder
	fs.NodeGetattrer
	fs.NodeLookuper
	fs.NodeReaddirer
} = &fsRoot{}
   153  
   154  func (r *fsRoot) newConn(instance string, c net.Conn) {
   155  	r.RLock()
   156  	// dst will be nil if Close has been called already.
   157  	if ch := r.dst; ch != nil {
   158  		ch <- proxy.Conn{Instance: instance, Conn: c}
   159  	} else {
   160  		logging.Errorf("Ignored new conn request to %q: system has been closed", instance)
   161  	}
   162  	r.RUnlock()
   163  }
   164  
   165  // Close shuts down the fsRoot filesystem and closes all open Unix socket
   166  // listeners.
   167  func (r *fsRoot) Close() error {
   168  	r.Lock()
   169  	if r.dst != nil {
   170  		// Since newConn only sends on dst while holding a reader lock, holding the
   171  		// writer lock is sufficient to ensure there are no pending sends on the
   172  		// channel when it is closed.
   173  		close(r.dst)
   174  		// Setting it to nil prevents further sends.
   175  		r.dst = nil
   176  	}
   177  	r.Unlock()
   178  
   179  	var errs bytes.Buffer
   180  	r.sockLock.Lock()
   181  	for _, c := range r.closers {
   182  		if err := c.Close(); err != nil {
   183  			fmt.Fprintln(&errs, err)
   184  		}
   185  	}
   186  	r.sockLock.Unlock()
   187  
   188  	if errs.Len() == 0 {
   189  		return nil
   190  	}
   191  	logging.Errorf("Close %q: %v", r.linkDir, errs.String())
   192  	return errors.New(errs.String())
   193  }
   194  
   195  // Getattr implements fs.NodeGetattrer and represents fsRoot as a directory.
   196  func (r *fsRoot) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
   197  	*out = fuse.AttrOut{Attr: fuse.Attr{
   198  		Mode: 0555 | fuse.S_IFDIR,
   199  	}}
   200  	return fs.OK
   201  }
   202  
   203  // Lookup implements fs.NodeLookuper and handles all requests, either for the
   204  // README, or for a new connection to a Cloud SQL instance. When receiving a
   205  // request for a Cloud SQL instance, Lookup will return a symlink to a Unix
   206  // socket that provides connectivity to a remote instance.
   207  func (r *fsRoot) Lookup(ctx context.Context, instance string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno) {
   208  	if instance == "README" {
   209  		return r.NewInode(ctx, &readme{}, fs.StableAttr{}), fs.OK
   210  	}
   211  	r.sockLock.Lock()
   212  	defer r.sockLock.Unlock()
   213  
   214  	if _, _, _, _, err := proxy.ParseInstanceConnectionName(instance); err != nil {
   215  		return nil, syscall.ENOENT
   216  	}
   217  
   218  	if ret, ok := r.links[instance]; ok {
   219  		return ret.EmbeddedInode(), fs.OK
   220  	}
   221  
   222  	// path is the location of the Unix socket
   223  	path := filepath.Join(r.tmpDir, instance)
   224  	os.RemoveAll(path) // Best effort; the following will fail if this does.
   225  	// linkpath is the location the symlink points to
   226  	linkpath := path
   227  
   228  	// Add a ".s.PGSQL.5432" suffix to path for Postgres instances
   229  	if r.client != nil {
   230  		version, err := r.client.InstanceVersionContext(ctx, instance)
   231  		if err != nil {
   232  			logging.Errorf("Failed to get Instance version for %s: %v", instance, err)
   233  			return nil, syscall.ENOENT
   234  		}
   235  		if strings.HasPrefix(strings.ToLower(version), "postgres") {
   236  			if err := os.MkdirAll(path, 0755); err != nil {
   237  				logging.Errorf("Failed to create path %s: %v", path, err)
   238  				return nil, syscall.EIO
   239  			}
   240  			path = filepath.Join(linkpath, ".s.PGSQL.5432")
   241  		}
   242  	}
   243  	// TODO: check path length -- if it exceeds the max supported socket length,
   244  	// return an error that helps the user understand what went wrong.
   245  	// Otherwise, we get a "bind: invalid argument" error.
   246  
   247  	sock, err := net.Listen("unix", path)
   248  	if err != nil {
   249  		logging.Errorf("couldn't listen at %q: %v", path, err)
   250  		return nil, syscall.EEXIST
   251  	}
   252  	if err := os.Chmod(path, 0777|os.ModeSocket); err != nil {
   253  		logging.Errorf("couldn't update permissions for socket file %q: %v; other users may be unable to connect", path, err)
   254  	}
   255  
   256  	go r.listenerLifecycle(sock, instance, path)
   257  
   258  	ret := &symlink{path: linkpath}
   259  	inode := r.NewInode(ctx, ret, fs.StableAttr{Mode: 0777 | fuse.S_IFLNK})
   260  	r.links[instance] = ret
   261  	// TODO(chowski): memory leak when listeners exit on their own via removeListener.
   262  	r.closers = append(r.closers, sock)
   263  
   264  	return inode, fs.OK
   265  }
   266  
   267  // removeListener marks that a Listener for an instance has exited and is no
   268  // longer serving new connections.
   269  func (r *fsRoot) removeListener(instance, path string) {
   270  	r.sockLock.Lock()
   271  	defer r.sockLock.Unlock()
   272  	v, ok := r.links[instance]
   273  	if ok && v.path == path {
   274  		delete(r.links, instance)
   275  	} else {
   276  		logging.Errorf("Removing a listener for %q at %q which was already replaced", instance, path)
   277  	}
   278  }
   279  
   280  // listenerLifecycle calls l.Accept in a loop, and for each new connection
   281  // r.newConn is called. After the Listener returns an error it is removed.
   282  func (r *fsRoot) listenerLifecycle(l net.Listener, instance, path string) {
   283  	for {
   284  		start := time.Now()
   285  		c, err := l.Accept()
   286  		if err != nil {
   287  			logging.Errorf("error in Accept for %q: %v", instance, err)
   288  			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
   289  				d := 10*time.Millisecond - time.Since(start)
   290  				if d > 0 {
   291  					time.Sleep(d)
   292  				}
   293  				continue
   294  			}
   295  			break
   296  		}
   297  		r.newConn(instance, c)
   298  	}
   299  	r.removeListener(instance, path)
   300  	l.Close()
   301  	if err := os.Remove(path); err != nil {
   302  		logging.Errorf("couldn't remove %q: %v", path, err)
   303  	}
   304  }
   305  
   306  // Readdir implements fs.NodeReaddirer and returns a list of files for each
   307  // instance to which the proxy is actively connected. In addition, the list
   308  // includes a README.
   309  func (r *fsRoot) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) {
   310  	activeConns := r.connset.IDs()
   311  	entries := []fuse.DirEntry{
   312  		{Name: "README", Mode: 0555 | fuse.S_IFREG},
   313  	}
   314  	for _, conn := range activeConns {
   315  		entries = append(entries, fuse.DirEntry{
   316  			Name: conn,
   317  			Mode: 0777 | syscall.S_IFSOCK,
   318  		})
   319  	}
   320  	ds := fs.NewListDirStream(entries)
   321  	return ds, fs.OK
   322  }
   323  
// readme represents a static read-only text file. Its content is the
// package-level readmeText constant.
type readme struct {
	fs.Inode
}

// Compile-time check that *readme implements the node interfaces needed to
// stat, open and read the file.
var _ interface {
	fs.InodeEmbedder
	fs.NodeGetattrer
	fs.NodeReader
	fs.NodeOpener
} = &readme{}
   335  
// readmeText is the content of the README file exposed by the file system;
// readme.Getattr reports its length as the file size and readme.Read and
// readme.Open serve its bytes.
const readmeText = `
When programs attempt to open files in this directory, a remote connection to
the Cloud SQL instance of the same name will be established.

That is, running:

	mysql -u root -S "/path/to/this/directory/project:region:instance-2"
	-or-
	psql "host=/path/to/this/directory/project:region:instance-2 dbname=mydb user=myuser"

will open a new connection to the specified instance, given you have the correct
permissions.

Listing the contents of this directory will show all instances with active
connections.
`
   352  
   353  // Getattr implements fs.NodeGetattrer and indicates that this file is a regular
   354  // file.
   355  func (*readme) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
   356  	*out = fuse.AttrOut{Attr: fuse.Attr{
   357  		Mode: 0444 | syscall.S_IFREG,
   358  		Size: uint64(len(readmeText)),
   359  	}}
   360  	return fs.OK
   361  }
   362  
   363  // Read implements fs.NodeReader and supports incremental reads.
   364  func (*readme) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
   365  	end := int(off) + len(dest)
   366  	if end > len(readmeText) {
   367  		end = len(readmeText)
   368  	}
   369  	return fuse.ReadResultData([]byte(readmeText[off:end])), fs.OK
   370  }
   371  
   372  // Open implements fs.NodeOpener and supports opening the README as a read-only
   373  // file.
   374  func (*readme) Open(ctx context.Context, mode uint32) (fs.FileHandle, uint32, syscall.Errno) {
   375  	df := nodefs.NewDataFile([]byte(readmeText))
   376  	rf := nodefs.NewReadOnlyFile(df)
   377  	return rf, 0, fs.OK
   378  }
   379  

View as plain text