1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 package fuse
36
37 import (
38 "bytes"
39 "errors"
40 "fmt"
41 "io"
42 "net"
43 "os"
44 "path/filepath"
45 "strings"
46 "sync"
47 "syscall"
48 "time"
49
50 "github.com/GoogleCloudPlatform/cloudsql-proxy/logging"
51 "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/proxy"
52 "github.com/hanwen/go-fuse/v2/fs"
53 "github.com/hanwen/go-fuse/v2/fuse"
54 "github.com/hanwen/go-fuse/v2/fuse/nodefs"
55 "golang.org/x/net/context"
56 )
57
58
59
60
61
62
63
64
65 func NewConnSrc(mountdir, tmpdir string, client *proxy.Client, connset *proxy.ConnSet) (<-chan proxy.Conn, io.Closer, error) {
66 if err := os.MkdirAll(tmpdir, 0777); err != nil {
67 return nil, nil, err
68 }
69 if connset == nil {
70
71 connset = proxy.NewConnSet()
72 }
73 conns := make(chan proxy.Conn, 1)
74 root := &fsRoot{
75 tmpDir: tmpdir,
76 linkDir: mountdir,
77 dst: conns,
78 links: make(map[string]*symlink),
79 connset: connset,
80 client: client,
81 }
82
83 srv, err := fs.Mount(mountdir, root, &fs.Options{
84 MountOptions: fuse.MountOptions{AllowOther: true},
85 })
86 if err != nil {
87 return nil, nil, fmt.Errorf("FUSE mount failed: %q: %v", mountdir, err)
88 }
89
90 closer := fuseCloser(func() error {
91 err := srv.Unmount()
92 if err != nil {
93 logging.Errorf("Unmount failed: %v", err)
94 }
95 return root.Close()
96 })
97 return conns, closer, nil
98 }
99
// fuseCloser adapts a plain func() error so that it can be used wherever a
// value with a Close() error method is expected.
type fuseCloser func() error

// Close calls the wrapped function and returns whatever it returns.
func (f fuseCloser) Close() error {
	err := f()
	return err
}
105
106
107
108 type symlink struct {
109 fs.Inode
110 path string
111 }
112
113 var _ fs.NodeReadlinker = &symlink{}
114
115 func (s *symlink) Readlink(ctx context.Context) ([]byte, syscall.Errno) {
116 return []byte(s.path), fs.OK
117 }
118
119
120
121 type fsRoot struct {
122 fs.Inode
123
124
125
126 tmpDir string
127
128
129
130 linkDir string
131
132 client *proxy.Client
133 connset *proxy.ConnSet
134
135
136
137 sockLock sync.Mutex
138 links map[string]*symlink
139
140
141 closers []io.Closer
142
143 sync.RWMutex
144 dst chan<- proxy.Conn
145 }
146
147 var _ interface {
148 fs.InodeEmbedder
149 fs.NodeGetattrer
150 fs.NodeLookuper
151 fs.NodeReaddirer
152 } = &fsRoot{}
153
154 func (r *fsRoot) newConn(instance string, c net.Conn) {
155 r.RLock()
156
157 if ch := r.dst; ch != nil {
158 ch <- proxy.Conn{Instance: instance, Conn: c}
159 } else {
160 logging.Errorf("Ignored new conn request to %q: system has been closed", instance)
161 }
162 r.RUnlock()
163 }
164
165
166
167 func (r *fsRoot) Close() error {
168 r.Lock()
169 if r.dst != nil {
170
171
172
173 close(r.dst)
174
175 r.dst = nil
176 }
177 r.Unlock()
178
179 var errs bytes.Buffer
180 r.sockLock.Lock()
181 for _, c := range r.closers {
182 if err := c.Close(); err != nil {
183 fmt.Fprintln(&errs, err)
184 }
185 }
186 r.sockLock.Unlock()
187
188 if errs.Len() == 0 {
189 return nil
190 }
191 logging.Errorf("Close %q: %v", r.linkDir, errs.String())
192 return errors.New(errs.String())
193 }
194
195
196 func (r *fsRoot) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
197 *out = fuse.AttrOut{Attr: fuse.Attr{
198 Mode: 0555 | fuse.S_IFDIR,
199 }}
200 return fs.OK
201 }
202
203
204
205
206
207 func (r *fsRoot) Lookup(ctx context.Context, instance string, out *fuse.EntryOut) (*fs.Inode, syscall.Errno) {
208 if instance == "README" {
209 return r.NewInode(ctx, &readme{}, fs.StableAttr{}), fs.OK
210 }
211 r.sockLock.Lock()
212 defer r.sockLock.Unlock()
213
214 if _, _, _, _, err := proxy.ParseInstanceConnectionName(instance); err != nil {
215 return nil, syscall.ENOENT
216 }
217
218 if ret, ok := r.links[instance]; ok {
219 return ret.EmbeddedInode(), fs.OK
220 }
221
222
223 path := filepath.Join(r.tmpDir, instance)
224 os.RemoveAll(path)
225
226 linkpath := path
227
228
229 if r.client != nil {
230 version, err := r.client.InstanceVersionContext(ctx, instance)
231 if err != nil {
232 logging.Errorf("Failed to get Instance version for %s: %v", instance, err)
233 return nil, syscall.ENOENT
234 }
235 if strings.HasPrefix(strings.ToLower(version), "postgres") {
236 if err := os.MkdirAll(path, 0755); err != nil {
237 logging.Errorf("Failed to create path %s: %v", path, err)
238 return nil, syscall.EIO
239 }
240 path = filepath.Join(linkpath, ".s.PGSQL.5432")
241 }
242 }
243
244
245
246
247 sock, err := net.Listen("unix", path)
248 if err != nil {
249 logging.Errorf("couldn't listen at %q: %v", path, err)
250 return nil, syscall.EEXIST
251 }
252 if err := os.Chmod(path, 0777|os.ModeSocket); err != nil {
253 logging.Errorf("couldn't update permissions for socket file %q: %v; other users may be unable to connect", path, err)
254 }
255
256 go r.listenerLifecycle(sock, instance, path)
257
258 ret := &symlink{path: linkpath}
259 inode := r.NewInode(ctx, ret, fs.StableAttr{Mode: 0777 | fuse.S_IFLNK})
260 r.links[instance] = ret
261
262 r.closers = append(r.closers, sock)
263
264 return inode, fs.OK
265 }
266
267
268
269 func (r *fsRoot) removeListener(instance, path string) {
270 r.sockLock.Lock()
271 defer r.sockLock.Unlock()
272 v, ok := r.links[instance]
273 if ok && v.path == path {
274 delete(r.links, instance)
275 } else {
276 logging.Errorf("Removing a listener for %q at %q which was already replaced", instance, path)
277 }
278 }
279
280
281
282 func (r *fsRoot) listenerLifecycle(l net.Listener, instance, path string) {
283 for {
284 start := time.Now()
285 c, err := l.Accept()
286 if err != nil {
287 logging.Errorf("error in Accept for %q: %v", instance, err)
288 if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
289 d := 10*time.Millisecond - time.Since(start)
290 if d > 0 {
291 time.Sleep(d)
292 }
293 continue
294 }
295 break
296 }
297 r.newConn(instance, c)
298 }
299 r.removeListener(instance, path)
300 l.Close()
301 if err := os.Remove(path); err != nil {
302 logging.Errorf("couldn't remove %q: %v", path, err)
303 }
304 }
305
306
307
308
309 func (r *fsRoot) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) {
310 activeConns := r.connset.IDs()
311 entries := []fuse.DirEntry{
312 {Name: "README", Mode: 0555 | fuse.S_IFREG},
313 }
314 for _, conn := range activeConns {
315 entries = append(entries, fuse.DirEntry{
316 Name: conn,
317 Mode: 0777 | syscall.S_IFSOCK,
318 })
319 }
320 ds := fs.NewListDirStream(entries)
321 return ds, fs.OK
322 }
323
324
325 type readme struct {
326 fs.Inode
327 }
328
329 var _ interface {
330 fs.InodeEmbedder
331 fs.NodeGetattrer
332 fs.NodeReader
333 fs.NodeOpener
334 } = &readme{}
335
// readmeText is the content of the README file exposed in the mounted
// directory. It is user-facing help text explaining how to connect through
// the directory.
const readmeText = `
When programs attempt to open files in this directory, a remote connection to
the Cloud SQL instance of the same name will be established.

That is, running:

mysql -u root -S "/path/to/this/directory/project:region:instance-2"
-or-
psql "host=/path/to/this/directory/project:region:instance-2 dbname=mydb user=myuser"

will open a new connection to the specified instance, given you have the correct
permissions.

Listing the contents of this directory will show all instances with active
connections.
`
352
353
354
355 func (*readme) Getattr(ctx context.Context, f fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
356 *out = fuse.AttrOut{Attr: fuse.Attr{
357 Mode: 0444 | syscall.S_IFREG,
358 Size: uint64(len(readmeText)),
359 }}
360 return fs.OK
361 }
362
363
364 func (*readme) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
365 end := int(off) + len(dest)
366 if end > len(readmeText) {
367 end = len(readmeText)
368 }
369 return fuse.ReadResultData([]byte(readmeText[off:end])), fs.OK
370 }
371
372
373
374 func (*readme) Open(ctx context.Context, mode uint32) (fs.FileHandle, uint32, syscall.Errno) {
375 df := nodefs.NewDataFile([]byte(readmeText))
376 rf := nodefs.NewReadOnlyFile(df)
377 return rf, 0, fs.OK
378 }
379
View as plain text