...

Source file src/github.com/docker/distribution/registry/storage/driver/swift/swift.go

Documentation: github.com/docker/distribution/registry/storage/driver/swift

     1  // Package swift provides a storagedriver.StorageDriver implementation to
     2  // store blobs in Openstack Swift object storage.
     3  //
     4  // This package leverages the ncw/swift client library for interfacing with
     5  // Swift.
     6  //
     7  // It supports both TempAuth authentication and Keystone authentication
     8  // (up to version 3).
     9  //
    10  // As Swift has a limit on the size of a single uploaded object (by default
    11  // this is 5GB), the driver makes use of the Swift Large Object Support
    12  // (http://docs.openstack.org/developer/swift/overview_large_objects.html).
    13  // Only one container is used for both manifests and data objects. Manifests
    14  // are stored in the 'files' pseudo directory, data objects are stored under
    15  // 'segments'.
    16  package swift
    17  
    18  import (
    19  	"bufio"
    20  	"bytes"
    21  	"context"
    22  	"crypto/rand"
    23  	"crypto/sha1"
    24  	"crypto/tls"
    25  	"encoding/hex"
    26  	"fmt"
    27  	"io"
    28  	"io/ioutil"
    29  	"net/http"
    30  	"net/url"
    31  	"strconv"
    32  	"strings"
    33  	"time"
    34  
    35  	"github.com/mitchellh/mapstructure"
    36  	"github.com/ncw/swift"
    37  
    38  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    39  	"github.com/docker/distribution/registry/storage/driver/base"
    40  	"github.com/docker/distribution/registry/storage/driver/factory"
    41  	"github.com/docker/distribution/version"
    42  )
    43  
    44  const driverName = "swift"
    45  
    46  // defaultChunkSize defines the default size of a segment
    47  const defaultChunkSize = 20 * 1024 * 1024
    48  
    49  // minChunkSize defines the minimum size of a segment
    50  const minChunkSize = 1 << 20
    51  
    52  // contentType defines the Content-Type header associated with stored segments
    53  const contentType = "application/octet-stream"
    54  
    55  // readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
    56  var readAfterWriteTimeout = 15 * time.Second
    57  
    58  // readAfterWriteWait defines the time to sleep between two retries
    59  var readAfterWriteWait = 200 * time.Millisecond
    60  
// Parameters is a struct that encapsulates all of the driver parameters
// after all values have been set. FromParameters fills it by decoding the
// configuration map with mapstructure; New consumes it.
type Parameters struct {
	Username            string   // required; used as the connection's UserName
	Password            string   // required; used as the connection's ApiKey
	AuthURL             string   // required; used as the connection's AuthUrl
	Tenant              string   // used as the connection's Tenant
	TenantID            string   // used as the connection's TenantId
	Domain              string   // used as the connection's Domain
	DomainID            string   // used as the connection's DomainId
	TenantDomain        string   // used as the connection's TenantDomain
	TenantDomainID      string   // used as the connection's TenantDomainId
	TrustID             string   // used as the connection's TrustId
	Region              string   // used as the connection's Region
	AuthVersion         int      // used as the connection's AuthVersion
	Container           string   // required; New creates the container when it is not found
	Prefix              string   // prepended to every object name built by the driver
	EndpointType        string   // converted to swift.EndpointType by New
	InsecureSkipVerify  bool     // copied into the TLS client config's InsecureSkipVerify
	ChunkSize           int      // segment size in bytes; must be at least minChunkSize
	SecretKey           string   // temp URL key to install; a random one is generated when empty
	AccessKey           string   // when set, URLFor rewrites the signature as tenant_id:access_key:signature
	TempURLContainerKey bool     // used by New only when querying the Swift /info endpoint fails
	TempURLMethods      []string // used by New only when querying the Swift /info endpoint fails
}
    85  
// swiftInfo maps the JSON structure returned by Swift /info endpoint.
// Only the fields the driver reads in New are declared here.
type swiftInfo struct {
	// Swift carries the cluster version string.
	Swift struct {
		Version string `mapstructure:"version"`
	}
	// Tempurl lists the HTTP methods for which temporary URLs may be signed.
	Tempurl struct {
		Methods []string `mapstructure:"methods"`
	}
	// BulkDelete carries the per-request limit of the bulk delete feature.
	BulkDelete struct {
		MaxDeletesPerRequest int `mapstructure:"max_deletes_per_request"`
	} `mapstructure:"bulk_delete"`
}
    98  
// init registers the Swift driver factory with the storage driver factory
// package under driverName.
func init() {
	factory.Register(driverName, &swiftDriverFactory{})
}
   102  
   103  // swiftDriverFactory implements the factory.StorageDriverFactory interface
   104  type swiftDriverFactory struct{}
   105  
   106  func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
   107  	return FromParameters(parameters)
   108  }
   109  
// driver holds the connection and the settings resolved by New; it provides
// the actual storage operations wrapped by Driver.
type driver struct {
	Conn                 *swift.Connection // authenticated connection used for every request
	Container            string            // container holding both manifests and segments
	Prefix               string            // prepended to every object name built by the driver
	BulkDeleteSupport    bool              // true when the /info response has a "bulk_delete" entry
	BulkDeleteMaxDeletes int               // max_deletes_per_request taken from the /info response
	ChunkSize            int               // segment size in bytes
	SecretKey            string            // temp URL key; URLFor is unsupported while it is empty
	AccessKey            string            // when set, URLFor rewrites the signature to include it
	TempURLContainerKey  bool              // use the container-level temp URL key instead of the account one
	TempURLMethods       []string          // HTTP methods for which URLFor may sign temporary URLs
}
   122  
// baseEmbed wraps base.Base so that Driver can embed it through an
// unexported type.
// NOTE(review): presumably this keeps the Base field out of Driver's
// exported API — confirm.
type baseEmbed struct {
	base.Base
}
   126  
// Driver is a storagedriver.StorageDriver implementation backed by Openstack Swift.
// Objects are stored at absolute keys in the provided container.
// Its methods come from the embedded base.Base, which New points at the
// unexported driver type.
type Driver struct {
	baseEmbed
}
   132  
   133  // FromParameters constructs a new Driver with a given parameters map
   134  // Required parameters:
   135  // - username
   136  // - password
   137  // - authurl
   138  // - container
   139  func FromParameters(parameters map[string]interface{}) (*Driver, error) {
   140  	params := Parameters{
   141  		ChunkSize:          defaultChunkSize,
   142  		InsecureSkipVerify: false,
   143  	}
   144  
   145  	// Sanitize some entries before trying to decode parameters with mapstructure
   146  	// TenantID and Tenant when integers only and passed as ENV variables
   147  	// are considered as integer and not string. The parser fails in this
   148  	// case.
   149  	_, ok := parameters["tenant"]
   150  	if ok {
   151  		parameters["tenant"] = fmt.Sprint(parameters["tenant"])
   152  	}
   153  	_, ok = parameters["tenantid"]
   154  	if ok {
   155  		parameters["tenantid"] = fmt.Sprint(parameters["tenantid"])
   156  	}
   157  
   158  	if err := mapstructure.Decode(parameters, &params); err != nil {
   159  		return nil, err
   160  	}
   161  
   162  	if params.Username == "" {
   163  		return nil, fmt.Errorf("no username parameter provided")
   164  	}
   165  
   166  	if params.Password == "" {
   167  		return nil, fmt.Errorf("no password parameter provided")
   168  	}
   169  
   170  	if params.AuthURL == "" {
   171  		return nil, fmt.Errorf("no authurl parameter provided")
   172  	}
   173  
   174  	if params.Container == "" {
   175  		return nil, fmt.Errorf("no container parameter provided")
   176  	}
   177  
   178  	if params.ChunkSize < minChunkSize {
   179  		return nil, fmt.Errorf("the chunksize %#v parameter should be a number that is larger than or equal to %d", params.ChunkSize, minChunkSize)
   180  	}
   181  
   182  	return New(params)
   183  }
   184  
   185  // New constructs a new Driver with the given Openstack Swift credentials and container name
   186  func New(params Parameters) (*Driver, error) {
   187  	transport := &http.Transport{
   188  		Proxy:               http.ProxyFromEnvironment,
   189  		MaxIdleConnsPerHost: 2048,
   190  		TLSClientConfig:     &tls.Config{InsecureSkipVerify: params.InsecureSkipVerify},
   191  	}
   192  
   193  	ct := &swift.Connection{
   194  		UserName:       params.Username,
   195  		ApiKey:         params.Password,
   196  		AuthUrl:        params.AuthURL,
   197  		Region:         params.Region,
   198  		AuthVersion:    params.AuthVersion,
   199  		UserAgent:      "distribution/" + version.Version,
   200  		Tenant:         params.Tenant,
   201  		TenantId:       params.TenantID,
   202  		Domain:         params.Domain,
   203  		DomainId:       params.DomainID,
   204  		TenantDomain:   params.TenantDomain,
   205  		TenantDomainId: params.TenantDomainID,
   206  		TrustId:        params.TrustID,
   207  		EndpointType:   swift.EndpointType(params.EndpointType),
   208  		Transport:      transport,
   209  		ConnectTimeout: 60 * time.Second,
   210  		Timeout:        15 * 60 * time.Second,
   211  	}
   212  	err := ct.Authenticate()
   213  	if err != nil {
   214  		return nil, fmt.Errorf("swift authentication failed: %s", err)
   215  	}
   216  
   217  	if _, _, err := ct.Container(params.Container); err == swift.ContainerNotFound {
   218  		if err := ct.ContainerCreate(params.Container, nil); err != nil {
   219  			return nil, fmt.Errorf("failed to create container %s (%s)", params.Container, err)
   220  		}
   221  	} else if err != nil {
   222  		return nil, fmt.Errorf("failed to retrieve info about container %s (%s)", params.Container, err)
   223  	}
   224  
   225  	d := &driver{
   226  		Conn:           ct,
   227  		Container:      params.Container,
   228  		Prefix:         params.Prefix,
   229  		ChunkSize:      params.ChunkSize,
   230  		TempURLMethods: make([]string, 0),
   231  		AccessKey:      params.AccessKey,
   232  	}
   233  
   234  	info := swiftInfo{}
   235  	if config, err := d.Conn.QueryInfo(); err == nil {
   236  		_, d.BulkDeleteSupport = config["bulk_delete"]
   237  
   238  		if err := mapstructure.Decode(config, &info); err == nil {
   239  			d.TempURLContainerKey = info.Swift.Version >= "2.3.0"
   240  			d.TempURLMethods = info.Tempurl.Methods
   241  			if d.BulkDeleteSupport {
   242  				d.BulkDeleteMaxDeletes = info.BulkDelete.MaxDeletesPerRequest
   243  			}
   244  		}
   245  	} else {
   246  		d.TempURLContainerKey = params.TempURLContainerKey
   247  		d.TempURLMethods = params.TempURLMethods
   248  	}
   249  
   250  	if len(d.TempURLMethods) > 0 {
   251  		secretKey := params.SecretKey
   252  		if secretKey == "" {
   253  			secretKey, _ = generateSecret()
   254  		}
   255  
   256  		// Since Swift 2.2.2, we can now set secret keys on containers
   257  		// in addition to the account secret keys. Use them in preference.
   258  		if d.TempURLContainerKey {
   259  			_, containerHeaders, err := d.Conn.Container(d.Container)
   260  			if err != nil {
   261  				return nil, fmt.Errorf("failed to fetch container info %s (%s)", d.Container, err)
   262  			}
   263  
   264  			d.SecretKey = containerHeaders["X-Container-Meta-Temp-Url-Key"]
   265  			if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
   266  				m := swift.Metadata{}
   267  				m["temp-url-key"] = secretKey
   268  				if d.Conn.ContainerUpdate(d.Container, m.ContainerHeaders()); err == nil {
   269  					d.SecretKey = secretKey
   270  				}
   271  			}
   272  		} else {
   273  			// Use the account secret key
   274  			_, accountHeaders, err := d.Conn.Account()
   275  			if err != nil {
   276  				return nil, fmt.Errorf("failed to fetch account info (%s)", err)
   277  			}
   278  
   279  			d.SecretKey = accountHeaders["X-Account-Meta-Temp-Url-Key"]
   280  			if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
   281  				m := swift.Metadata{}
   282  				m["temp-url-key"] = secretKey
   283  				if err := d.Conn.AccountUpdate(m.AccountHeaders()); err == nil {
   284  					d.SecretKey = secretKey
   285  				}
   286  			}
   287  		}
   288  	}
   289  
   290  	return &Driver{
   291  		baseEmbed: baseEmbed{
   292  			Base: base.Base{
   293  				StorageDriver: d,
   294  			},
   295  		},
   296  	}, nil
   297  }
   298  
   299  // Implement the storagedriver.StorageDriver interface
   300  
// Name returns the name of the driver, i.e. the driverName constant under
// which it is registered.
func (d *driver) Name() string {
	return driverName
}
   304  
   305  // GetContent retrieves the content stored at "path" as a []byte.
   306  func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
   307  	content, err := d.Conn.ObjectGetBytes(d.Container, d.swiftPath(path))
   308  	if err == swift.ObjectNotFound {
   309  		return nil, storagedriver.PathNotFoundError{Path: path}
   310  	}
   311  	return content, err
   312  }
   313  
   314  // PutContent stores the []byte content at a location designated by "path".
   315  func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
   316  	err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, contentType)
   317  	if err == swift.ObjectNotFound {
   318  		return storagedriver.PathNotFoundError{Path: path}
   319  	}
   320  	return err
   321  }
   322  
   323  // Reader retrieves an io.ReadCloser for the content stored at "path" with a
   324  // given byte offset.
   325  func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
   326  	headers := make(swift.Headers)
   327  	headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-"
   328  
   329  	waitingTime := readAfterWriteWait
   330  	endTime := time.Now().Add(readAfterWriteTimeout)
   331  
   332  	for {
   333  		file, headers, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
   334  		if err != nil {
   335  			if err == swift.ObjectNotFound {
   336  				return nil, storagedriver.PathNotFoundError{Path: path}
   337  			}
   338  			if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == http.StatusRequestedRangeNotSatisfiable {
   339  				return ioutil.NopCloser(bytes.NewReader(nil)), nil
   340  			}
   341  			return file, err
   342  		}
   343  
   344  		//if this is a DLO and it is clear that segments are still missing,
   345  		//wait until they show up
   346  		_, isDLO := headers["X-Object-Manifest"]
   347  		size, err := file.Length()
   348  		if err != nil {
   349  			return file, err
   350  		}
   351  		if isDLO && size == 0 {
   352  			if time.Now().Add(waitingTime).After(endTime) {
   353  				return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
   354  			}
   355  			time.Sleep(waitingTime)
   356  			waitingTime *= 2
   357  			continue
   358  		}
   359  
   360  		//if not, then this reader will be fine
   361  		return file, nil
   362  	}
   363  }
   364  
// Writer returns a FileWriter which will store the content written to it
// at the location designated by "path" after the call to Commit.
//
// When append is false, a fresh random segments path is generated and the
// writer starts without any segment. When append is true, the object at
// "path" must already exist (storagedriver.PathNotFoundError otherwise):
// if it carries an X-Object-Manifest header, the segments path is taken from
// that manifest and the existing segments are collected; otherwise the object
// itself is moved under a new segments path and counted as the first segment.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
	var (
		segments     []swift.Object
		segmentsPath string
		err          error
	)

	if !append {
		segmentsPath, err = d.swiftSegmentPath(path)
		if err != nil {
			return nil, err
		}
	} else {
		// Note: this err shadows the one declared above for the rest of the
		// else branch.
		info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
		if err == swift.ObjectNotFound {
			return nil, storagedriver.PathNotFoundError{Path: path}
		} else if err != nil {
			return nil, err
		}
		manifest, ok := headers["X-Object-Manifest"]
		if !ok {
			// Plain object: turn it into a segment of a new large object.
			segmentsPath, err = d.swiftSegmentPath(path)
			if err != nil {
				return nil, err
			}
			// segments is still empty here, so the existing object is moved
			// to part number 0.
			if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegmentPath(segmentsPath, len(segments))); err != nil {
				return nil, err
			}
			segments = []swift.Object{info}
		} else {
			// Existing large object: reuse its segments path (the container
			// part of the manifest is ignored).
			_, segmentsPath = parseManifest(manifest)
			if segments, err = d.getAllSegments(segmentsPath); err != nil {
				return nil, err
			}
		}
	}

	return d.newWriter(path, segmentsPath, segments), nil
}
   406  
   407  // Stat retrieves the FileInfo for the given path, including the current size
   408  // in bytes and the creation time.
   409  func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
   410  	swiftPath := d.swiftPath(path)
   411  	opts := &swift.ObjectsOpts{
   412  		Prefix:    swiftPath,
   413  		Delimiter: '/',
   414  	}
   415  
   416  	objects, err := d.Conn.ObjectsAll(d.Container, opts)
   417  	if err != nil {
   418  		if err == swift.ContainerNotFound {
   419  			return nil, storagedriver.PathNotFoundError{Path: path}
   420  		}
   421  		return nil, err
   422  	}
   423  
   424  	fi := storagedriver.FileInfoFields{
   425  		Path: strings.TrimPrefix(strings.TrimSuffix(swiftPath, "/"), d.swiftPath("/")),
   426  	}
   427  
   428  	for _, obj := range objects {
   429  		if obj.PseudoDirectory && obj.Name == swiftPath+"/" {
   430  			fi.IsDir = true
   431  			return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
   432  		} else if obj.Name == swiftPath {
   433  			// The file exists. But on Swift 1.12, the 'bytes' field is always 0 so
   434  			// we need to do a separate HEAD request.
   435  			break
   436  		}
   437  	}
   438  
   439  	//Don't trust an empty `objects` slice. A container listing can be
   440  	//outdated. For files, we can make a HEAD request on the object which
   441  	//reports existence (at least) much more reliably.
   442  	waitingTime := readAfterWriteWait
   443  	endTime := time.Now().Add(readAfterWriteTimeout)
   444  
   445  	for {
   446  		info, headers, err := d.Conn.Object(d.Container, swiftPath)
   447  		if err != nil {
   448  			if err == swift.ObjectNotFound {
   449  				return nil, storagedriver.PathNotFoundError{Path: path}
   450  			}
   451  			return nil, err
   452  		}
   453  
   454  		//if this is a DLO and it is clear that segments are still missing,
   455  		//wait until they show up
   456  		_, isDLO := headers["X-Object-Manifest"]
   457  		if isDLO && info.Bytes == 0 {
   458  			if time.Now().Add(waitingTime).After(endTime) {
   459  				return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
   460  			}
   461  			time.Sleep(waitingTime)
   462  			waitingTime *= 2
   463  			continue
   464  		}
   465  
   466  		//otherwise, accept the result
   467  		fi.IsDir = false
   468  		fi.Size = info.Bytes
   469  		fi.ModTime = info.LastModified
   470  		return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
   471  	}
   472  }
   473  
   474  // List returns a list of the objects that are direct descendants of the given path.
   475  func (d *driver) List(ctx context.Context, path string) ([]string, error) {
   476  	var files []string
   477  
   478  	prefix := d.swiftPath(path)
   479  	if prefix != "" {
   480  		prefix += "/"
   481  	}
   482  
   483  	opts := &swift.ObjectsOpts{
   484  		Prefix:    prefix,
   485  		Delimiter: '/',
   486  	}
   487  
   488  	objects, err := d.Conn.ObjectsAll(d.Container, opts)
   489  	for _, obj := range objects {
   490  		files = append(files, strings.TrimPrefix(strings.TrimSuffix(obj.Name, "/"), d.swiftPath("/")))
   491  	}
   492  
   493  	if err == swift.ContainerNotFound || (len(objects) == 0 && path != "/") {
   494  		return files, storagedriver.PathNotFoundError{Path: path}
   495  	}
   496  	return files, err
   497  }
   498  
   499  // Move moves an object stored at sourcePath to destPath, removing the original
   500  // object.
   501  func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
   502  	_, headers, err := d.Conn.Object(d.Container, d.swiftPath(sourcePath))
   503  	if err == nil {
   504  		if manifest, ok := headers["X-Object-Manifest"]; ok {
   505  			if err = d.createManifest(destPath, manifest); err != nil {
   506  				return err
   507  			}
   508  			err = d.Conn.ObjectDelete(d.Container, d.swiftPath(sourcePath))
   509  		} else {
   510  			err = d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
   511  		}
   512  	}
   513  	if err == swift.ObjectNotFound {
   514  		return storagedriver.PathNotFoundError{Path: sourcePath}
   515  	}
   516  	return err
   517  }
   518  
// Delete recursively deletes all objects stored at "path" and its subpaths.
//
// It proceeds in three steps: list everything below "path/" and, for each
// large object found, add its segments to the list; delete the collected
// objects (in bulk when the cluster supports it, one by one otherwise); and
// finally delete the object stored at "path" itself, if any.
// storagedriver.PathNotFoundError is returned when neither children nor the
// object itself exist.
func (d *driver) Delete(ctx context.Context, path string) error {
	opts := swift.ObjectsOpts{
		Prefix: d.swiftPath(path) + "/",
	}

	objects, err := d.Conn.ObjectsAll(d.Container, &opts)
	if err != nil {
		if err == swift.ContainerNotFound {
			return storagedriver.PathNotFoundError{Path: path}
		}
		return err
	}

	// The range expression is evaluated once, so the segments appended below
	// are scheduled for deletion but not themselves inspected by this loop.
	for _, obj := range objects {
		if obj.PseudoDirectory {
			continue
		}
		if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
			manifest, ok := headers["X-Object-Manifest"]
			if ok {
				// Large object: its segments must be deleted as well.
				_, prefix := parseManifest(manifest)
				segments, err := d.getAllSegments(prefix)
				if err != nil {
					return err
				}
				objects = append(objects, segments...)
			}
		} else {
			if err == swift.ObjectNotFound {
				return storagedriver.PathNotFoundError{Path: obj.Name}
			}
			return err
		}
	}

	if d.BulkDeleteSupport && len(objects) > 0 && d.BulkDeleteMaxDeletes > 0 {
		filenames := make([]string, len(objects))
		for i, obj := range objects {
			filenames[i] = obj.Name
		}

		// Respect the per-request limit advertised by the cluster.
		chunks, err := chunkFilenames(filenames, d.BulkDeleteMaxDeletes)
		if err != nil {
			return err
		}
		for _, chunk := range chunks {
			_, err := d.Conn.BulkDelete(d.Container, chunk)
			// Don't fail on ObjectNotFound because eventual consistency
			// makes this situation normal.
			// NOTE(review): swift.Forbidden is tolerated here as well; the
			// reason is not visible in this file — confirm.
			if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
				if err == swift.ContainerNotFound {
					return storagedriver.PathNotFoundError{Path: path}
				}
				return err
			}
		}
	} else {
		// No usable bulk delete: remove the objects one request at a time.
		for _, obj := range objects {
			if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
				if err == swift.ObjectNotFound {
					return storagedriver.PathNotFoundError{Path: obj.Name}
				}
				return err
			}
		}
	}

	// Finally handle the object stored at "path" itself (the listing above
	// only covers names below "path/").
	_, _, err = d.Conn.Object(d.Container, d.swiftPath(path))
	if err == nil {
		if err := d.Conn.ObjectDelete(d.Container, d.swiftPath(path)); err != nil {
			if err == swift.ObjectNotFound {
				return storagedriver.PathNotFoundError{Path: path}
			}
			return err
		}
	} else if err == swift.ObjectNotFound {
		// No object at "path": only an error if nothing was found below it
		// either.
		if len(objects) == 0 {
			return storagedriver.PathNotFoundError{Path: path}
		}
	} else {
		return err
	}
	return nil
}
   604  
   605  // URLFor returns a URL which may be used to retrieve the content stored at the given path.
   606  func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
   607  	if d.SecretKey == "" {
   608  		return "", storagedriver.ErrUnsupportedMethod{}
   609  	}
   610  
   611  	methodString := "GET"
   612  	method, ok := options["method"]
   613  	if ok {
   614  		if methodString, ok = method.(string); !ok {
   615  			return "", storagedriver.ErrUnsupportedMethod{}
   616  		}
   617  	}
   618  
   619  	if methodString == "HEAD" {
   620  		// A "HEAD" request on a temporary URL is allowed if the
   621  		// signature was generated with "GET", "POST" or "PUT"
   622  		methodString = "GET"
   623  	}
   624  
   625  	supported := false
   626  	for _, method := range d.TempURLMethods {
   627  		if method == methodString {
   628  			supported = true
   629  			break
   630  		}
   631  	}
   632  
   633  	if !supported {
   634  		return "", storagedriver.ErrUnsupportedMethod{}
   635  	}
   636  
   637  	expiresTime := time.Now().Add(20 * time.Minute)
   638  	expires, ok := options["expiry"]
   639  	if ok {
   640  		et, ok := expires.(time.Time)
   641  		if ok {
   642  			expiresTime = et
   643  		}
   644  	}
   645  
   646  	tempURL := d.Conn.ObjectTempUrl(d.Container, d.swiftPath(path), d.SecretKey, methodString, expiresTime)
   647  
   648  	if d.AccessKey != "" {
   649  		// On HP Cloud, the signature must be in the form of tenant_id:access_key:signature
   650  		url, _ := url.Parse(tempURL)
   651  		query := url.Query()
   652  		query.Set("temp_url_sig", fmt.Sprintf("%s:%s:%s", d.Conn.TenantId, d.AccessKey, query.Get("temp_url_sig")))
   653  		url.RawQuery = query.Encode()
   654  		tempURL = url.String()
   655  	}
   656  
   657  	return tempURL, nil
   658  }
   659  
// Walk traverses a filesystem defined within driver, starting
// from the given path, calling f on each file.
// The traversal itself is delegated to storagedriver.WalkFallback.
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
	return storagedriver.WalkFallback(ctx, d, path, f)
}
   665  
   666  func (d *driver) swiftPath(path string) string {
   667  	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
   668  }
   669  
   670  func (d *driver) swiftSegmentPath(path string) (string, error) {
   671  	checksum := sha1.New()
   672  	random := make([]byte, 32)
   673  	if _, err := rand.Read(random); err != nil {
   674  		return "", err
   675  	}
   676  	path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
   677  	return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
   678  }
   679  
// getAllSegments returns the segment objects stored below the given segments
// path. It starts from a container listing and then checks, part number by
// part number starting at 1, that no existing segment is missing from it;
// missing ones are looked up individually and appended. The scan stops at the
// first part number that does not exist.
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
	//a simple container listing works 99.9% of the time
	segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
	if err != nil {
		if err == swift.ContainerNotFound {
			return nil, storagedriver.PathNotFoundError{Path: path}
		}
		return nil, err
	}

	//build a lookup table by object name
	hasObjectName := make(map[string]struct{})
	for _, segment := range segments {
		hasObjectName[segment.Name] = struct{}{}
	}

	//The container listing might be outdated (i.e. not contain all existing
	//segment objects yet) because of temporary inconsistency (Swift is only
	//eventually consistent!). Check its completeness.
	segmentNumber := 0
	for {
		segmentNumber++
		segmentPath := getSegmentPath(path, segmentNumber)

		//already part of the listing -> nothing to verify for this one
		if _, seen := hasObjectName[segmentPath]; seen {
			continue
		}

		//This segment is missing in the container listing. Use a more reliable
		//request to check its existence. (HEAD requests on segments are
		//guaranteed to return the correct metadata, except for the pathological
		//case of an outage of large parts of the Swift cluster or its network,
		//since every segment is only written once.)
		segment, _, err := d.Conn.Object(d.Container, segmentPath)
		switch err {
		case nil:
			//found new segment -> keep going, more might be missing
			segments = append(segments, segment)
			continue
		case swift.ObjectNotFound:
			//This segment is missing. Since we upload segments sequentially,
			//there won't be any more segments after it.
			return segments, nil
		default:
			return nil, err //unexpected error
		}
	}
}
   728  
   729  func (d *driver) createManifest(path string, segments string) error {
   730  	headers := make(swift.Headers)
   731  	headers["X-Object-Manifest"] = segments
   732  	manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", contentType, headers)
   733  	if err != nil {
   734  		if err == swift.ObjectNotFound {
   735  			return storagedriver.PathNotFoundError{Path: path}
   736  		}
   737  		return err
   738  	}
   739  	if err := manifest.Close(); err != nil {
   740  		if err == swift.ObjectNotFound {
   741  			return storagedriver.PathNotFoundError{Path: path}
   742  		}
   743  		return err
   744  	}
   745  	return nil
   746  }
   747  
   748  func chunkFilenames(slice []string, maxSize int) (chunks [][]string, err error) {
   749  	if maxSize > 0 {
   750  		for offset := 0; offset < len(slice); offset += maxSize {
   751  			chunkSize := maxSize
   752  			if offset+chunkSize > len(slice) {
   753  				chunkSize = len(slice) - offset
   754  			}
   755  			chunks = append(chunks, slice[offset:offset+chunkSize])
   756  		}
   757  	} else {
   758  		return nil, fmt.Errorf("max chunk size must be > 0")
   759  	}
   760  	return
   761  }
   762  
   763  func parseManifest(manifest string) (container string, prefix string) {
   764  	components := strings.SplitN(manifest, "/", 2)
   765  	container = components[0]
   766  	if len(components) > 1 {
   767  		prefix = components[1]
   768  	}
   769  	return container, prefix
   770  }
   771  
   772  func generateSecret() (string, error) {
   773  	var secretBytes [32]byte
   774  	if _, err := rand.Read(secretBytes[:]); err != nil {
   775  		return "", fmt.Errorf("could not generate random bytes for Swift secret key: %v", err)
   776  	}
   777  	return hex.EncodeToString(secretBytes[:]), nil
   778  }
   779  
   780  func getSegmentPath(segmentsPath string, partNumber int) string {
   781  	return fmt.Sprintf("%s/%016d", segmentsPath, partNumber)
   782  }
   783  
// writer is the storagedriver.FileWriter returned by newWriter. Data written
// to it is buffered and uploaded as segments; the manifest at path is
// created on Commit (or on Close when neither committed nor cancelled).
type writer struct {
	driver       *driver       // driver that created this writer
	path         string        // driver path the manifest is written to
	segmentsPath string        // object-name prefix of the segments
	size         int64         // bytes in pre-existing segments plus bytes accepted by Write
	bw           *bufio.Writer // buffers data in front of the segmentWriter
	closed       bool          // set by a successful Close
	committed    bool          // set by Commit once the manifest exists
	cancelled    bool          // set by Cancel
}
   794  
   795  func (d *driver) newWriter(path, segmentsPath string, segments []swift.Object) storagedriver.FileWriter {
   796  	var size int64
   797  	for _, segment := range segments {
   798  		size += segment.Bytes
   799  	}
   800  	return &writer{
   801  		driver:       d,
   802  		path:         path,
   803  		segmentsPath: segmentsPath,
   804  		size:         size,
   805  		bw: bufio.NewWriterSize(&segmentWriter{
   806  			conn:          d.Conn,
   807  			container:     d.Container,
   808  			segmentsPath:  segmentsPath,
   809  			segmentNumber: len(segments) + 1,
   810  			maxChunkSize:  d.ChunkSize,
   811  		}, d.ChunkSize),
   812  	}
   813  }
   814  
   815  func (w *writer) Write(p []byte) (int, error) {
   816  	if w.closed {
   817  		return 0, fmt.Errorf("already closed")
   818  	} else if w.committed {
   819  		return 0, fmt.Errorf("already committed")
   820  	} else if w.cancelled {
   821  		return 0, fmt.Errorf("already cancelled")
   822  	}
   823  
   824  	n, err := w.bw.Write(p)
   825  	w.size += int64(n)
   826  	return n, err
   827  }
   828  
// Size returns the writer's current size in bytes: the total of the segments
// it was created with plus everything accepted by Write since.
func (w *writer) Size() int64 {
	return w.size
}
   832  
   833  func (w *writer) Close() error {
   834  	if w.closed {
   835  		return fmt.Errorf("already closed")
   836  	}
   837  
   838  	if err := w.bw.Flush(); err != nil {
   839  		return err
   840  	}
   841  
   842  	if !w.committed && !w.cancelled {
   843  		if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
   844  			return err
   845  		}
   846  		if err := w.waitForSegmentsToShowUp(); err != nil {
   847  			return err
   848  		}
   849  	}
   850  	w.closed = true
   851  
   852  	return nil
   853  }
   854  
   855  func (w *writer) Cancel() error {
   856  	if w.closed {
   857  		return fmt.Errorf("already closed")
   858  	} else if w.committed {
   859  		return fmt.Errorf("already committed")
   860  	}
   861  	w.cancelled = true
   862  	return w.driver.Delete(context.Background(), w.path)
   863  }
   864  
   865  func (w *writer) Commit() error {
   866  	if w.closed {
   867  		return fmt.Errorf("already closed")
   868  	} else if w.committed {
   869  		return fmt.Errorf("already committed")
   870  	} else if w.cancelled {
   871  		return fmt.Errorf("already cancelled")
   872  	}
   873  
   874  	if err := w.bw.Flush(); err != nil {
   875  		return err
   876  	}
   877  
   878  	if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
   879  		return err
   880  	}
   881  
   882  	w.committed = true
   883  	return w.waitForSegmentsToShowUp()
   884  }
   885  
// waitForSegmentsToShowUp polls the object at the writer's path until the
// size Swift reports for it equals the writer's size, sleeping between
// attempts with a delay that starts at readAfterWriteWait and doubles each
// time. It gives up once the next sleep would pass readAfterWriteTimeout and
// then returns the error of the last attempt: either the request error or a
// timeout error when the size still did not match.
func (w *writer) waitForSegmentsToShowUp() error {
	var err error
	waitingTime := readAfterWriteWait
	endTime := time.Now().Add(readAfterWriteTimeout)

	for {
		var info swift.Object
		if info, _, err = w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path)); err == nil {
			if info.Bytes == w.size {
				// Success: err is nil at this point.
				break
			}
			// Size mismatch: prepare the error reported if this turns out to
			// be the last attempt.
			err = fmt.Errorf("timeout expired while waiting for segments of %s to show up", w.path)
		}
		if time.Now().Add(waitingTime).After(endTime) {
			break
		}
		time.Sleep(waitingTime)
		waitingTime *= 2
	}

	return err
}
   908  
// segmentWriter is the io.Writer behind a writer's buffer: every Write call
// uploads its data as one or more numbered segment objects.
type segmentWriter struct {
	conn          *swift.Connection // connection used for the uploads
	container     string            // container receiving the segments
	segmentsPath  string            // object-name prefix of the segments
	segmentNumber int               // part number of the next segment to upload
	maxChunkSize  int               // maximum number of bytes per segment
}
   916  
   917  func (sw *segmentWriter) Write(p []byte) (int, error) {
   918  	n := 0
   919  	for offset := 0; offset < len(p); offset += sw.maxChunkSize {
   920  		chunkSize := sw.maxChunkSize
   921  		if offset+chunkSize > len(p) {
   922  			chunkSize = len(p) - offset
   923  		}
   924  		_, err := sw.conn.ObjectPut(sw.container, getSegmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
   925  		if err != nil {
   926  			return n, err
   927  		}
   928  
   929  		sw.segmentNumber++
   930  		n += chunkSize
   931  	}
   932  
   933  	return n, nil
   934  }
   935  

View as plain text