...

Source file src/github.com/docker/distribution/registry/storage/driver/azure/azure.go

Documentation: github.com/docker/distribution/registry/storage/driver/azure

     1  // Package azure provides a storagedriver.StorageDriver implementation to
     2  // store blobs in Microsoft Azure Blob Storage Service.
     3  package azure
     4  
     5  import (
     6  	"bufio"
     7  	"bytes"
     8  	"context"
     9  	"fmt"
    10  	"io"
    11  	"io/ioutil"
    12  	"net/http"
    13  	"strings"
    14  	"time"
    15  
    16  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    17  	"github.com/docker/distribution/registry/storage/driver/base"
    18  	"github.com/docker/distribution/registry/storage/driver/factory"
    19  
    20  	azure "github.com/Azure/azure-sdk-for-go/storage"
    21  )
    22  
    23  const driverName = "azure"
    24  
    25  const (
    26  	paramAccountName = "accountname"
    27  	paramAccountKey  = "accountkey"
    28  	paramContainer   = "container"
    29  	paramRealm       = "realm"
    30  	maxChunkSize     = 4 * 1024 * 1024
    31  )
    32  
    33  type driver struct {
    34  	client    azure.BlobStorageClient
    35  	container string
    36  }
    37  
    38  type baseEmbed struct{ base.Base }
    39  
    40  // Driver is a storagedriver.StorageDriver implementation backed by
    41  // Microsoft Azure Blob Storage Service.
    42  type Driver struct{ baseEmbed }
    43  
    44  func init() {
    45  	factory.Register(driverName, &azureDriverFactory{})
    46  }
    47  
    48  type azureDriverFactory struct{}
    49  
    50  func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
    51  	return FromParameters(parameters)
    52  }
    53  
    54  // FromParameters constructs a new Driver with a given parameters map.
    55  func FromParameters(parameters map[string]interface{}) (*Driver, error) {
    56  	accountName, ok := parameters[paramAccountName]
    57  	if !ok || fmt.Sprint(accountName) == "" {
    58  		return nil, fmt.Errorf("no %s parameter provided", paramAccountName)
    59  	}
    60  
    61  	accountKey, ok := parameters[paramAccountKey]
    62  	if !ok || fmt.Sprint(accountKey) == "" {
    63  		return nil, fmt.Errorf("no %s parameter provided", paramAccountKey)
    64  	}
    65  
    66  	container, ok := parameters[paramContainer]
    67  	if !ok || fmt.Sprint(container) == "" {
    68  		return nil, fmt.Errorf("no %s parameter provided", paramContainer)
    69  	}
    70  
    71  	realm, ok := parameters[paramRealm]
    72  	if !ok || fmt.Sprint(realm) == "" {
    73  		realm = azure.DefaultBaseURL
    74  	}
    75  
    76  	return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container), fmt.Sprint(realm))
    77  }
    78  
    79  // New constructs a new Driver with the given Azure Storage Account credentials
    80  func New(accountName, accountKey, container, realm string) (*Driver, error) {
    81  	api, err := azure.NewClient(accountName, accountKey, realm, azure.DefaultAPIVersion, true)
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  
    86  	blobClient := api.GetBlobService()
    87  
    88  	// Create registry container
    89  	containerRef := blobClient.GetContainerReference(container)
    90  	if _, err = containerRef.CreateIfNotExists(nil); err != nil {
    91  		return nil, err
    92  	}
    93  
    94  	d := &driver{
    95  		client:    blobClient,
    96  		container: container}
    97  	return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
    98  }
    99  
   100  // Implement the storagedriver.StorageDriver interface.
   101  func (d *driver) Name() string {
   102  	return driverName
   103  }
   104  
   105  // GetContent retrieves the content stored at "path" as a []byte.
   106  func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
   107  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   108  	blob, err := blobRef.Get(nil)
   109  	if err != nil {
   110  		if is404(err) {
   111  			return nil, storagedriver.PathNotFoundError{Path: path}
   112  		}
   113  		return nil, err
   114  	}
   115  
   116  	defer blob.Close()
   117  	return ioutil.ReadAll(blob)
   118  }
   119  
   120  // PutContent stores the []byte content at a location designated by "path".
   121  func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
   122  	// max size for block blobs uploaded via single "Put Blob" for version after "2016-05-31"
   123  	// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob#remarks
   124  	const limit = 256 * 1024 * 1024
   125  	if len(contents) > limit {
   126  		return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
   127  	}
   128  
   129  	// Historically, blobs uploaded via PutContent used to be of type AppendBlob
   130  	// (https://github.com/docker/distribution/pull/1438). We can't replace
   131  	// these blobs atomically via a single "Put Blob" operation without
   132  	// deleting them first. Once we detect they are BlockBlob type, we can
   133  	// overwrite them with an atomically "Put Blob" operation.
   134  	//
   135  	// While we delete the blob and create a new one, there will be a small
   136  	// window of inconsistency and if the Put Blob fails, we may end up with
   137  	// losing the existing data while migrating it to BlockBlob type. However,
   138  	// expectation is the clients pushing will be retrying when they get an error
   139  	// response.
   140  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   141  	err := blobRef.GetProperties(nil)
   142  	if err != nil && !is404(err) {
   143  		return fmt.Errorf("failed to get blob properties: %v", err)
   144  	}
   145  	if err == nil && blobRef.Properties.BlobType != azure.BlobTypeBlock {
   146  		if err := blobRef.Delete(nil); err != nil {
   147  			return fmt.Errorf("failed to delete legacy blob (%s): %v", blobRef.Properties.BlobType, err)
   148  		}
   149  	}
   150  
   151  	r := bytes.NewReader(contents)
   152  	// reset properties to empty before doing overwrite
   153  	blobRef.Properties = azure.BlobProperties{}
   154  	return blobRef.CreateBlockBlobFromReader(r, nil)
   155  }
   156  
   157  // Reader retrieves an io.ReadCloser for the content stored at "path" with a
   158  // given byte offset.
   159  func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
   160  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   161  	if ok, err := blobRef.Exists(); err != nil {
   162  		return nil, err
   163  	} else if !ok {
   164  		return nil, storagedriver.PathNotFoundError{Path: path}
   165  	}
   166  
   167  	err := blobRef.GetProperties(nil)
   168  	if err != nil {
   169  		return nil, err
   170  	}
   171  	info := blobRef.Properties
   172  	size := info.ContentLength
   173  	if offset >= size {
   174  		return ioutil.NopCloser(bytes.NewReader(nil)), nil
   175  	}
   176  
   177  	resp, err := blobRef.GetRange(&azure.GetBlobRangeOptions{
   178  		Range: &azure.BlobRange{
   179  			Start: uint64(offset),
   180  			End:   0,
   181  		},
   182  	})
   183  	if err != nil {
   184  		return nil, err
   185  	}
   186  	return resp, nil
   187  }
   188  
   189  // Writer returns a FileWriter which will store the content written to it
   190  // at the location designated by "path" after the call to Commit.
   191  func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
   192  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   193  	blobExists, err := blobRef.Exists()
   194  	if err != nil {
   195  		return nil, err
   196  	}
   197  	var size int64
   198  	if blobExists {
   199  		if append {
   200  			err = blobRef.GetProperties(nil)
   201  			if err != nil {
   202  				return nil, err
   203  			}
   204  			blobProperties := blobRef.Properties
   205  			size = blobProperties.ContentLength
   206  		} else {
   207  			err = blobRef.Delete(nil)
   208  			if err != nil {
   209  				return nil, err
   210  			}
   211  		}
   212  	} else {
   213  		if append {
   214  			return nil, storagedriver.PathNotFoundError{Path: path}
   215  		}
   216  		err = blobRef.PutAppendBlob(nil)
   217  		if err != nil {
   218  			return nil, err
   219  		}
   220  	}
   221  
   222  	return d.newWriter(path, size), nil
   223  }
   224  
   225  // Stat retrieves the FileInfo for the given path, including the current size
   226  // in bytes and the creation time.
   227  func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
   228  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   229  	// Check if the path is a blob
   230  	if ok, err := blobRef.Exists(); err != nil {
   231  		return nil, err
   232  	} else if ok {
   233  		err = blobRef.GetProperties(nil)
   234  		if err != nil {
   235  			return nil, err
   236  		}
   237  		blobProperties := blobRef.Properties
   238  
   239  		return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
   240  			Path:    path,
   241  			Size:    blobProperties.ContentLength,
   242  			ModTime: time.Time(blobProperties.LastModified),
   243  			IsDir:   false,
   244  		}}, nil
   245  	}
   246  
   247  	// Check if path is a virtual container
   248  	virtContainerPath := path
   249  	if !strings.HasSuffix(virtContainerPath, "/") {
   250  		virtContainerPath += "/"
   251  	}
   252  
   253  	containerRef := d.client.GetContainerReference(d.container)
   254  	blobs, err := containerRef.ListBlobs(azure.ListBlobsParameters{
   255  		Prefix:     virtContainerPath,
   256  		MaxResults: 1,
   257  	})
   258  	if err != nil {
   259  		return nil, err
   260  	}
   261  	if len(blobs.Blobs) > 0 {
   262  		// path is a virtual container
   263  		return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
   264  			Path:  path,
   265  			IsDir: true,
   266  		}}, nil
   267  	}
   268  
   269  	// path is not a blob or virtual container
   270  	return nil, storagedriver.PathNotFoundError{Path: path}
   271  }
   272  
   273  // List returns a list of the objects that are direct descendants of the given
   274  // path.
   275  func (d *driver) List(ctx context.Context, path string) ([]string, error) {
   276  	if path == "/" {
   277  		path = ""
   278  	}
   279  
   280  	blobs, err := d.listBlobs(d.container, path)
   281  	if err != nil {
   282  		return blobs, err
   283  	}
   284  
   285  	list := directDescendants(blobs, path)
   286  	if path != "" && len(list) == 0 {
   287  		return nil, storagedriver.PathNotFoundError{Path: path}
   288  	}
   289  	return list, nil
   290  }
   291  
   292  // Move moves an object stored at sourcePath to destPath, removing the original
   293  // object.
   294  func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
   295  	srcBlobRef := d.client.GetContainerReference(d.container).GetBlobReference(sourcePath)
   296  	sourceBlobURL := srcBlobRef.GetURL()
   297  	destBlobRef := d.client.GetContainerReference(d.container).GetBlobReference(destPath)
   298  	err := destBlobRef.Copy(sourceBlobURL, nil)
   299  	if err != nil {
   300  		if is404(err) {
   301  			return storagedriver.PathNotFoundError{Path: sourcePath}
   302  		}
   303  		return err
   304  	}
   305  
   306  	return srcBlobRef.Delete(nil)
   307  }
   308  
   309  // Delete recursively deletes all objects stored at "path" and its subpaths.
   310  func (d *driver) Delete(ctx context.Context, path string) error {
   311  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   312  	ok, err := blobRef.DeleteIfExists(nil)
   313  	if err != nil {
   314  		return err
   315  	}
   316  	if ok {
   317  		return nil // was a blob and deleted, return
   318  	}
   319  
   320  	// Not a blob, see if path is a virtual container with blobs
   321  	blobs, err := d.listBlobs(d.container, path)
   322  	if err != nil {
   323  		return err
   324  	}
   325  
   326  	for _, b := range blobs {
   327  		blobRef = d.client.GetContainerReference(d.container).GetBlobReference(b)
   328  		if err = blobRef.Delete(nil); err != nil {
   329  			return err
   330  		}
   331  	}
   332  
   333  	if len(blobs) == 0 {
   334  		return storagedriver.PathNotFoundError{Path: path}
   335  	}
   336  	return nil
   337  }
   338  
   339  // URLFor returns a publicly accessible URL for the blob stored at given path
   340  // for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
   341  // See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
   342  func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
   343  	expiresTime := time.Now().UTC().Add(20 * time.Minute) // default expiration
   344  	expires, ok := options["expiry"]
   345  	if ok {
   346  		t, ok := expires.(time.Time)
   347  		if ok {
   348  			expiresTime = t
   349  		}
   350  	}
   351  	blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
   352  	return blobRef.GetSASURI(azure.BlobSASOptions{
   353  		BlobServiceSASPermissions: azure.BlobServiceSASPermissions{
   354  			Read: true,
   355  		},
   356  		SASOptions: azure.SASOptions{
   357  			Expiry: expiresTime,
   358  		},
   359  	})
   360  }
   361  
   362  // Walk traverses a filesystem defined within driver, starting
   363  // from the given path, calling f on each file
   364  func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
   365  	return storagedriver.WalkFallback(ctx, d, path, f)
   366  }
   367  
   368  // directDescendants will find direct descendants (blobs or virtual containers)
   369  // of from list of blob paths and will return their full paths. Elements in blobs
   370  // list must be prefixed with a "/" and
   371  //
   372  // Example: direct descendants of "/" in {"/foo", "/bar/1", "/bar/2"} is
   373  // {"/foo", "/bar"} and direct descendants of "bar" is {"/bar/1", "/bar/2"}
   374  func directDescendants(blobs []string, prefix string) []string {
   375  	if !strings.HasPrefix(prefix, "/") { // add trailing '/'
   376  		prefix = "/" + prefix
   377  	}
   378  	if !strings.HasSuffix(prefix, "/") { // containerify the path
   379  		prefix += "/"
   380  	}
   381  
   382  	out := make(map[string]bool)
   383  	for _, b := range blobs {
   384  		if strings.HasPrefix(b, prefix) {
   385  			rel := b[len(prefix):]
   386  			c := strings.Count(rel, "/")
   387  			if c == 0 {
   388  				out[b] = true
   389  			} else {
   390  				out[prefix+rel[:strings.Index(rel, "/")]] = true
   391  			}
   392  		}
   393  	}
   394  
   395  	var keys []string
   396  	for k := range out {
   397  		keys = append(keys, k)
   398  	}
   399  	return keys
   400  }
   401  
   402  func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
   403  	if virtPath != "" && !strings.HasSuffix(virtPath, "/") { // containerify the path
   404  		virtPath += "/"
   405  	}
   406  
   407  	out := []string{}
   408  	marker := ""
   409  	containerRef := d.client.GetContainerReference(d.container)
   410  	for {
   411  		resp, err := containerRef.ListBlobs(azure.ListBlobsParameters{
   412  			Marker: marker,
   413  			Prefix: virtPath,
   414  		})
   415  
   416  		if err != nil {
   417  			return out, err
   418  		}
   419  
   420  		for _, b := range resp.Blobs {
   421  			out = append(out, b.Name)
   422  		}
   423  
   424  		if len(resp.Blobs) == 0 || resp.NextMarker == "" {
   425  			break
   426  		}
   427  		marker = resp.NextMarker
   428  	}
   429  	return out, nil
   430  }
   431  
   432  func is404(err error) bool {
   433  	statusCodeErr, ok := err.(azure.AzureStorageServiceError)
   434  	return ok && statusCodeErr.StatusCode == http.StatusNotFound
   435  }
   436  
   437  type writer struct {
   438  	driver    *driver
   439  	path      string
   440  	size      int64
   441  	bw        *bufio.Writer
   442  	closed    bool
   443  	committed bool
   444  	cancelled bool
   445  }
   446  
   447  func (d *driver) newWriter(path string, size int64) storagedriver.FileWriter {
   448  	return &writer{
   449  		driver: d,
   450  		path:   path,
   451  		size:   size,
   452  		bw: bufio.NewWriterSize(&blockWriter{
   453  			client:    d.client,
   454  			container: d.container,
   455  			path:      path,
   456  		}, maxChunkSize),
   457  	}
   458  }
   459  
   460  func (w *writer) Write(p []byte) (int, error) {
   461  	if w.closed {
   462  		return 0, fmt.Errorf("already closed")
   463  	} else if w.committed {
   464  		return 0, fmt.Errorf("already committed")
   465  	} else if w.cancelled {
   466  		return 0, fmt.Errorf("already cancelled")
   467  	}
   468  
   469  	n, err := w.bw.Write(p)
   470  	w.size += int64(n)
   471  	return n, err
   472  }
   473  
   474  func (w *writer) Size() int64 {
   475  	return w.size
   476  }
   477  
   478  func (w *writer) Close() error {
   479  	if w.closed {
   480  		return fmt.Errorf("already closed")
   481  	}
   482  	w.closed = true
   483  	return w.bw.Flush()
   484  }
   485  
   486  func (w *writer) Cancel() error {
   487  	if w.closed {
   488  		return fmt.Errorf("already closed")
   489  	} else if w.committed {
   490  		return fmt.Errorf("already committed")
   491  	}
   492  	w.cancelled = true
   493  	blobRef := w.driver.client.GetContainerReference(w.driver.container).GetBlobReference(w.path)
   494  	return blobRef.Delete(nil)
   495  }
   496  
   497  func (w *writer) Commit() error {
   498  	if w.closed {
   499  		return fmt.Errorf("already closed")
   500  	} else if w.committed {
   501  		return fmt.Errorf("already committed")
   502  	} else if w.cancelled {
   503  		return fmt.Errorf("already cancelled")
   504  	}
   505  	w.committed = true
   506  	return w.bw.Flush()
   507  }
   508  
   509  type blockWriter struct {
   510  	client    azure.BlobStorageClient
   511  	container string
   512  	path      string
   513  }
   514  
   515  func (bw *blockWriter) Write(p []byte) (int, error) {
   516  	n := 0
   517  	blobRef := bw.client.GetContainerReference(bw.container).GetBlobReference(bw.path)
   518  	for offset := 0; offset < len(p); offset += maxChunkSize {
   519  		chunkSize := maxChunkSize
   520  		if offset+chunkSize > len(p) {
   521  			chunkSize = len(p) - offset
   522  		}
   523  		err := blobRef.AppendBlock(p[offset:offset+chunkSize], nil)
   524  		if err != nil {
   525  			return n, err
   526  		}
   527  
   528  		n += chunkSize
   529  	}
   530  
   531  	return n, nil
   532  }
   533  

View as plain text