...

Source file src/github.com/docker/distribution/registry/storage/driver/gcs/gcs.go

Documentation: github.com/docker/distribution/registry/storage/driver/gcs

     1  //go:build include_gcs
     2  // +build include_gcs
     3  
     4  // Package gcs provides a storagedriver.StorageDriver implementation to
     5  // store blobs in Google cloud storage.
     6  //
     7  // This package leverages the google.golang.org/cloud/storage client library
     8  // for interfacing with gcs.
     9  //
    10  // Because gcs is a key, value store the Stat call does not support last modification
    11  // time for directories (directories are an abstraction for key, value stores)
    12  //
    13  // Note that the contents of incomplete uploads are not accessible even though
    14  // Stat returns their length
    15  package gcs
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"encoding/json"
    21  	"fmt"
    22  	"io"
    23  	"io/ioutil"
    24  	"math/rand"
    25  	"net/http"
    26  	"net/url"
    27  	"reflect"
    28  	"regexp"
    29  	"sort"
    30  	"strconv"
    31  	"strings"
    32  	"time"
    33  
    34  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    35  	"github.com/docker/distribution/registry/storage/driver/base"
    36  	"github.com/docker/distribution/registry/storage/driver/factory"
    37  	"github.com/sirupsen/logrus"
    38  	"golang.org/x/oauth2"
    39  	"golang.org/x/oauth2/google"
    40  	"golang.org/x/oauth2/jwt"
    41  	"google.golang.org/api/googleapi"
    42  	"google.golang.org/cloud"
    43  	"google.golang.org/cloud/storage"
    44  )
    45  
    46  const (
    47  	driverName     = "gcs"
    48  	dummyProjectID = "<unknown>"
    49  
    50  	uploadSessionContentType = "application/x-docker-upload-session"
    51  	minChunkSize             = 256 * 1024
    52  	defaultChunkSize         = 20 * minChunkSize
    53  	defaultMaxConcurrency    = 50
    54  	minConcurrency           = 25
    55  
    56  	maxTries = 5
    57  )
    58  
    59  var rangeHeader = regexp.MustCompile(`^bytes=([0-9])+-([0-9]+)$`)
    60  
    61  // driverParameters is a struct that encapsulates all of the driver parameters after all values have been set
    62  type driverParameters struct {
    63  	bucket        string
    64  	email         string
    65  	privateKey    []byte
    66  	client        *http.Client
    67  	rootDirectory string
    68  	chunkSize     int
    69  
    70  	// maxConcurrency limits the number of concurrent driver operations
    71  	// to GCS, which ultimately increases reliability of many simultaneous
    72  	// pushes by ensuring we aren't DoSing our own server with many
    73  	// connections.
    74  	maxConcurrency uint64
    75  }
    76  
    77  func init() {
    78  	factory.Register(driverName, &gcsDriverFactory{})
    79  }
    80  
    81  // gcsDriverFactory implements the factory.StorageDriverFactory interface
    82  type gcsDriverFactory struct{}
    83  
    84  // Create StorageDriver from parameters
    85  func (factory *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
    86  	return FromParameters(parameters)
    87  }
    88  
    89  var _ storagedriver.StorageDriver = &driver{}
    90  
    91  // driver is a storagedriver.StorageDriver implementation backed by GCS
    92  // Objects are stored at absolute keys in the provided bucket.
    93  type driver struct {
    94  	client        *http.Client
    95  	bucket        string
    96  	email         string
    97  	privateKey    []byte
    98  	rootDirectory string
    99  	chunkSize     int
   100  }
   101  
   102  // Wrapper wraps `driver` with a throttler, ensuring that no more than N
   103  // GCS actions can occur concurrently. The default limit is 75.
   104  type Wrapper struct {
   105  	baseEmbed
   106  }
   107  
   108  type baseEmbed struct {
   109  	base.Base
   110  }
   111  
   112  // FromParameters constructs a new Driver with a given parameters map
   113  // Required parameters:
   114  // - bucket
   115  func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
   116  	bucket, ok := parameters["bucket"]
   117  	if !ok || fmt.Sprint(bucket) == "" {
   118  		return nil, fmt.Errorf("No bucket parameter provided")
   119  	}
   120  
   121  	rootDirectory, ok := parameters["rootdirectory"]
   122  	if !ok {
   123  		rootDirectory = ""
   124  	}
   125  
   126  	chunkSize := defaultChunkSize
   127  	chunkSizeParam, ok := parameters["chunksize"]
   128  	if ok {
   129  		switch v := chunkSizeParam.(type) {
   130  		case string:
   131  			vv, err := strconv.Atoi(v)
   132  			if err != nil {
   133  				return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
   134  			}
   135  			chunkSize = vv
   136  		case int, uint, int32, uint32, uint64, int64:
   137  			chunkSize = int(reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int())
   138  		default:
   139  			return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
   140  		}
   141  
   142  		if chunkSize < minChunkSize {
   143  			return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
   144  		}
   145  
   146  		if chunkSize%minChunkSize != 0 {
   147  			return nil, fmt.Errorf("chunksize should be a multiple of %d", minChunkSize)
   148  		}
   149  	}
   150  
   151  	var ts oauth2.TokenSource
   152  	jwtConf := new(jwt.Config)
   153  	if keyfile, ok := parameters["keyfile"]; ok {
   154  		jsonKey, err := ioutil.ReadFile(fmt.Sprint(keyfile))
   155  		if err != nil {
   156  			return nil, err
   157  		}
   158  		jwtConf, err = google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl)
   159  		if err != nil {
   160  			return nil, err
   161  		}
   162  		ts = jwtConf.TokenSource(context.Background())
   163  	} else if credentials, ok := parameters["credentials"]; ok {
   164  		credentialMap, ok := credentials.(map[interface{}]interface{})
   165  		if !ok {
   166  			return nil, fmt.Errorf("The credentials were not specified in the correct format")
   167  		}
   168  
   169  		stringMap := map[string]interface{}{}
   170  		for k, v := range credentialMap {
   171  			key, ok := k.(string)
   172  			if !ok {
   173  				return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k))
   174  			}
   175  			stringMap[key] = v
   176  		}
   177  
   178  		data, err := json.Marshal(stringMap)
   179  		if err != nil {
   180  			return nil, fmt.Errorf("Failed to marshal gcs credentials to json")
   181  		}
   182  
   183  		jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
   184  		if err != nil {
   185  			return nil, err
   186  		}
   187  		ts = jwtConf.TokenSource(context.Background())
   188  	} else {
   189  		var err error
   190  		ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
   191  		if err != nil {
   192  			return nil, err
   193  		}
   194  	}
   195  
   196  	maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
   197  	if err != nil {
   198  		return nil, fmt.Errorf("maxconcurrency config error: %s", err)
   199  	}
   200  
   201  	params := driverParameters{
   202  		bucket:         fmt.Sprint(bucket),
   203  		rootDirectory:  fmt.Sprint(rootDirectory),
   204  		email:          jwtConf.Email,
   205  		privateKey:     jwtConf.PrivateKey,
   206  		client:         oauth2.NewClient(context.Background(), ts),
   207  		chunkSize:      chunkSize,
   208  		maxConcurrency: maxConcurrency,
   209  	}
   210  
   211  	return New(params)
   212  }
   213  
   214  // New constructs a new driver
   215  func New(params driverParameters) (storagedriver.StorageDriver, error) {
   216  	rootDirectory := strings.Trim(params.rootDirectory, "/")
   217  	if rootDirectory != "" {
   218  		rootDirectory += "/"
   219  	}
   220  	if params.chunkSize <= 0 || params.chunkSize%minChunkSize != 0 {
   221  		return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize)
   222  	}
   223  	d := &driver{
   224  		bucket:        params.bucket,
   225  		rootDirectory: rootDirectory,
   226  		email:         params.email,
   227  		privateKey:    params.privateKey,
   228  		client:        params.client,
   229  		chunkSize:     params.chunkSize,
   230  	}
   231  
   232  	return &Wrapper{
   233  		baseEmbed: baseEmbed{
   234  			Base: base.Base{
   235  				StorageDriver: base.NewRegulator(d, params.maxConcurrency),
   236  			},
   237  		},
   238  	}, nil
   239  }
   240  
   241  // Implement the storagedriver.StorageDriver interface
   242  
   243  func (d *driver) Name() string {
   244  	return driverName
   245  }
   246  
   247  // GetContent retrieves the content stored at "path" as a []byte.
   248  // This should primarily be used for small objects.
   249  func (d *driver) GetContent(context context.Context, path string) ([]byte, error) {
   250  	gcsContext := d.context(context)
   251  	name := d.pathToKey(path)
   252  	var rc io.ReadCloser
   253  	err := retry(func() error {
   254  		var err error
   255  		rc, err = storage.NewReader(gcsContext, d.bucket, name)
   256  		return err
   257  	})
   258  	if err == storage.ErrObjectNotExist {
   259  		return nil, storagedriver.PathNotFoundError{Path: path}
   260  	}
   261  	if err != nil {
   262  		return nil, err
   263  	}
   264  	defer rc.Close()
   265  
   266  	p, err := ioutil.ReadAll(rc)
   267  	if err != nil {
   268  		return nil, err
   269  	}
   270  	return p, nil
   271  }
   272  
   273  // PutContent stores the []byte content at a location designated by "path".
   274  // This should primarily be used for small objects.
   275  func (d *driver) PutContent(context context.Context, path string, contents []byte) error {
   276  	return retry(func() error {
   277  		wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
   278  		wc.ContentType = "application/octet-stream"
   279  		return putContentsClose(wc, contents)
   280  	})
   281  }
   282  
   283  // Reader retrieves an io.ReadCloser for the content stored at "path"
   284  // with a given byte offset.
   285  // May be used to resume reading a stream by providing a nonzero offset.
   286  func (d *driver) Reader(context context.Context, path string, offset int64) (io.ReadCloser, error) {
   287  	res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset)
   288  	if err != nil {
   289  		if res != nil {
   290  			if res.StatusCode == http.StatusNotFound {
   291  				res.Body.Close()
   292  				return nil, storagedriver.PathNotFoundError{Path: path}
   293  			}
   294  
   295  			if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
   296  				res.Body.Close()
   297  				obj, err := storageStatObject(d.context(context), d.bucket, d.pathToKey(path))
   298  				if err != nil {
   299  					return nil, err
   300  				}
   301  				if offset == obj.Size {
   302  					return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
   303  				}
   304  				return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
   305  			}
   306  		}
   307  		return nil, err
   308  	}
   309  	if res.Header.Get("Content-Type") == uploadSessionContentType {
   310  		defer res.Body.Close()
   311  		return nil, storagedriver.PathNotFoundError{Path: path}
   312  	}
   313  	return res.Body, nil
   314  }
   315  
   316  func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) {
   317  	// copied from google.golang.org/cloud/storage#NewReader :
   318  	// to set the additional "Range" header
   319  	u := &url.URL{
   320  		Scheme: "https",
   321  		Host:   "storage.googleapis.com",
   322  		Path:   fmt.Sprintf("/%s/%s", bucket, name),
   323  	}
   324  	req, err := http.NewRequest("GET", u.String(), nil)
   325  	if err != nil {
   326  		return nil, err
   327  	}
   328  	if offset > 0 {
   329  		req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
   330  	}
   331  	var res *http.Response
   332  	err = retry(func() error {
   333  		var err error
   334  		res, err = client.Do(req)
   335  		return err
   336  	})
   337  	if err != nil {
   338  		return nil, err
   339  	}
   340  	return res, googleapi.CheckMediaResponse(res)
   341  }
   342  
   343  // Writer returns a FileWriter which will store the content written to it
   344  // at the location designated by "path" after the call to Commit.
   345  func (d *driver) Writer(context context.Context, path string, append bool) (storagedriver.FileWriter, error) {
   346  	writer := &writer{
   347  		client: d.client,
   348  		bucket: d.bucket,
   349  		name:   d.pathToKey(path),
   350  		buffer: make([]byte, d.chunkSize),
   351  	}
   352  
   353  	if append {
   354  		err := writer.init(path)
   355  		if err != nil {
   356  			return nil, err
   357  		}
   358  	}
   359  	return writer, nil
   360  }
   361  
   362  type writer struct {
   363  	client     *http.Client
   364  	bucket     string
   365  	name       string
   366  	size       int64
   367  	offset     int64
   368  	closed     bool
   369  	sessionURI string
   370  	buffer     []byte
   371  	buffSize   int
   372  }
   373  
   374  // Cancel removes any written content from this FileWriter.
   375  func (w *writer) Cancel() error {
   376  	w.closed = true
   377  	err := storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
   378  	if err != nil {
   379  		if status, ok := err.(*googleapi.Error); ok {
   380  			if status.Code == http.StatusNotFound {
   381  				err = nil
   382  			}
   383  		}
   384  	}
   385  	return err
   386  }
   387  
   388  func (w *writer) Close() error {
   389  	if w.closed {
   390  		return nil
   391  	}
   392  	w.closed = true
   393  
   394  	err := w.writeChunk()
   395  	if err != nil {
   396  		return err
   397  	}
   398  
   399  	// Copy the remaining bytes from the buffer to the upload session
   400  	// Normally buffSize will be smaller than minChunkSize. However, in the
   401  	// unlikely event that the upload session failed to start, this number could be higher.
   402  	// In this case we can safely clip the remaining bytes to the minChunkSize
   403  	if w.buffSize > minChunkSize {
   404  		w.buffSize = minChunkSize
   405  	}
   406  
   407  	// commit the writes by updating the upload session
   408  	err = retry(func() error {
   409  		wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
   410  		wc.ContentType = uploadSessionContentType
   411  		wc.Metadata = map[string]string{
   412  			"Session-URI": w.sessionURI,
   413  			"Offset":      strconv.FormatInt(w.offset, 10),
   414  		}
   415  		return putContentsClose(wc, w.buffer[0:w.buffSize])
   416  	})
   417  	if err != nil {
   418  		return err
   419  	}
   420  	w.size = w.offset + int64(w.buffSize)
   421  	w.buffSize = 0
   422  	return nil
   423  }
   424  
   425  func putContentsClose(wc *storage.Writer, contents []byte) error {
   426  	size := len(contents)
   427  	var nn int
   428  	var err error
   429  	for nn < size {
   430  		n, err := wc.Write(contents[nn:size])
   431  		nn += n
   432  		if err != nil {
   433  			break
   434  		}
   435  	}
   436  	if err != nil {
   437  		return err
   438  	}
   439  	return wc.Close()
   440  }
   441  
   442  // Commit flushes all content written to this FileWriter and makes it
   443  // available for future calls to StorageDriver.GetContent and
   444  // StorageDriver.Reader.
   445  func (w *writer) Commit() error {
   446  
   447  	if err := w.checkClosed(); err != nil {
   448  		return err
   449  	}
   450  	w.closed = true
   451  
   452  	// no session started yet just perform a simple upload
   453  	if w.sessionURI == "" {
   454  		err := retry(func() error {
   455  			wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
   456  			wc.ContentType = "application/octet-stream"
   457  			return putContentsClose(wc, w.buffer[0:w.buffSize])
   458  		})
   459  		if err != nil {
   460  			return err
   461  		}
   462  		w.size = w.offset + int64(w.buffSize)
   463  		w.buffSize = 0
   464  		return nil
   465  	}
   466  	size := w.offset + int64(w.buffSize)
   467  	var nn int
   468  	// loop must be performed at least once to ensure the file is committed even when
   469  	// the buffer is empty
   470  	for {
   471  		n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size)
   472  		nn += int(n)
   473  		w.offset += n
   474  		w.size = w.offset
   475  		if err != nil {
   476  			w.buffSize = copy(w.buffer, w.buffer[nn:w.buffSize])
   477  			return err
   478  		}
   479  		if nn == w.buffSize {
   480  			break
   481  		}
   482  	}
   483  	w.buffSize = 0
   484  	return nil
   485  }
   486  
   487  func (w *writer) checkClosed() error {
   488  	if w.closed {
   489  		return fmt.Errorf("Writer already closed")
   490  	}
   491  	return nil
   492  }
   493  
   494  func (w *writer) writeChunk() error {
   495  	var err error
   496  	// chunks can be uploaded only in multiples of minChunkSize
   497  	// chunkSize is a multiple of minChunkSize less than or equal to buffSize
   498  	chunkSize := w.buffSize - (w.buffSize % minChunkSize)
   499  	if chunkSize == 0 {
   500  		return nil
   501  	}
   502  	// if their is no sessionURI yet, obtain one by starting the session
   503  	if w.sessionURI == "" {
   504  		w.sessionURI, err = startSession(w.client, w.bucket, w.name)
   505  	}
   506  	if err != nil {
   507  		return err
   508  	}
   509  	nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
   510  	w.offset += nn
   511  	if w.offset > w.size {
   512  		w.size = w.offset
   513  	}
   514  	// shift the remaining bytes to the start of the buffer
   515  	w.buffSize = copy(w.buffer, w.buffer[int(nn):w.buffSize])
   516  
   517  	return err
   518  }
   519  
   520  func (w *writer) Write(p []byte) (int, error) {
   521  	err := w.checkClosed()
   522  	if err != nil {
   523  		return 0, err
   524  	}
   525  
   526  	var nn int
   527  	for nn < len(p) {
   528  		n := copy(w.buffer[w.buffSize:], p[nn:])
   529  		w.buffSize += n
   530  		if w.buffSize == cap(w.buffer) {
   531  			err = w.writeChunk()
   532  			if err != nil {
   533  				break
   534  			}
   535  		}
   536  		nn += n
   537  	}
   538  	return nn, err
   539  }
   540  
   541  // Size returns the number of bytes written to this FileWriter.
   542  func (w *writer) Size() int64 {
   543  	return w.size
   544  }
   545  
   546  func (w *writer) init(path string) error {
   547  	res, err := getObject(w.client, w.bucket, w.name, 0)
   548  	if err != nil {
   549  		return err
   550  	}
   551  	defer res.Body.Close()
   552  	if res.Header.Get("Content-Type") != uploadSessionContentType {
   553  		return storagedriver.PathNotFoundError{Path: path}
   554  	}
   555  	offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64)
   556  	if err != nil {
   557  		return err
   558  	}
   559  	buffer, err := ioutil.ReadAll(res.Body)
   560  	if err != nil {
   561  		return err
   562  	}
   563  	w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI")
   564  	w.buffSize = copy(w.buffer, buffer)
   565  	w.offset = offset
   566  	w.size = offset + int64(w.buffSize)
   567  	return nil
   568  }
   569  
   570  type request func() error
   571  
   572  func retry(req request) error {
   573  	backoff := time.Second
   574  	var err error
   575  	for i := 0; i < maxTries; i++ {
   576  		err = req()
   577  		if err == nil {
   578  			return nil
   579  		}
   580  
   581  		status, ok := err.(*googleapi.Error)
   582  		if !ok || (status.Code != 429 && status.Code < http.StatusInternalServerError) {
   583  			return err
   584  		}
   585  
   586  		time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond))
   587  		if i <= 4 {
   588  			backoff = backoff * 2
   589  		}
   590  	}
   591  	return err
   592  }
   593  
   594  // Stat retrieves the FileInfo for the given path, including the current
   595  // size in bytes and the creation time.
   596  func (d *driver) Stat(context context.Context, path string) (storagedriver.FileInfo, error) {
   597  	var fi storagedriver.FileInfoFields
   598  	//try to get as file
   599  	gcsContext := d.context(context)
   600  	obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
   601  	if err == nil {
   602  		if obj.ContentType == uploadSessionContentType {
   603  			return nil, storagedriver.PathNotFoundError{Path: path}
   604  		}
   605  		fi = storagedriver.FileInfoFields{
   606  			Path:    path,
   607  			Size:    obj.Size,
   608  			ModTime: obj.Updated,
   609  			IsDir:   false,
   610  		}
   611  		return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
   612  	}
   613  	//try to get as folder
   614  	dirpath := d.pathToDirKey(path)
   615  
   616  	query := &storage.Query{
   617  		Prefix:     dirpath,
   618  		MaxResults: 1,
   619  	}
   620  
   621  	objects, err := storageListObjects(gcsContext, d.bucket, query)
   622  	if err != nil {
   623  		return nil, err
   624  	}
   625  	if len(objects.Results) < 1 {
   626  		return nil, storagedriver.PathNotFoundError{Path: path}
   627  	}
   628  	fi = storagedriver.FileInfoFields{
   629  		Path:  path,
   630  		IsDir: true,
   631  	}
   632  	obj = objects.Results[0]
   633  	if obj.Name == dirpath {
   634  		fi.Size = obj.Size
   635  		fi.ModTime = obj.Updated
   636  	}
   637  	return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
   638  }
   639  
   640  // List returns a list of the objects that are direct descendants of the
   641  // given path.
   642  func (d *driver) List(context context.Context, path string) ([]string, error) {
   643  	query := &storage.Query{
   644  		Delimiter: "/",
   645  		Prefix:    d.pathToDirKey(path),
   646  	}
   647  	list := make([]string, 0, 64)
   648  	for {
   649  		objects, err := storageListObjects(d.context(context), d.bucket, query)
   650  		if err != nil {
   651  			return nil, err
   652  		}
   653  		for _, object := range objects.Results {
   654  			// GCS does not guarantee strong consistency between
   655  			// DELETE and LIST operations. Check that the object is not deleted,
   656  			// and filter out any objects with a non-zero time-deleted
   657  			if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType {
   658  				list = append(list, d.keyToPath(object.Name))
   659  			}
   660  		}
   661  		for _, subpath := range objects.Prefixes {
   662  			subpath = d.keyToPath(subpath)
   663  			list = append(list, subpath)
   664  		}
   665  		query = objects.Next
   666  		if query == nil {
   667  			break
   668  		}
   669  	}
   670  	if path != "/" && len(list) == 0 {
   671  		// Treat empty response as missing directory, since we don't actually
   672  		// have directories in Google Cloud Storage.
   673  		return nil, storagedriver.PathNotFoundError{Path: path}
   674  	}
   675  	return list, nil
   676  }
   677  
   678  // Move moves an object stored at sourcePath to destPath, removing the
   679  // original object.
   680  func (d *driver) Move(context context.Context, sourcePath string, destPath string) error {
   681  	gcsContext := d.context(context)
   682  	_, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
   683  	if err != nil {
   684  		if status, ok := err.(*googleapi.Error); ok {
   685  			if status.Code == http.StatusNotFound {
   686  				return storagedriver.PathNotFoundError{Path: sourcePath}
   687  			}
   688  		}
   689  		return err
   690  	}
   691  	err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
   692  	// if deleting the file fails, log the error, but do not fail; the file was successfully copied,
   693  	// and the original should eventually be cleaned when purging the uploads folder.
   694  	if err != nil {
   695  		logrus.Infof("error deleting file: %v due to %v", sourcePath, err)
   696  	}
   697  	return nil
   698  }
   699  
   700  // listAll recursively lists all names of objects stored at "prefix" and its subpaths.
   701  func (d *driver) listAll(context context.Context, prefix string) ([]string, error) {
   702  	list := make([]string, 0, 64)
   703  	query := &storage.Query{}
   704  	query.Prefix = prefix
   705  	query.Versions = false
   706  	for {
   707  		objects, err := storageListObjects(d.context(context), d.bucket, query)
   708  		if err != nil {
   709  			return nil, err
   710  		}
   711  		for _, obj := range objects.Results {
   712  			// GCS does not guarantee strong consistency between
   713  			// DELETE and LIST operations. Check that the object is not deleted,
   714  			// and filter out any objects with a non-zero time-deleted
   715  			if obj.Deleted.IsZero() {
   716  				list = append(list, obj.Name)
   717  			}
   718  		}
   719  		query = objects.Next
   720  		if query == nil {
   721  			break
   722  		}
   723  	}
   724  	return list, nil
   725  }
   726  
   727  // Delete recursively deletes all objects stored at "path" and its subpaths.
   728  func (d *driver) Delete(context context.Context, path string) error {
   729  	prefix := d.pathToDirKey(path)
   730  	gcsContext := d.context(context)
   731  	keys, err := d.listAll(gcsContext, prefix)
   732  	if err != nil {
   733  		return err
   734  	}
   735  	if len(keys) > 0 {
   736  		sort.Sort(sort.Reverse(sort.StringSlice(keys)))
   737  		for _, key := range keys {
   738  			err := storageDeleteObject(gcsContext, d.bucket, key)
   739  			// GCS only guarantees eventual consistency, so listAll might return
   740  			// paths that no longer exist. If this happens, just ignore any not
   741  			// found error
   742  			if status, ok := err.(*googleapi.Error); ok {
   743  				if status.Code == http.StatusNotFound {
   744  					err = nil
   745  				}
   746  			}
   747  			if err != nil {
   748  				return err
   749  			}
   750  		}
   751  		return nil
   752  	}
   753  	err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
   754  	if err != nil {
   755  		if status, ok := err.(*googleapi.Error); ok {
   756  			if status.Code == http.StatusNotFound {
   757  				return storagedriver.PathNotFoundError{Path: path}
   758  			}
   759  		}
   760  	}
   761  	return err
   762  }
   763  
   764  func storageDeleteObject(context context.Context, bucket string, name string) error {
   765  	return retry(func() error {
   766  		return storage.DeleteObject(context, bucket, name)
   767  	})
   768  }
   769  
   770  func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
   771  	var obj *storage.Object
   772  	err := retry(func() error {
   773  		var err error
   774  		obj, err = storage.StatObject(context, bucket, name)
   775  		return err
   776  	})
   777  	return obj, err
   778  }
   779  
   780  func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
   781  	var objs *storage.Objects
   782  	err := retry(func() error {
   783  		var err error
   784  		objs, err = storage.ListObjects(context, bucket, q)
   785  		return err
   786  	})
   787  	return objs, err
   788  }
   789  
   790  func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
   791  	var obj *storage.Object
   792  	err := retry(func() error {
   793  		var err error
   794  		obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
   795  		return err
   796  	})
   797  	return obj, err
   798  }
   799  
   800  // URLFor returns a URL which may be used to retrieve the content stored at
   801  // the given path, possibly using the given options.
   802  // Returns ErrUnsupportedMethod if this driver has no privateKey
   803  func (d *driver) URLFor(context context.Context, path string, options map[string]interface{}) (string, error) {
   804  	if d.privateKey == nil {
   805  		return "", storagedriver.ErrUnsupportedMethod{}
   806  	}
   807  
   808  	name := d.pathToKey(path)
   809  	methodString := "GET"
   810  	method, ok := options["method"]
   811  	if ok {
   812  		methodString, ok = method.(string)
   813  		if !ok || (methodString != "GET" && methodString != "HEAD") {
   814  			return "", storagedriver.ErrUnsupportedMethod{}
   815  		}
   816  	}
   817  
   818  	expiresTime := time.Now().Add(20 * time.Minute)
   819  	expires, ok := options["expiry"]
   820  	if ok {
   821  		et, ok := expires.(time.Time)
   822  		if ok {
   823  			expiresTime = et
   824  		}
   825  	}
   826  
   827  	opts := &storage.SignedURLOptions{
   828  		GoogleAccessID: d.email,
   829  		PrivateKey:     d.privateKey,
   830  		Method:         methodString,
   831  		Expires:        expiresTime,
   832  	}
   833  	return storage.SignedURL(d.bucket, name, opts)
   834  }
   835  
   836  // Walk traverses a filesystem defined within driver, starting
   837  // from the given path, calling f on each file
   838  func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
   839  	return storagedriver.WalkFallback(ctx, d, path, f)
   840  }
   841  
   842  func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
   843  	u := &url.URL{
   844  		Scheme:   "https",
   845  		Host:     "www.googleapis.com",
   846  		Path:     fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket),
   847  		RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name),
   848  	}
   849  	err = retry(func() error {
   850  		req, err := http.NewRequest("POST", u.String(), nil)
   851  		if err != nil {
   852  			return err
   853  		}
   854  		req.Header.Set("X-Upload-Content-Type", "application/octet-stream")
   855  		req.Header.Set("Content-Length", "0")
   856  		resp, err := client.Do(req)
   857  		if err != nil {
   858  			return err
   859  		}
   860  		defer resp.Body.Close()
   861  		err = googleapi.CheckMediaResponse(resp)
   862  		if err != nil {
   863  			return err
   864  		}
   865  		uri = resp.Header.Get("Location")
   866  		return nil
   867  	})
   868  	return uri, err
   869  }
   870  
   871  func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) {
   872  	bytesPut := int64(0)
   873  	err := retry(func() error {
   874  		req, err := http.NewRequest("PUT", sessionURI, bytes.NewReader(chunk))
   875  		if err != nil {
   876  			return err
   877  		}
   878  		length := int64(len(chunk))
   879  		to := from + length - 1
   880  		size := "*"
   881  		if totalSize >= 0 {
   882  			size = strconv.FormatInt(totalSize, 10)
   883  		}
   884  		req.Header.Set("Content-Type", "application/octet-stream")
   885  		if from == to+1 {
   886  			req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size))
   887  		} else {
   888  			req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size))
   889  		}
   890  		req.Header.Set("Content-Length", strconv.FormatInt(length, 10))
   891  
   892  		resp, err := client.Do(req)
   893  		if err != nil {
   894  			return err
   895  		}
   896  		defer resp.Body.Close()
   897  		if totalSize < 0 && resp.StatusCode == 308 {
   898  			groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range"))
   899  			end, err := strconv.ParseInt(groups[2], 10, 64)
   900  			if err != nil {
   901  				return err
   902  			}
   903  			bytesPut = end - from + 1
   904  			return nil
   905  		}
   906  		err = googleapi.CheckMediaResponse(resp)
   907  		if err != nil {
   908  			return err
   909  		}
   910  		bytesPut = to - from + 1
   911  		return nil
   912  	})
   913  	return bytesPut, err
   914  }
   915  
   916  func (d *driver) context(context context.Context) context.Context {
   917  	return cloud.WithContext(context, dummyProjectID, d.client)
   918  }
   919  
   920  func (d *driver) pathToKey(path string) string {
   921  	return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
   922  }
   923  
   924  func (d *driver) pathToDirKey(path string) string {
   925  	return d.pathToKey(path) + "/"
   926  }
   927  
   928  func (d *driver) keyToPath(key string) string {
   929  	return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/")
   930  }
   931  

View as plain text