1
2
3 package azure
4
5 import (
6 "bufio"
7 "bytes"
8 "context"
9 "fmt"
10 "io"
11 "io/ioutil"
12 "net/http"
13 "strings"
14 "time"
15
16 storagedriver "github.com/docker/distribution/registry/storage/driver"
17 "github.com/docker/distribution/registry/storage/driver/base"
18 "github.com/docker/distribution/registry/storage/driver/factory"
19
20 azure "github.com/Azure/azure-sdk-for-go/storage"
21 )
22
23 const driverName = "azure"
24
25 const (
26 paramAccountName = "accountname"
27 paramAccountKey = "accountkey"
28 paramContainer = "container"
29 paramRealm = "realm"
30 maxChunkSize = 4 * 1024 * 1024
31 )
32
33 type driver struct {
34 client azure.BlobStorageClient
35 container string
36 }
37
38 type baseEmbed struct{ base.Base }
39
40
41
42 type Driver struct{ baseEmbed }
43
44 func init() {
45 factory.Register(driverName, &azureDriverFactory{})
46 }
47
48 type azureDriverFactory struct{}
49
50 func (factory *azureDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
51 return FromParameters(parameters)
52 }
53
54
55 func FromParameters(parameters map[string]interface{}) (*Driver, error) {
56 accountName, ok := parameters[paramAccountName]
57 if !ok || fmt.Sprint(accountName) == "" {
58 return nil, fmt.Errorf("no %s parameter provided", paramAccountName)
59 }
60
61 accountKey, ok := parameters[paramAccountKey]
62 if !ok || fmt.Sprint(accountKey) == "" {
63 return nil, fmt.Errorf("no %s parameter provided", paramAccountKey)
64 }
65
66 container, ok := parameters[paramContainer]
67 if !ok || fmt.Sprint(container) == "" {
68 return nil, fmt.Errorf("no %s parameter provided", paramContainer)
69 }
70
71 realm, ok := parameters[paramRealm]
72 if !ok || fmt.Sprint(realm) == "" {
73 realm = azure.DefaultBaseURL
74 }
75
76 return New(fmt.Sprint(accountName), fmt.Sprint(accountKey), fmt.Sprint(container), fmt.Sprint(realm))
77 }
78
79
80 func New(accountName, accountKey, container, realm string) (*Driver, error) {
81 api, err := azure.NewClient(accountName, accountKey, realm, azure.DefaultAPIVersion, true)
82 if err != nil {
83 return nil, err
84 }
85
86 blobClient := api.GetBlobService()
87
88
89 containerRef := blobClient.GetContainerReference(container)
90 if _, err = containerRef.CreateIfNotExists(nil); err != nil {
91 return nil, err
92 }
93
94 d := &driver{
95 client: blobClient,
96 container: container}
97 return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
98 }
99
100
101 func (d *driver) Name() string {
102 return driverName
103 }
104
105
106 func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
107 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
108 blob, err := blobRef.Get(nil)
109 if err != nil {
110 if is404(err) {
111 return nil, storagedriver.PathNotFoundError{Path: path}
112 }
113 return nil, err
114 }
115
116 defer blob.Close()
117 return ioutil.ReadAll(blob)
118 }
119
120
121 func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
122
123
124 const limit = 256 * 1024 * 1024
125 if len(contents) > limit {
126 return fmt.Errorf("uploading %d bytes with PutContent is not supported; limit: %d bytes", len(contents), limit)
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
141 err := blobRef.GetProperties(nil)
142 if err != nil && !is404(err) {
143 return fmt.Errorf("failed to get blob properties: %v", err)
144 }
145 if err == nil && blobRef.Properties.BlobType != azure.BlobTypeBlock {
146 if err := blobRef.Delete(nil); err != nil {
147 return fmt.Errorf("failed to delete legacy blob (%s): %v", blobRef.Properties.BlobType, err)
148 }
149 }
150
151 r := bytes.NewReader(contents)
152
153 blobRef.Properties = azure.BlobProperties{}
154 return blobRef.CreateBlockBlobFromReader(r, nil)
155 }
156
157
158
159 func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
160 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
161 if ok, err := blobRef.Exists(); err != nil {
162 return nil, err
163 } else if !ok {
164 return nil, storagedriver.PathNotFoundError{Path: path}
165 }
166
167 err := blobRef.GetProperties(nil)
168 if err != nil {
169 return nil, err
170 }
171 info := blobRef.Properties
172 size := info.ContentLength
173 if offset >= size {
174 return ioutil.NopCloser(bytes.NewReader(nil)), nil
175 }
176
177 resp, err := blobRef.GetRange(&azure.GetBlobRangeOptions{
178 Range: &azure.BlobRange{
179 Start: uint64(offset),
180 End: 0,
181 },
182 })
183 if err != nil {
184 return nil, err
185 }
186 return resp, nil
187 }
188
189
190
191 func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
192 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
193 blobExists, err := blobRef.Exists()
194 if err != nil {
195 return nil, err
196 }
197 var size int64
198 if blobExists {
199 if append {
200 err = blobRef.GetProperties(nil)
201 if err != nil {
202 return nil, err
203 }
204 blobProperties := blobRef.Properties
205 size = blobProperties.ContentLength
206 } else {
207 err = blobRef.Delete(nil)
208 if err != nil {
209 return nil, err
210 }
211 }
212 } else {
213 if append {
214 return nil, storagedriver.PathNotFoundError{Path: path}
215 }
216 err = blobRef.PutAppendBlob(nil)
217 if err != nil {
218 return nil, err
219 }
220 }
221
222 return d.newWriter(path, size), nil
223 }
224
225
226
227 func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
228 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
229
230 if ok, err := blobRef.Exists(); err != nil {
231 return nil, err
232 } else if ok {
233 err = blobRef.GetProperties(nil)
234 if err != nil {
235 return nil, err
236 }
237 blobProperties := blobRef.Properties
238
239 return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
240 Path: path,
241 Size: blobProperties.ContentLength,
242 ModTime: time.Time(blobProperties.LastModified),
243 IsDir: false,
244 }}, nil
245 }
246
247
248 virtContainerPath := path
249 if !strings.HasSuffix(virtContainerPath, "/") {
250 virtContainerPath += "/"
251 }
252
253 containerRef := d.client.GetContainerReference(d.container)
254 blobs, err := containerRef.ListBlobs(azure.ListBlobsParameters{
255 Prefix: virtContainerPath,
256 MaxResults: 1,
257 })
258 if err != nil {
259 return nil, err
260 }
261 if len(blobs.Blobs) > 0 {
262
263 return storagedriver.FileInfoInternal{FileInfoFields: storagedriver.FileInfoFields{
264 Path: path,
265 IsDir: true,
266 }}, nil
267 }
268
269
270 return nil, storagedriver.PathNotFoundError{Path: path}
271 }
272
273
274
275 func (d *driver) List(ctx context.Context, path string) ([]string, error) {
276 if path == "/" {
277 path = ""
278 }
279
280 blobs, err := d.listBlobs(d.container, path)
281 if err != nil {
282 return blobs, err
283 }
284
285 list := directDescendants(blobs, path)
286 if path != "" && len(list) == 0 {
287 return nil, storagedriver.PathNotFoundError{Path: path}
288 }
289 return list, nil
290 }
291
292
293
294 func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
295 srcBlobRef := d.client.GetContainerReference(d.container).GetBlobReference(sourcePath)
296 sourceBlobURL := srcBlobRef.GetURL()
297 destBlobRef := d.client.GetContainerReference(d.container).GetBlobReference(destPath)
298 err := destBlobRef.Copy(sourceBlobURL, nil)
299 if err != nil {
300 if is404(err) {
301 return storagedriver.PathNotFoundError{Path: sourcePath}
302 }
303 return err
304 }
305
306 return srcBlobRef.Delete(nil)
307 }
308
309
310 func (d *driver) Delete(ctx context.Context, path string) error {
311 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
312 ok, err := blobRef.DeleteIfExists(nil)
313 if err != nil {
314 return err
315 }
316 if ok {
317 return nil
318 }
319
320
321 blobs, err := d.listBlobs(d.container, path)
322 if err != nil {
323 return err
324 }
325
326 for _, b := range blobs {
327 blobRef = d.client.GetContainerReference(d.container).GetBlobReference(b)
328 if err = blobRef.Delete(nil); err != nil {
329 return err
330 }
331 }
332
333 if len(blobs) == 0 {
334 return storagedriver.PathNotFoundError{Path: path}
335 }
336 return nil
337 }
338
339
340
341
342 func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
343 expiresTime := time.Now().UTC().Add(20 * time.Minute)
344 expires, ok := options["expiry"]
345 if ok {
346 t, ok := expires.(time.Time)
347 if ok {
348 expiresTime = t
349 }
350 }
351 blobRef := d.client.GetContainerReference(d.container).GetBlobReference(path)
352 return blobRef.GetSASURI(azure.BlobSASOptions{
353 BlobServiceSASPermissions: azure.BlobServiceSASPermissions{
354 Read: true,
355 },
356 SASOptions: azure.SASOptions{
357 Expiry: expiresTime,
358 },
359 })
360 }
361
362
363
364 func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
365 return storagedriver.WalkFallback(ctx, d, path, f)
366 }
367
368
369
370
371
372
373
374 func directDescendants(blobs []string, prefix string) []string {
375 if !strings.HasPrefix(prefix, "/") {
376 prefix = "/" + prefix
377 }
378 if !strings.HasSuffix(prefix, "/") {
379 prefix += "/"
380 }
381
382 out := make(map[string]bool)
383 for _, b := range blobs {
384 if strings.HasPrefix(b, prefix) {
385 rel := b[len(prefix):]
386 c := strings.Count(rel, "/")
387 if c == 0 {
388 out[b] = true
389 } else {
390 out[prefix+rel[:strings.Index(rel, "/")]] = true
391 }
392 }
393 }
394
395 var keys []string
396 for k := range out {
397 keys = append(keys, k)
398 }
399 return keys
400 }
401
402 func (d *driver) listBlobs(container, virtPath string) ([]string, error) {
403 if virtPath != "" && !strings.HasSuffix(virtPath, "/") {
404 virtPath += "/"
405 }
406
407 out := []string{}
408 marker := ""
409 containerRef := d.client.GetContainerReference(d.container)
410 for {
411 resp, err := containerRef.ListBlobs(azure.ListBlobsParameters{
412 Marker: marker,
413 Prefix: virtPath,
414 })
415
416 if err != nil {
417 return out, err
418 }
419
420 for _, b := range resp.Blobs {
421 out = append(out, b.Name)
422 }
423
424 if len(resp.Blobs) == 0 || resp.NextMarker == "" {
425 break
426 }
427 marker = resp.NextMarker
428 }
429 return out, nil
430 }
431
432 func is404(err error) bool {
433 statusCodeErr, ok := err.(azure.AzureStorageServiceError)
434 return ok && statusCodeErr.StatusCode == http.StatusNotFound
435 }
436
437 type writer struct {
438 driver *driver
439 path string
440 size int64
441 bw *bufio.Writer
442 closed bool
443 committed bool
444 cancelled bool
445 }
446
447 func (d *driver) newWriter(path string, size int64) storagedriver.FileWriter {
448 return &writer{
449 driver: d,
450 path: path,
451 size: size,
452 bw: bufio.NewWriterSize(&blockWriter{
453 client: d.client,
454 container: d.container,
455 path: path,
456 }, maxChunkSize),
457 }
458 }
459
460 func (w *writer) Write(p []byte) (int, error) {
461 if w.closed {
462 return 0, fmt.Errorf("already closed")
463 } else if w.committed {
464 return 0, fmt.Errorf("already committed")
465 } else if w.cancelled {
466 return 0, fmt.Errorf("already cancelled")
467 }
468
469 n, err := w.bw.Write(p)
470 w.size += int64(n)
471 return n, err
472 }
473
474 func (w *writer) Size() int64 {
475 return w.size
476 }
477
478 func (w *writer) Close() error {
479 if w.closed {
480 return fmt.Errorf("already closed")
481 }
482 w.closed = true
483 return w.bw.Flush()
484 }
485
486 func (w *writer) Cancel() error {
487 if w.closed {
488 return fmt.Errorf("already closed")
489 } else if w.committed {
490 return fmt.Errorf("already committed")
491 }
492 w.cancelled = true
493 blobRef := w.driver.client.GetContainerReference(w.driver.container).GetBlobReference(w.path)
494 return blobRef.Delete(nil)
495 }
496
497 func (w *writer) Commit() error {
498 if w.closed {
499 return fmt.Errorf("already closed")
500 } else if w.committed {
501 return fmt.Errorf("already committed")
502 } else if w.cancelled {
503 return fmt.Errorf("already cancelled")
504 }
505 w.committed = true
506 return w.bw.Flush()
507 }
508
509 type blockWriter struct {
510 client azure.BlobStorageClient
511 container string
512 path string
513 }
514
515 func (bw *blockWriter) Write(p []byte) (int, error) {
516 n := 0
517 blobRef := bw.client.GetContainerReference(bw.container).GetBlobReference(bw.path)
518 for offset := 0; offset < len(p); offset += maxChunkSize {
519 chunkSize := maxChunkSize
520 if offset+chunkSize > len(p) {
521 chunkSize = len(p) - offset
522 }
523 err := blobRef.AppendBlock(p[offset:offset+chunkSize], nil)
524 if err != nil {
525 return n, err
526 }
527
528 n += chunkSize
529 }
530
531 return n, nil
532 }
533
View as plain text