...

Source file src/github.com/docker/distribution/registry/storage/driver/oss/oss.go

Documentation: github.com/docker/distribution/registry/storage/driver/oss

     1  //go:build include_oss
     2  // +build include_oss
     3  
     4  // Package oss provides a storagedriver.StorageDriver implementation to
     5  // store blobs in Aliyun OSS cloud storage.
     6  //
     7  // This package leverages the denverdino/aliyungo client library for interfacing with
     8  // oss.
     9  //
     10  // Because OSS is a key-value store, the Stat call does not support last modification
     11  // time for directories (directories are an abstraction for key-value stores).
    12  package oss
    13  
    14  import (
    15  	"bytes"
    16  	"context"
    17  	"fmt"
    18  	"io"
    19  	"io/ioutil"
    20  	"net/http"
    21  	"reflect"
    22  	"strconv"
    23  	"strings"
    24  	"time"
    25  
    26  	"github.com/denverdino/aliyungo/oss"
    27  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    28  	"github.com/docker/distribution/registry/storage/driver/base"
    29  	"github.com/docker/distribution/registry/storage/driver/factory"
    30  	"github.com/sirupsen/logrus"
    31  )
    32  
    33  const driverName = "oss"
    34  
    35  // minChunkSize defines the minimum multipart upload chunk size
    36  // OSS API requires multipart upload chunks to be at least 5MB
    37  const minChunkSize = 5 << 20
    38  
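         // defaultChunkSize is 10 MB, twice the minimum part size accepted by OSS.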
    39  const defaultChunkSize = 2 * minChunkSize
    40  
    41  // listMax is the largest amount of objects you can request from OSS in a list call
    42  const listMax = 1000
    43  
     44  // DriverParameters encapsulates all of the driver parameters after all values have been set
    45  type DriverParameters struct {
    46  	AccessKeyID     string
    47  	AccessKeySecret string
    48  	Bucket          string
    49  	Region          oss.Region
    50  	Internal        bool
    51  	Encrypt         bool
    52  	Secure          bool
    53  	ChunkSize       int64
    54  	RootDirectory   string
    55  	Endpoint        string
    56  }
    57  
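         // init registers the OSS driver with the storage driver factory, so the
         // registry can construct it by name, e.g. factory.Create("oss", parameters)
         // (a sketch using the Create helper of the factory package in this repository).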
    58  func init() {
    59  	factory.Register(driverName, &ossDriverFactory{})
    60  }
    61  
    62  // ossDriverFactory implements the factory.StorageDriverFactory interface
    63  type ossDriverFactory struct{}
    64  
    65  func (factory *ossDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
    66  	return FromParameters(parameters)
    67  }
    68  
    69  var _ storagedriver.StorageDriver = &driver{}
    70  
    71  type driver struct {
    72  	Client        *oss.Client
    73  	Bucket        *oss.Bucket
    74  	ChunkSize     int64
    75  	Encrypt       bool
    76  	RootDirectory string
    77  }
    78  
    79  type baseEmbed struct {
    80  	base.Base
    81  }
    82  
    83  // Driver is a storagedriver.StorageDriver implementation backed by Aliyun OSS
    84  // Objects are stored at absolute keys in the provided bucket.
    85  type Driver struct {
    86  	baseEmbed
    87  }
    88  
     89  // FromParameters constructs a new Driver with a given parameters map
     90  // Required parameters:
     91  // - accesskeyid
     92  // - accesskeysecret
     93  // - region
     94  // - bucket
     95  // Optional parameters: internal, encrypt, secure, chunksize, rootdirectory, endpoint
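         //
         // For illustration only (the values below are placeholders, not defaults),
         // a minimal parameters map might look like:
         //
         //	parameters := map[string]interface{}{
         //		"accesskeyid":     "my-access-key-id",
         //		"accesskeysecret": "my-access-key-secret",
         //		"region":          "oss-cn-hangzhou",
         //		"bucket":          "my-registry-bucket",
         //	}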
    96  func FromParameters(parameters map[string]interface{}) (*Driver, error) {
     97  	// The credential, region, and bucket parameters below are all required.
    98  
    99  	accessKey, ok := parameters["accesskeyid"]
   100  	if !ok {
   101  		return nil, fmt.Errorf("No accesskeyid parameter provided")
   102  	}
   103  	secretKey, ok := parameters["accesskeysecret"]
   104  	if !ok {
   105  		return nil, fmt.Errorf("No accesskeysecret parameter provided")
   106  	}
   107  
   108  	regionName, ok := parameters["region"]
   109  	if !ok || fmt.Sprint(regionName) == "" {
   110  		return nil, fmt.Errorf("No region parameter provided")
   111  	}
   112  
   113  	bucket, ok := parameters["bucket"]
   114  	if !ok || fmt.Sprint(bucket) == "" {
   115  		return nil, fmt.Errorf("No bucket parameter provided")
   116  	}
   117  
   118  	internalBool := false
   119  	internal, ok := parameters["internal"]
   120  	if ok {
   121  		internalBool, ok = internal.(bool)
   122  		if !ok {
   123  			return nil, fmt.Errorf("The internal parameter should be a boolean")
   124  		}
   125  	}
   126  
   127  	encryptBool := false
   128  	encrypt, ok := parameters["encrypt"]
   129  	if ok {
   130  		encryptBool, ok = encrypt.(bool)
   131  		if !ok {
   132  			return nil, fmt.Errorf("The encrypt parameter should be a boolean")
   133  		}
   134  	}
   135  
   136  	secureBool := true
   137  	secure, ok := parameters["secure"]
   138  	if ok {
   139  		secureBool, ok = secure.(bool)
   140  		if !ok {
   141  			return nil, fmt.Errorf("The secure parameter should be a boolean")
   142  		}
   143  	}
   144  
   145  	chunkSize := int64(defaultChunkSize)
   146  	chunkSizeParam, ok := parameters["chunksize"]
   147  	if ok {
   148  		switch v := chunkSizeParam.(type) {
   149  		case string:
   150  			vv, err := strconv.ParseInt(v, 0, 64)
   151  			if err != nil {
   152  				return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
   153  			}
   154  			chunkSize = vv
   155  		case int64:
   156  			chunkSize = v
   157  		case int, uint, int32, uint32, uint64:
   158  			chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
   159  		default:
    160  			return nil, fmt.Errorf("invalid value for chunksize: %#v", chunkSizeParam)
   161  		}
   162  
   163  		if chunkSize < minChunkSize {
   164  			return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
   165  		}
   166  	}
   167  
   168  	rootDirectory, ok := parameters["rootdirectory"]
   169  	if !ok {
   170  		rootDirectory = ""
   171  	}
   172  
   173  	endpoint, ok := parameters["endpoint"]
   174  	if !ok {
   175  		endpoint = ""
   176  	}
   177  
   178  	params := DriverParameters{
   179  		AccessKeyID:     fmt.Sprint(accessKey),
   180  		AccessKeySecret: fmt.Sprint(secretKey),
   181  		Bucket:          fmt.Sprint(bucket),
   182  		Region:          oss.Region(fmt.Sprint(regionName)),
   183  		ChunkSize:       chunkSize,
   184  		RootDirectory:   fmt.Sprint(rootDirectory),
   185  		Encrypt:         encryptBool,
   186  		Secure:          secureBool,
   187  		Internal:        internalBool,
   188  		Endpoint:        fmt.Sprint(endpoint),
   189  	}
   190  
   191  	return New(params)
   192  }
   193  
   194  // New constructs a new Driver with the given Aliyun credentials, region, encryption flag, and
   195  // bucketName
   196  func New(params DriverParameters) (*Driver, error) {
   197  
   198  	client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
   199  	client.SetEndpoint(params.Endpoint)
   200  	bucket := client.Bucket(params.Bucket)
   201  	client.SetDebug(false)
   202  
   203  	// Validate that the given credentials have at least read permissions in the
   204  	// given bucket scope.
   205  	if _, err := bucket.List(strings.TrimRight(params.RootDirectory, "/"), "", "", 1); err != nil {
   206  		return nil, err
   207  	}
   208  
   209  	// TODO(tg123): Currently multipart uploads have no timestamps, so this would be unwise
   210  	// if you initiated a new OSS client while another one is running on the same bucket.
   211  
   212  	d := &driver{
   213  		Client:        client,
   214  		Bucket:        bucket,
   215  		ChunkSize:     params.ChunkSize,
   216  		Encrypt:       params.Encrypt,
   217  		RootDirectory: params.RootDirectory,
   218  	}
   219  
   220  	return &Driver{
   221  		baseEmbed: baseEmbed{
   222  			Base: base.Base{
   223  				StorageDriver: d,
   224  			},
   225  		},
   226  	}, nil
   227  }
   228  
   229  // Implement the storagedriver.StorageDriver interface
   230  
   231  func (d *driver) Name() string {
   232  	return driverName
   233  }
   234  
   235  // GetContent retrieves the content stored at "path" as a []byte.
   236  func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
   237  	content, err := d.Bucket.Get(d.ossPath(path))
   238  	if err != nil {
   239  		return nil, parseError(path, err)
   240  	}
   241  	return content, nil
   242  }
   243  
   244  // PutContent stores the []byte content at a location designated by "path".
   245  func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
   246  	return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
   247  }
   248  
   249  // Reader retrieves an io.ReadCloser for the content stored at "path" with a
   250  // given byte offset.
   251  func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
   252  	headers := make(http.Header)
   253  	headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
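         	// For example, offset 1024 produces "Range: bytes=1024-", i.e. read from
         	// byte 1024 through the end of the object.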
   254  
   255  	resp, err := d.Bucket.GetResponseWithHeaders(d.ossPath(path), headers)
   256  	if err != nil {
   257  		return nil, parseError(path, err)
   258  	}
   259  
    260  	// Due to the Aliyun OSS API, a 200 status and the whole object are returned
    261  	// instead of an InvalidRange error when the requested range is invalid.
    262  	//
    263  	// The OSS server will always return http.StatusPartialContent if the range is acceptable.
   264  	if resp.StatusCode != http.StatusPartialContent {
   265  		resp.Body.Close()
   266  		return ioutil.NopCloser(bytes.NewReader(nil)), nil
   267  	}
   268  
   269  	return resp.Body, nil
   270  }
   271  
   272  // Writer returns a FileWriter which will store the content written to it
   273  // at the location designated by "path" after the call to Commit.
   274  func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
   275  	key := d.ossPath(path)
   276  	if !append {
   277  		// TODO (brianbland): cancel other uploads at this path
   278  		multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions())
   279  		if err != nil {
   280  			return nil, err
   281  		}
   282  		return d.newWriter(key, multi, nil), nil
   283  	}
   284  	multis, _, err := d.Bucket.ListMulti(key, "")
   285  	if err != nil {
   286  		return nil, parseError(path, err)
   287  	}
   288  	for _, multi := range multis {
   289  		if key != multi.Key {
   290  			continue
   291  		}
   292  		parts, err := multi.ListParts()
   293  		if err != nil {
   294  			return nil, parseError(path, err)
   295  		}
   296  		var multiSize int64
   297  		for _, part := range parts {
   298  			multiSize += part.Size
   299  		}
   300  		return d.newWriter(key, multi, parts), nil
   301  	}
   302  	return nil, storagedriver.PathNotFoundError{Path: path}
   303  }
   304  
   305  // Stat retrieves the FileInfo for the given path, including the current size
   306  // in bytes and the creation time.
   307  func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
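         	// A single-entry list with the path as prefix distinguishes an exact key
         	// match (a file) from keys that merely share the prefix (treated as a
         	// directory) and from a missing path.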
   308  	listResponse, err := d.Bucket.List(d.ossPath(path), "", "", 1)
   309  	if err != nil {
   310  		return nil, err
   311  	}
   312  
   313  	fi := storagedriver.FileInfoFields{
   314  		Path: path,
   315  	}
   316  
   317  	if len(listResponse.Contents) == 1 {
   318  		if listResponse.Contents[0].Key != d.ossPath(path) {
   319  			fi.IsDir = true
   320  		} else {
   321  			fi.IsDir = false
   322  			fi.Size = listResponse.Contents[0].Size
   323  
   324  			timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
   325  			if err != nil {
   326  				return nil, err
   327  			}
   328  			fi.ModTime = timestamp
   329  		}
   330  	} else if len(listResponse.CommonPrefixes) == 1 {
   331  		fi.IsDir = true
   332  	} else {
   333  		return nil, storagedriver.PathNotFoundError{Path: path}
   334  	}
   335  
   336  	return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
   337  }
   338  
   339  // List returns a list of the objects that are direct descendants of the given path.
   340  func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
   341  	path := opath
    342  	if path != "/" && path[len(path)-1] != '/' {
   343  		path = path + "/"
   344  	}
   345  
   346  	// This is to cover for the cases when the rootDirectory of the driver is either "" or "/".
   347  	// In those cases, there is no root prefix to replace and we must actually add a "/" to all
   348  	// results in order to keep them as valid paths as recognized by storagedriver.PathRegexp
   349  	prefix := ""
   350  	if d.ossPath("") == "" {
   351  		prefix = "/"
   352  	}
   353  
   354  	ossPath := d.ossPath(path)
   355  	listResponse, err := d.Bucket.List(ossPath, "/", "", listMax)
   356  	if err != nil {
   357  		return nil, parseError(opath, err)
   358  	}
   359  
   360  	files := []string{}
   361  	directories := []string{}
   362  
   363  	for {
   364  		for _, key := range listResponse.Contents {
   365  			files = append(files, strings.Replace(key.Key, d.ossPath(""), prefix, 1))
   366  		}
   367  
   368  		for _, commonPrefix := range listResponse.CommonPrefixes {
   369  			directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.ossPath(""), prefix, 1))
   370  		}
   371  
   372  		if listResponse.IsTruncated {
   373  			listResponse, err = d.Bucket.List(ossPath, "/", listResponse.NextMarker, listMax)
   374  			if err != nil {
   375  				return nil, err
   376  			}
   377  		} else {
   378  			break
   379  		}
   380  	}
   381  
    382  	// This covers the case when the first key is equal to ossPath.
   383  	if len(files) > 0 && files[0] == strings.Replace(ossPath, d.ossPath(""), prefix, 1) {
   384  		files = files[1:]
   385  	}
   386  
   387  	if opath != "/" {
   388  		if len(files) == 0 && len(directories) == 0 {
    389  			// Treat empty response as missing directory, since we don't actually
    390  			// have directories in OSS.
   391  			return nil, storagedriver.PathNotFoundError{Path: opath}
   392  		}
   393  	}
   394  
   395  	return append(files, directories...), nil
   396  }
   397  
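         // maxConcurrency is the degree of parallelism passed to
         // CopyLargeFileInParallel when moving an object.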
   398  const maxConcurrency = 10
   399  
   400  // Move moves an object stored at sourcePath to destPath, removing the original
   401  // object.
   402  func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
   403  	logrus.Infof("Move from %s to %s", d.ossPath(sourcePath), d.ossPath(destPath))
   404  	err := d.Bucket.CopyLargeFileInParallel(d.ossPath(sourcePath), d.ossPath(destPath),
   405  		d.getContentType(),
   406  		getPermissions(),
   407  		oss.Options{},
   408  		maxConcurrency)
   409  	if err != nil {
    410  		logrus.Errorf("Failed to move from %s to %s: %v", d.ossPath(sourcePath), d.ossPath(destPath), err)
   411  		return parseError(sourcePath, err)
   412  	}
   413  
   414  	return d.Delete(ctx, sourcePath)
   415  }
   416  
   417  // Delete recursively deletes all objects stored at "path" and its subpaths.
   418  func (d *driver) Delete(ctx context.Context, path string) error {
   419  	ossPath := d.ossPath(path)
   420  	listResponse, err := d.Bucket.List(ossPath, "", "", listMax)
   421  	if err != nil || len(listResponse.Contents) == 0 {
   422  		return storagedriver.PathNotFoundError{Path: path}
   423  	}
   424  
   425  	ossObjects := make([]oss.Object, listMax)
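         	// Objects under the prefix are deleted in batches of up to listMax,
         	// re-listing after each batch until nothing remains.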
   426  
   427  	for len(listResponse.Contents) > 0 {
   428  		numOssObjects := len(listResponse.Contents)
   429  		for index, key := range listResponse.Contents {
   430  			// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
   431  			if len(key.Key) > len(ossPath) && (key.Key)[len(ossPath)] != '/' {
   432  				numOssObjects = index
   433  				break
   434  			}
   435  			ossObjects[index].Key = key.Key
   436  		}
   437  
   438  		err := d.Bucket.DelMulti(oss.Delete{Quiet: false, Objects: ossObjects[0:numOssObjects]})
   439  		if err != nil {
    440  			return err
   441  		}
   442  
   443  		if numOssObjects < len(listResponse.Contents) {
   444  			return nil
   445  		}
   446  
   447  		listResponse, err = d.Bucket.List(d.ossPath(path), "", "", listMax)
   448  		if err != nil {
   449  			return err
   450  		}
   451  	}
   452  
   453  	return nil
   454  }
   455  
   456  // URLFor returns a URL which may be used to retrieve the content stored at the given path.
    457  // May return an ErrUnsupportedMethod in certain StorageDriver implementations.
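         // The only option keys consulted here are "method" (which must be "GET")
         // and "expiry" (a time.Time); unrecognized options are ignored.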
   458  func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
   459  	methodString := "GET"
   460  	method, ok := options["method"]
   461  	if ok {
   462  		methodString, ok = method.(string)
   463  		if !ok || (methodString != "GET") {
   464  			return "", storagedriver.ErrUnsupportedMethod{}
   465  		}
   466  	}
   467  
   468  	expiresTime := time.Now().Add(20 * time.Minute)
   469  
   470  	expires, ok := options["expiry"]
   471  	if ok {
   472  		et, ok := expires.(time.Time)
   473  		if ok {
   474  			expiresTime = et
   475  		}
   476  	}
   477  	logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime)
   478  	signedURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
   479  	logrus.Infof("signed URL: %s", signedURL)
   480  	return signedURL, nil
   481  }
   482  
   483  // Walk traverses a filesystem defined within driver, starting
   484  // from the given path, calling f on each file
   485  func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
   486  	return storagedriver.WalkFallback(ctx, d, path, f)
   487  }
   488  
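         // ossPath maps a driver path to an object key: with RootDirectory
         // "/my/root", the path "/a/b" becomes the key "my/root/a/b".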
   489  func (d *driver) ossPath(path string) string {
   490  	return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
   491  }
   492  
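         // parseError translates an OSS 404/NoSuchKey error into the driver's
         // PathNotFoundError; any other error is returned unchanged.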
   493  func parseError(path string, err error) error {
   494  	if ossErr, ok := err.(*oss.Error); ok && ossErr.StatusCode == http.StatusNotFound && (ossErr.Code == "NoSuchKey" || ossErr.Code == "") {
   495  		return storagedriver.PathNotFoundError{Path: path}
   496  	}
   497  
   498  	return err
   499  }
   500  
   501  func (d *driver) getOptions() oss.Options {
   502  	return oss.Options{ServerSideEncryption: d.Encrypt}
   503  }
   504  
   505  func getPermissions() oss.ACL {
   506  	return oss.Private
   507  }
   508  
   509  func (d *driver) getContentType() string {
   510  	return "application/octet-stream"
   511  }
   512  
    513  // writer attempts to upload parts to OSS in a buffered fashion where the last
    514  // part is at least as large as the chunksize, so the multipart upload could be
    515  // cleanly resumed in the future. This is violated if Close is called after less
    516  // than a full chunk is written.
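         //
         // The writer keeps two in-memory buffers: readyPart is the part that will be
         // uploaded next and pendingPart receives overflow once readyPart reaches
         // ChunkSize; flushPart uploads readyPart and then promotes pendingPart.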
   517  type writer struct {
   518  	driver      *driver
   519  	key         string
   520  	multi       *oss.Multi
   521  	parts       []oss.Part
   522  	size        int64
   523  	readyPart   []byte
   524  	pendingPart []byte
   525  	closed      bool
   526  	committed   bool
   527  	cancelled   bool
   528  }
   529  
   530  func (d *driver) newWriter(key string, multi *oss.Multi, parts []oss.Part) storagedriver.FileWriter {
   531  	var size int64
   532  	for _, part := range parts {
   533  		size += part.Size
   534  	}
   535  	return &writer{
   536  		driver: d,
   537  		key:    key,
   538  		multi:  multi,
   539  		parts:  parts,
   540  		size:   size,
   541  	}
   542  }
   543  
   544  func (w *writer) Write(p []byte) (int, error) {
   545  	if w.closed {
   546  		return 0, fmt.Errorf("already closed")
   547  	} else if w.committed {
   548  		return 0, fmt.Errorf("already committed")
   549  	} else if w.cancelled {
   550  		return 0, fmt.Errorf("already cancelled")
   551  	}
   552  
   553  	// If the last written part is smaller than minChunkSize, we need to make a
   554  	// new multipart upload :sadface:
   555  	if len(w.parts) > 0 && int(w.parts[len(w.parts)-1].Size) < minChunkSize {
   556  		err := w.multi.Complete(w.parts)
   557  		if err != nil {
   558  			w.multi.Abort()
   559  			return 0, err
   560  		}
   561  
   562  		multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions())
   563  		if err != nil {
   564  			return 0, err
   565  		}
   566  		w.multi = multi
   567  
   568  		// If the entire written file is smaller than minChunkSize, we need to make
   569  		// a new part from scratch :double sad face:
   570  		if w.size < minChunkSize {
   571  			contents, err := w.driver.Bucket.Get(w.key)
   572  			if err != nil {
   573  				return 0, err
   574  			}
   575  			w.parts = nil
   576  			w.readyPart = contents
   577  		} else {
   578  			// Otherwise we can use the old file as the new first part
   579  			_, part, err := multi.PutPartCopy(1, oss.CopyOptions{}, w.driver.Bucket.Name+"/"+w.key)
   580  			if err != nil {
   581  				return 0, err
   582  			}
   583  			w.parts = []oss.Part{part}
   584  		}
   585  	}
   586  
   587  	var n int
   588  
   589  	for len(p) > 0 {
   590  		// If no parts are ready to write, fill up the first part
   591  		if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
   592  			if len(p) >= neededBytes {
   593  				w.readyPart = append(w.readyPart, p[:neededBytes]...)
   594  				n += neededBytes
   595  				p = p[neededBytes:]
   596  			} else {
   597  				w.readyPart = append(w.readyPart, p...)
   598  				n += len(p)
   599  				p = nil
   600  			}
   601  		}
   602  
   603  		if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
   604  			if len(p) >= neededBytes {
   605  				w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
   606  				n += neededBytes
   607  				p = p[neededBytes:]
   608  				err := w.flushPart()
   609  				if err != nil {
   610  					w.size += int64(n)
   611  					return n, err
   612  				}
   613  			} else {
   614  				w.pendingPart = append(w.pendingPart, p...)
   615  				n += len(p)
   616  				p = nil
   617  			}
   618  		}
   619  	}
   620  	w.size += int64(n)
   621  	return n, nil
   622  }
   623  
   624  func (w *writer) Size() int64 {
   625  	return w.size
   626  }
   627  
   628  func (w *writer) Close() error {
   629  	if w.closed {
   630  		return fmt.Errorf("already closed")
   631  	}
   632  	w.closed = true
   633  	return w.flushPart()
   634  }
   635  
   636  func (w *writer) Cancel() error {
   637  	if w.closed {
   638  		return fmt.Errorf("already closed")
   639  	} else if w.committed {
   640  		return fmt.Errorf("already committed")
   641  	}
   642  	w.cancelled = true
   643  	err := w.multi.Abort()
   644  	return err
   645  }
   646  
   647  func (w *writer) Commit() error {
   648  	if w.closed {
   649  		return fmt.Errorf("already closed")
   650  	} else if w.committed {
   651  		return fmt.Errorf("already committed")
   652  	} else if w.cancelled {
   653  		return fmt.Errorf("already cancelled")
   654  	}
   655  	err := w.flushPart()
   656  	if err != nil {
   657  		return err
   658  	}
   659  	w.committed = true
   660  	err = w.multi.Complete(w.parts)
   661  	if err != nil {
   662  		w.multi.Abort()
   663  		return err
   664  	}
   665  	return nil
   666  }
   667  
    668  // flushPart flushes buffers to write a part to OSS.
    669  // Only called by Write (with both buffers full) and Close/Commit (always).
   670  func (w *writer) flushPart() error {
   671  	if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
   672  		// nothing to write
   673  		return nil
   674  	}
   675  	if len(w.pendingPart) < int(w.driver.ChunkSize) {
   676  		// closing with a small pending part
   677  		// combine ready and pending to avoid writing a small part
   678  		w.readyPart = append(w.readyPart, w.pendingPart...)
   679  		w.pendingPart = nil
   680  	}
   681  
   682  	part, err := w.multi.PutPart(len(w.parts)+1, bytes.NewReader(w.readyPart))
   683  	if err != nil {
   684  		return err
   685  	}
   686  	w.parts = append(w.parts, part)
   687  	w.readyPart = w.pendingPart
   688  	w.pendingPart = nil
   689  	return nil
   690  }
   691  
