...

Source file src/github.com/Shopify/go-storage/cloudstorage_fs.go

Documentation: github.com/Shopify/go-storage

     1  package storage
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  	"sync"
     9  	"time"
    10  
    11  	gstorage "cloud.google.com/go/storage"
    12  	"golang.org/x/oauth2/google"
    13  	"google.golang.org/api/googleapi"
    14  	"google.golang.org/api/iterator"
    15  	"google.golang.org/api/option"
    16  )
    17  
    18  // NewCloudStorageFS creates a Google Cloud Storage FS
    19  // credentials can be nil to use the default GOOGLE_APPLICATION_CREDENTIALS
    20  func NewCloudStorageFS(bucket string, credentials *google.Credentials) FS {
    21  	return &cloudStorageFS{
    22  		bucketName:  bucket,
    23  		credentials: credentials,
    24  	}
    25  }
    26  
    27  // cloudStorageFS implements FS and uses Google Cloud Storage as the underlying
    28  // file storage.
    29  type cloudStorageFS struct {
    30  	bucketName  string
    31  	credentials *google.Credentials
    32  
    33  	bucketLock   sync.RWMutex
    34  	bucket       *gstorage.BucketHandle
    35  	bucketScopes Scope
    36  }
    37  
    38  func (c *cloudStorageFS) URL(ctx context.Context, path string, options *SignedURLOptions) (string, error) {
    39  	if options == nil {
    40  		options = &SignedURLOptions{}
    41  	}
    42  	options.applyDefaults()
    43  
    44  	b, err := c.bucketHandle(ctx, ScopeSignURL)
    45  	if err != nil {
    46  		return "", err
    47  	}
    48  
    49  	return b.SignedURL(path, &gstorage.SignedURLOptions{
    50  		Method:  options.Method,
    51  		Expires: time.Now().Add(options.Expiry),
    52  	})
    53  }
    54  
    55  // Open implements FS.
    56  func (c *cloudStorageFS) Open(ctx context.Context, path string, options *ReaderOptions) (*File, error) {
    57  	b, err := c.bucketHandle(ctx, ScopeRead)
    58  	if err != nil {
    59  		return nil, err
    60  	}
    61  
    62  	obj := b.Object(path)
    63  	if options != nil {
    64  		obj = obj.ReadCompressed(options.ReadCompressed)
    65  	}
    66  
    67  	f, err := obj.NewReader(ctx)
    68  	if err != nil {
    69  		if errors.Is(err, gstorage.ErrObjectNotExist) {
    70  			return nil, &notExistError{
    71  				Path: path,
    72  			}
    73  		}
    74  
    75  		return nil, err
    76  	}
    77  
    78  	return &File{
    79  		ReadCloser: f,
    80  		Attributes: Attributes{
    81  			ContentType:     f.Attrs.ContentType,
    82  			ContentEncoding: f.Attrs.ContentEncoding,
    83  			ModTime:         f.Attrs.LastModified,
    84  			Size:            f.Attrs.Size,
    85  		},
    86  	}, nil
    87  }
    88  
    89  // Attributes implements FS.
    90  func (c *cloudStorageFS) Attributes(ctx context.Context, path string, _ *ReaderOptions) (*Attributes, error) {
    91  	b, err := c.bucketHandle(ctx, ScopeRead)
    92  	if err != nil {
    93  		return nil, err
    94  	}
    95  
    96  	a, err := b.Object(path).Attrs(ctx)
    97  	if err != nil {
    98  		return nil, err
    99  	}
   100  
   101  	return &Attributes{
   102  		ContentType:     a.ContentType,
   103  		ContentEncoding: a.ContentEncoding,
   104  		Metadata:        a.Metadata,
   105  		ModTime:         a.Updated,
   106  		CreationTime:    a.Created,
   107  		Size:            a.Size,
   108  	}, nil
   109  }
   110  
   111  // Create implements FS.
   112  func (c *cloudStorageFS) Create(ctx context.Context, path string, options *WriterOptions) (io.WriteCloser, error) {
   113  	b, err := c.bucketHandle(ctx, ScopeWrite)
   114  	if err != nil {
   115  		return nil, err
   116  	}
   117  
   118  	w := b.Object(path).NewWriter(ctx)
   119  
   120  	if options != nil {
   121  		w.Metadata = options.Attributes.Metadata
   122  		w.ContentType = options.Attributes.ContentType
   123  		w.ContentEncoding = options.Attributes.ContentEncoding
   124  		w.ChunkSize = options.BufferSize
   125  	}
   126  	w.ChunkSize = c.chunkSize(w.ChunkSize)
   127  
   128  	return w, nil
   129  }
   130  
   131  func (c *cloudStorageFS) chunkSize(size int) int {
   132  	if size == 0 {
   133  		return googleapi.DefaultUploadChunkSize
   134  	} else if size > 0 {
   135  		return size
   136  	}
   137  
   138  	return 0 // disable buffering
   139  }
   140  
   141  // Delete implements FS.
   142  func (c *cloudStorageFS) Delete(ctx context.Context, path string) error {
   143  	b, err := c.bucketHandle(ctx, ScopeDelete)
   144  	if err != nil {
   145  		return err
   146  	}
   147  
   148  	return b.Object(path).Delete(ctx)
   149  }
   150  
   151  // Walk implements FS.
   152  func (c *cloudStorageFS) Walk(ctx context.Context, path string, fn WalkFn) error {
   153  	bh, err := c.bucketHandle(ctx, ScopeRead)
   154  	if err != nil {
   155  		return err
   156  	}
   157  
   158  	it := bh.Objects(ctx, &gstorage.Query{
   159  		Prefix: path,
   160  	})
   161  
   162  	for {
   163  		r, err := it.Next()
   164  		if errors.Is(err, iterator.Done) {
   165  			break
   166  		}
   167  		if err != nil {
   168  			// TODO(dhowden): Properly handle this error.
   169  			return err
   170  		}
   171  
   172  		if err = fn(r.Name); err != nil {
   173  			return err
   174  		}
   175  	}
   176  
   177  	return nil
   178  }
   179  
   180  func cloudStorageScope(scope Scope) string {
   181  	switch {
   182  	case scope.Has(ScopeDelete):
   183  		return gstorage.ScopeFullControl
   184  	case scope.Has(ScopeWrite):
   185  		return gstorage.ScopeReadWrite
   186  	case scope.Has(ScopeRead), scope.Has(ScopeSignURL):
   187  		return gstorage.ScopeReadOnly
   188  	default:
   189  		panic(fmt.Sprintf("unknown scope: '%s'", scope))
   190  	}
   191  }
   192  
   193  func ResolveCloudStorageScope(scope Scope) Scope {
   194  	switch cloudStorageScope(scope) {
   195  	case gstorage.ScopeFullControl:
   196  		return ScopeRWD | scope
   197  	case gstorage.ScopeReadWrite:
   198  		return ScopeRW | scope
   199  	case gstorage.ScopeReadOnly:
   200  		return ScopeRead | scope
   201  	default:
   202  		panic(fmt.Sprintf("unknown scope: '%s'", scope))
   203  	}
   204  }
   205  
   206  func (c *cloudStorageFS) findCredentials(ctx context.Context, scope string) (*google.Credentials, error) {
   207  	if c.credentials != nil {
   208  		return c.credentials, nil
   209  	}
   210  
   211  	return google.FindDefaultCredentials(ctx, scope)
   212  }
   213  
   214  func (c *cloudStorageFS) client(ctx context.Context, scope Scope) (*gstorage.Client, error) {
   215  	creds, err := c.findCredentials(ctx, cloudStorageScope(scope))
   216  	if err != nil {
   217  		return nil, fmt.Errorf("finding credentials: %w", err)
   218  	}
   219  
   220  	var options []option.ClientOption
   221  	options = append(options, option.WithCredentials(creds))
   222  	options = append(options, option.WithScopes(cloudStorageScope(scope)))
   223  
   224  	client, err := gstorage.NewClient(ctx, options...)
   225  	if err != nil {
   226  		return nil, fmt.Errorf("building client: %w", err)
   227  	}
   228  
   229  	return client, nil
   230  }
   231  
   232  func (c *cloudStorageFS) bucketHandle(ctx context.Context, scope Scope) (*gstorage.BucketHandle, error) {
   233  	c.bucketLock.RLock()
   234  	scope |= c.bucketScopes // Expand requested scope to encompass existing scopes
   235  	if bucket := c.bucket; bucket != nil && c.bucketScopes.Has(scope) {
   236  		c.bucketLock.RUnlock()
   237  
   238  		return bucket, nil
   239  	}
   240  	c.bucketLock.RUnlock()
   241  
   242  	c.bucketLock.Lock()
   243  	defer c.bucketLock.Unlock()
   244  	if c.bucket != nil && c.bucketScopes.Has(scope) { // Race condition
   245  		return c.bucket, nil
   246  	}
   247  
   248  	// Expand the requested scope to include the scopes that GCS would provide
   249  	// e.g. Requesting Write actually provides ReadWrite.
   250  	// Also include any scope that was previously used.
   251  	scope = ResolveCloudStorageScope(c.bucketScopes | scope)
   252  
   253  	client, err := c.client(ctx, scope)
   254  	if err != nil {
   255  		return nil, err
   256  	}
   257  
   258  	c.bucket = client.Bucket(c.bucketName)
   259  	c.bucketScopes = scope
   260  
   261  	return c.bucket, nil
   262  }
   263  

View as plain text