1
2
3
4
5
6
7
8
9
10
11
12 package oss
13
14 import (
15 "bytes"
16 "context"
17 "fmt"
18 "io"
19 "io/ioutil"
20 "net/http"
21 "reflect"
22 "strconv"
23 "strings"
24 "time"
25
26 "github.com/denverdino/aliyungo/oss"
27 storagedriver "github.com/docker/distribution/registry/storage/driver"
28 "github.com/docker/distribution/registry/storage/driver/base"
29 "github.com/docker/distribution/registry/storage/driver/factory"
30 "github.com/sirupsen/logrus"
31 )
32
// driverName is the name under which this driver is registered with the
// storage driver factory and which Name reports.
const driverName = "oss"

// minChunkSize is the smallest chunk size, in bytes (5 << 20 = 5 MiB), that
// FromParameters accepts for the "chunksize" parameter. Write also uses it to
// decide whether a previously uploaded trailing part is too small to keep.
const minChunkSize = 5 << 20

// defaultChunkSize is the chunk size used when no "chunksize" parameter is
// supplied.
const defaultChunkSize = 2 * minChunkSize

// listMax is the maximum number of keys requested per Bucket.List call.
const listMax = 1000
43
44
// DriverParameters holds the settings New needs to build a Driver.
// FromParameters fills it from a generic parameter map.
type DriverParameters struct {
	// AccessKeyID and AccessKeySecret are handed to oss.NewOSSClient
	// (presumably as the account credentials — confirm against aliyungo).
	AccessKeyID     string
	AccessKeySecret string
	// Bucket is the name of the bucket the driver operates on.
	Bucket string
	// Region and Internal are handed to oss.NewOSSClient.
	Region   oss.Region
	Internal bool
	// Encrypt is forwarded as oss.Options.ServerSideEncryption on writes.
	Encrypt bool
	// Secure is handed to oss.NewOSSClient; FromParameters defaults it to true.
	Secure bool
	// ChunkSize is the buffer size the writer fills before uploading a part.
	ChunkSize int64
	// RootDirectory is prepended to every path to form the object key.
	RootDirectory string
	// Endpoint is passed to the client's SetEndpoint.
	Endpoint string
}
57
58 func init() {
59 factory.Register(driverName, &ossDriverFactory{})
60 }
61
62
// ossDriverFactory is the factory registered for driverName; its Create
// method builds drivers via FromParameters.
type ossDriverFactory struct{}
64
65 func (factory *ossDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
66 return FromParameters(parameters)
67 }
68
69 var _ storagedriver.StorageDriver = &driver{}
70
// driver is the unexported StorageDriver implementation backed by a single
// OSS bucket.
type driver struct {
	// Client is the OSS client the bucket handle was created from.
	Client *oss.Client
	// Bucket is the handle all object operations go through.
	Bucket *oss.Bucket
	// ChunkSize is the size of the buffers the writer fills before uploading.
	ChunkSize int64
	// Encrypt is forwarded as oss.Options.ServerSideEncryption (see getOptions).
	Encrypt bool
	// RootDirectory is prepended to every path by ossPath.
	RootDirectory string
}
78
// baseEmbed wraps base.Base so that Driver can embed it through an
// unexported type.
type baseEmbed struct {
	base.Base
}
82
83
84
// Driver is the exported storage driver type. It embeds base.Base (via
// baseEmbed), whose StorageDriver is set by New to the unexported OSS driver.
type Driver struct {
	baseEmbed
}
88
89
90
91
92
93
94
95
96 func FromParameters(parameters map[string]interface{}) (*Driver, error) {
97
98
99 accessKey, ok := parameters["accesskeyid"]
100 if !ok {
101 return nil, fmt.Errorf("No accesskeyid parameter provided")
102 }
103 secretKey, ok := parameters["accesskeysecret"]
104 if !ok {
105 return nil, fmt.Errorf("No accesskeysecret parameter provided")
106 }
107
108 regionName, ok := parameters["region"]
109 if !ok || fmt.Sprint(regionName) == "" {
110 return nil, fmt.Errorf("No region parameter provided")
111 }
112
113 bucket, ok := parameters["bucket"]
114 if !ok || fmt.Sprint(bucket) == "" {
115 return nil, fmt.Errorf("No bucket parameter provided")
116 }
117
118 internalBool := false
119 internal, ok := parameters["internal"]
120 if ok {
121 internalBool, ok = internal.(bool)
122 if !ok {
123 return nil, fmt.Errorf("The internal parameter should be a boolean")
124 }
125 }
126
127 encryptBool := false
128 encrypt, ok := parameters["encrypt"]
129 if ok {
130 encryptBool, ok = encrypt.(bool)
131 if !ok {
132 return nil, fmt.Errorf("The encrypt parameter should be a boolean")
133 }
134 }
135
136 secureBool := true
137 secure, ok := parameters["secure"]
138 if ok {
139 secureBool, ok = secure.(bool)
140 if !ok {
141 return nil, fmt.Errorf("The secure parameter should be a boolean")
142 }
143 }
144
145 chunkSize := int64(defaultChunkSize)
146 chunkSizeParam, ok := parameters["chunksize"]
147 if ok {
148 switch v := chunkSizeParam.(type) {
149 case string:
150 vv, err := strconv.ParseInt(v, 0, 64)
151 if err != nil {
152 return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
153 }
154 chunkSize = vv
155 case int64:
156 chunkSize = v
157 case int, uint, int32, uint32, uint64:
158 chunkSize = reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int()
159 default:
160 return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
161 }
162
163 if chunkSize < minChunkSize {
164 return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
165 }
166 }
167
168 rootDirectory, ok := parameters["rootdirectory"]
169 if !ok {
170 rootDirectory = ""
171 }
172
173 endpoint, ok := parameters["endpoint"]
174 if !ok {
175 endpoint = ""
176 }
177
178 params := DriverParameters{
179 AccessKeyID: fmt.Sprint(accessKey),
180 AccessKeySecret: fmt.Sprint(secretKey),
181 Bucket: fmt.Sprint(bucket),
182 Region: oss.Region(fmt.Sprint(regionName)),
183 ChunkSize: chunkSize,
184 RootDirectory: fmt.Sprint(rootDirectory),
185 Encrypt: encryptBool,
186 Secure: secureBool,
187 Internal: internalBool,
188 Endpoint: fmt.Sprint(endpoint),
189 }
190
191 return New(params)
192 }
193
194
195
196 func New(params DriverParameters) (*Driver, error) {
197
198 client := oss.NewOSSClient(params.Region, params.Internal, params.AccessKeyID, params.AccessKeySecret, params.Secure)
199 client.SetEndpoint(params.Endpoint)
200 bucket := client.Bucket(params.Bucket)
201 client.SetDebug(false)
202
203
204
205 if _, err := bucket.List(strings.TrimRight(params.RootDirectory, "/"), "", "", 1); err != nil {
206 return nil, err
207 }
208
209
210
211
212 d := &driver{
213 Client: client,
214 Bucket: bucket,
215 ChunkSize: params.ChunkSize,
216 Encrypt: params.Encrypt,
217 RootDirectory: params.RootDirectory,
218 }
219
220 return &Driver{
221 baseEmbed: baseEmbed{
222 Base: base.Base{
223 StorageDriver: d,
224 },
225 },
226 }, nil
227 }
228
229
230
// Name returns the driver's registered name, driverName ("oss").
func (d *driver) Name() string {
	return driverName
}
234
235
236 func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
237 content, err := d.Bucket.Get(d.ossPath(path))
238 if err != nil {
239 return nil, parseError(path, err)
240 }
241 return content, nil
242 }
243
244
245 func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
246 return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
247 }
248
249
250
251 func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
252 headers := make(http.Header)
253 headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
254
255 resp, err := d.Bucket.GetResponseWithHeaders(d.ossPath(path), headers)
256 if err != nil {
257 return nil, parseError(path, err)
258 }
259
260
261
262
263
264 if resp.StatusCode != http.StatusPartialContent {
265 resp.Body.Close()
266 return ioutil.NopCloser(bytes.NewReader(nil)), nil
267 }
268
269 return resp.Body, nil
270 }
271
272
273
274 func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
275 key := d.ossPath(path)
276 if !append {
277
278 multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions())
279 if err != nil {
280 return nil, err
281 }
282 return d.newWriter(key, multi, nil), nil
283 }
284 multis, _, err := d.Bucket.ListMulti(key, "")
285 if err != nil {
286 return nil, parseError(path, err)
287 }
288 for _, multi := range multis {
289 if key != multi.Key {
290 continue
291 }
292 parts, err := multi.ListParts()
293 if err != nil {
294 return nil, parseError(path, err)
295 }
296 var multiSize int64
297 for _, part := range parts {
298 multiSize += part.Size
299 }
300 return d.newWriter(key, multi, parts), nil
301 }
302 return nil, storagedriver.PathNotFoundError{Path: path}
303 }
304
305
306
307 func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
308 listResponse, err := d.Bucket.List(d.ossPath(path), "", "", 1)
309 if err != nil {
310 return nil, err
311 }
312
313 fi := storagedriver.FileInfoFields{
314 Path: path,
315 }
316
317 if len(listResponse.Contents) == 1 {
318 if listResponse.Contents[0].Key != d.ossPath(path) {
319 fi.IsDir = true
320 } else {
321 fi.IsDir = false
322 fi.Size = listResponse.Contents[0].Size
323
324 timestamp, err := time.Parse(time.RFC3339Nano, listResponse.Contents[0].LastModified)
325 if err != nil {
326 return nil, err
327 }
328 fi.ModTime = timestamp
329 }
330 } else if len(listResponse.CommonPrefixes) == 1 {
331 fi.IsDir = true
332 } else {
333 return nil, storagedriver.PathNotFoundError{Path: path}
334 }
335
336 return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
337 }
338
339
340 func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
341 path := opath
342 if path != "/" && opath[len(path)-1] != '/' {
343 path = path + "/"
344 }
345
346
347
348
349 prefix := ""
350 if d.ossPath("") == "" {
351 prefix = "/"
352 }
353
354 ossPath := d.ossPath(path)
355 listResponse, err := d.Bucket.List(ossPath, "/", "", listMax)
356 if err != nil {
357 return nil, parseError(opath, err)
358 }
359
360 files := []string{}
361 directories := []string{}
362
363 for {
364 for _, key := range listResponse.Contents {
365 files = append(files, strings.Replace(key.Key, d.ossPath(""), prefix, 1))
366 }
367
368 for _, commonPrefix := range listResponse.CommonPrefixes {
369 directories = append(directories, strings.Replace(commonPrefix[0:len(commonPrefix)-1], d.ossPath(""), prefix, 1))
370 }
371
372 if listResponse.IsTruncated {
373 listResponse, err = d.Bucket.List(ossPath, "/", listResponse.NextMarker, listMax)
374 if err != nil {
375 return nil, err
376 }
377 } else {
378 break
379 }
380 }
381
382
383 if len(files) > 0 && files[0] == strings.Replace(ossPath, d.ossPath(""), prefix, 1) {
384 files = files[1:]
385 }
386
387 if opath != "/" {
388 if len(files) == 0 && len(directories) == 0 {
389
390
391 return nil, storagedriver.PathNotFoundError{Path: opath}
392 }
393 }
394
395 return append(files, directories...), nil
396 }
397
// maxConcurrency is the concurrency value Move passes to
// Bucket.CopyLargeFileInParallel.
const maxConcurrency = 10
399
400
401
402 func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
403 logrus.Infof("Move from %s to %s", d.ossPath(sourcePath), d.ossPath(destPath))
404 err := d.Bucket.CopyLargeFileInParallel(d.ossPath(sourcePath), d.ossPath(destPath),
405 d.getContentType(),
406 getPermissions(),
407 oss.Options{},
408 maxConcurrency)
409 if err != nil {
410 logrus.Errorf("Failed for move from %s to %s: %v", d.ossPath(sourcePath), d.ossPath(destPath), err)
411 return parseError(sourcePath, err)
412 }
413
414 return d.Delete(ctx, sourcePath)
415 }
416
417
418 func (d *driver) Delete(ctx context.Context, path string) error {
419 ossPath := d.ossPath(path)
420 listResponse, err := d.Bucket.List(ossPath, "", "", listMax)
421 if err != nil || len(listResponse.Contents) == 0 {
422 return storagedriver.PathNotFoundError{Path: path}
423 }
424
425 ossObjects := make([]oss.Object, listMax)
426
427 for len(listResponse.Contents) > 0 {
428 numOssObjects := len(listResponse.Contents)
429 for index, key := range listResponse.Contents {
430
431 if len(key.Key) > len(ossPath) && (key.Key)[len(ossPath)] != '/' {
432 numOssObjects = index
433 break
434 }
435 ossObjects[index].Key = key.Key
436 }
437
438 err := d.Bucket.DelMulti(oss.Delete{Quiet: false, Objects: ossObjects[0:numOssObjects]})
439 if err != nil {
440 return nil
441 }
442
443 if numOssObjects < len(listResponse.Contents) {
444 return nil
445 }
446
447 listResponse, err = d.Bucket.List(d.ossPath(path), "", "", listMax)
448 if err != nil {
449 return err
450 }
451 }
452
453 return nil
454 }
455
456
457
458 func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
459 methodString := "GET"
460 method, ok := options["method"]
461 if ok {
462 methodString, ok = method.(string)
463 if !ok || (methodString != "GET") {
464 return "", storagedriver.ErrUnsupportedMethod{}
465 }
466 }
467
468 expiresTime := time.Now().Add(20 * time.Minute)
469
470 expires, ok := options["expiry"]
471 if ok {
472 et, ok := expires.(time.Time)
473 if ok {
474 expiresTime = et
475 }
476 }
477 logrus.Infof("methodString: %s, expiresTime: %v", methodString, expiresTime)
478 signedURL := d.Bucket.SignedURLWithMethod(methodString, d.ossPath(path), expiresTime, nil, nil)
479 logrus.Infof("signed URL: %s", signedURL)
480 return signedURL, nil
481 }
482
483
484
485 func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
486 return storagedriver.WalkFallback(ctx, d, path, f)
487 }
488
489 func (d *driver) ossPath(path string) string {
490 return strings.TrimLeft(strings.TrimRight(d.RootDirectory, "/")+path, "/")
491 }
492
493 func parseError(path string, err error) error {
494 if ossErr, ok := err.(*oss.Error); ok && ossErr.StatusCode == http.StatusNotFound && (ossErr.Code == "NoSuchKey" || ossErr.Code == "") {
495 return storagedriver.PathNotFoundError{Path: path}
496 }
497
498 return err
499 }
500
501 func (d *driver) getOptions() oss.Options {
502 return oss.Options{ServerSideEncryption: d.Encrypt}
503 }
504
// getPermissions returns the ACL applied to objects written by this driver,
// which is always oss.Private.
func getPermissions() oss.ACL {
	return oss.Private
}
508
// getContentType returns the content type used for every object written by
// this driver, "application/octet-stream".
func (d *driver) getContentType() string {
	return "application/octet-stream"
}
512
513
514
515
516
// writer is the FileWriter returned by driver.Writer. It buffers written
// bytes and uploads them as parts of an OSS multipart upload.
type writer struct {
	// driver is the owning driver (source of ChunkSize, Bucket and options).
	driver *driver
	// key is the object key being written.
	key string
	// multi is the multipart upload currently in progress.
	multi *oss.Multi
	// parts lists the parts uploaded so far.
	parts []oss.Part
	// size is the total number of bytes accepted, including existing parts.
	size int64
	// readyPart is the buffer that flushPart uploads next.
	readyPart []byte
	// pendingPart is filled once readyPart holds ChunkSize bytes; flushPart
	// promotes it to readyPart after an upload.
	pendingPart []byte
	// closed, committed and cancelled record which terminal call was made.
	closed    bool
	committed bool
	cancelled bool
}
529
530 func (d *driver) newWriter(key string, multi *oss.Multi, parts []oss.Part) storagedriver.FileWriter {
531 var size int64
532 for _, part := range parts {
533 size += part.Size
534 }
535 return &writer{
536 driver: d,
537 key: key,
538 multi: multi,
539 parts: parts,
540 size: size,
541 }
542 }
543
// Write buffers p for upload and returns the number of bytes accepted.
//
// It fails immediately if the writer was already closed, committed or
// cancelled. Data is accumulated in readyPart and then pendingPart, each up
// to the driver's ChunkSize; once pendingPart is full, flushPart uploads
// readyPart as the next part. The writer's size is advanced by the number of
// bytes accepted, also when a flush fails part-way.
func (w *writer) Write(p []byte) (int, error) {
	if w.closed {
		return 0, fmt.Errorf("already closed")
	} else if w.committed {
		return 0, fmt.Errorf("already committed")
	} else if w.cancelled {
		return 0, fmt.Errorf("already cancelled")
	}

	// If the last uploaded part is smaller than minChunkSize, finish the
	// current multipart upload and start a new one for the same key
	// (presumably because OSS rejects undersized non-final parts — confirm).
	if len(w.parts) > 0 && int(w.parts[len(w.parts)-1].Size) < minChunkSize {
		err := w.multi.Complete(w.parts)
		if err != nil {
			w.multi.Abort()
			return 0, err
		}

		multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions())
		if err != nil {
			return 0, err
		}
		w.multi = multi

		// Small object: read it back and use it as the start of the first
		// buffered part. Otherwise copy the completed object into the new
		// upload as part 1.
		if w.size < minChunkSize {
			contents, err := w.driver.Bucket.Get(w.key)
			if err != nil {
				return 0, err
			}
			w.parts = nil
			w.readyPart = contents
		} else {

			_, part, err := multi.PutPartCopy(1, oss.CopyOptions{}, w.driver.Bucket.Name+"/"+w.key)
			if err != nil {
				return 0, err
			}
			w.parts = []oss.Part{part}
		}
	}

	// n counts the bytes of p consumed so far.
	var n int

	for len(p) > 0 {
		// Top up readyPart to ChunkSize first.
		if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
			if len(p) >= neededBytes {
				w.readyPart = append(w.readyPart, p[:neededBytes]...)
				n += neededBytes
				p = p[neededBytes:]
			} else {
				w.readyPart = append(w.readyPart, p...)
				n += len(p)
				p = nil
			}
		}

		// Then fill pendingPart; when it is full, upload readyPart.
		if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
			if len(p) >= neededBytes {
				w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
				n += neededBytes
				p = p[neededBytes:]
				err := w.flushPart()
				if err != nil {
					// The bytes counted in n are already buffered, so they
					// are still added to size.
					w.size += int64(n)
					return n, err
				}
			} else {
				w.pendingPart = append(w.pendingPart, p...)
				n += len(p)
				p = nil
			}
		}
	}
	w.size += int64(n)
	return n, nil
}
623
// Size returns the writer's running byte count: the sizes of the parts it was
// created with plus the bytes accepted by Write.
func (w *writer) Size() int64 {
	return w.size
}
627
628 func (w *writer) Close() error {
629 if w.closed {
630 return fmt.Errorf("already closed")
631 }
632 w.closed = true
633 return w.flushPart()
634 }
635
636 func (w *writer) Cancel() error {
637 if w.closed {
638 return fmt.Errorf("already closed")
639 } else if w.committed {
640 return fmt.Errorf("already committed")
641 }
642 w.cancelled = true
643 err := w.multi.Abort()
644 return err
645 }
646
647 func (w *writer) Commit() error {
648 if w.closed {
649 return fmt.Errorf("already closed")
650 } else if w.committed {
651 return fmt.Errorf("already committed")
652 } else if w.cancelled {
653 return fmt.Errorf("already cancelled")
654 }
655 err := w.flushPart()
656 if err != nil {
657 return err
658 }
659 w.committed = true
660 err = w.multi.Complete(w.parts)
661 if err != nil {
662 w.multi.Abort()
663 return err
664 }
665 return nil
666 }
667
668
669
670 func (w *writer) flushPart() error {
671 if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
672
673 return nil
674 }
675 if len(w.pendingPart) < int(w.driver.ChunkSize) {
676
677
678 w.readyPart = append(w.readyPart, w.pendingPart...)
679 w.pendingPart = nil
680 }
681
682 part, err := w.multi.PutPart(len(w.parts)+1, bytes.NewReader(w.readyPart))
683 if err != nil {
684 return err
685 }
686 w.parts = append(w.parts, part)
687 w.readyPart = w.pendingPart
688 w.pendingPart = nil
689 return nil
690 }
691
View as plain text