1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package swift
17
18 import (
19 "bufio"
20 "bytes"
21 "context"
22 "crypto/rand"
23 "crypto/sha1"
24 "crypto/tls"
25 "encoding/hex"
26 "fmt"
27 "io"
28 "io/ioutil"
29 "net/http"
30 "net/url"
31 "strconv"
32 "strings"
33 "time"
34
35 "github.com/mitchellh/mapstructure"
36 "github.com/ncw/swift"
37
38 storagedriver "github.com/docker/distribution/registry/storage/driver"
39 "github.com/docker/distribution/registry/storage/driver/base"
40 "github.com/docker/distribution/registry/storage/driver/factory"
41 "github.com/docker/distribution/version"
42 )
43
44 const driverName = "swift"
45
46
47 const defaultChunkSize = 20 * 1024 * 1024
48
49
50 const minChunkSize = 1 << 20
51
52
53 const contentType = "application/octet-stream"
54
55
56 var readAfterWriteTimeout = 15 * time.Second
57
58
59 var readAfterWriteWait = 200 * time.Millisecond
60
61
62 type Parameters struct {
63 Username string
64 Password string
65 AuthURL string
66 Tenant string
67 TenantID string
68 Domain string
69 DomainID string
70 TenantDomain string
71 TenantDomainID string
72 TrustID string
73 Region string
74 AuthVersion int
75 Container string
76 Prefix string
77 EndpointType string
78 InsecureSkipVerify bool
79 ChunkSize int
80 SecretKey string
81 AccessKey string
82 TempURLContainerKey bool
83 TempURLMethods []string
84 }
85
86
87 type swiftInfo struct {
88 Swift struct {
89 Version string `mapstructure:"version"`
90 }
91 Tempurl struct {
92 Methods []string `mapstructure:"methods"`
93 }
94 BulkDelete struct {
95 MaxDeletesPerRequest int `mapstructure:"max_deletes_per_request"`
96 } `mapstructure:"bulk_delete"`
97 }
98
99 func init() {
100 factory.Register(driverName, &swiftDriverFactory{})
101 }
102
103
104 type swiftDriverFactory struct{}
105
106 func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
107 return FromParameters(parameters)
108 }
109
110 type driver struct {
111 Conn *swift.Connection
112 Container string
113 Prefix string
114 BulkDeleteSupport bool
115 BulkDeleteMaxDeletes int
116 ChunkSize int
117 SecretKey string
118 AccessKey string
119 TempURLContainerKey bool
120 TempURLMethods []string
121 }
122
123 type baseEmbed struct {
124 base.Base
125 }
126
127
128
129 type Driver struct {
130 baseEmbed
131 }
132
133
134
135
136
137
138
139 func FromParameters(parameters map[string]interface{}) (*Driver, error) {
140 params := Parameters{
141 ChunkSize: defaultChunkSize,
142 InsecureSkipVerify: false,
143 }
144
145
146
147
148
149 _, ok := parameters["tenant"]
150 if ok {
151 parameters["tenant"] = fmt.Sprint(parameters["tenant"])
152 }
153 _, ok = parameters["tenantid"]
154 if ok {
155 parameters["tenantid"] = fmt.Sprint(parameters["tenantid"])
156 }
157
158 if err := mapstructure.Decode(parameters, ¶ms); err != nil {
159 return nil, err
160 }
161
162 if params.Username == "" {
163 return nil, fmt.Errorf("no username parameter provided")
164 }
165
166 if params.Password == "" {
167 return nil, fmt.Errorf("no password parameter provided")
168 }
169
170 if params.AuthURL == "" {
171 return nil, fmt.Errorf("no authurl parameter provided")
172 }
173
174 if params.Container == "" {
175 return nil, fmt.Errorf("no container parameter provided")
176 }
177
178 if params.ChunkSize < minChunkSize {
179 return nil, fmt.Errorf("the chunksize %#v parameter should be a number that is larger than or equal to %d", params.ChunkSize, minChunkSize)
180 }
181
182 return New(params)
183 }
184
185
186 func New(params Parameters) (*Driver, error) {
187 transport := &http.Transport{
188 Proxy: http.ProxyFromEnvironment,
189 MaxIdleConnsPerHost: 2048,
190 TLSClientConfig: &tls.Config{InsecureSkipVerify: params.InsecureSkipVerify},
191 }
192
193 ct := &swift.Connection{
194 UserName: params.Username,
195 ApiKey: params.Password,
196 AuthUrl: params.AuthURL,
197 Region: params.Region,
198 AuthVersion: params.AuthVersion,
199 UserAgent: "distribution/" + version.Version,
200 Tenant: params.Tenant,
201 TenantId: params.TenantID,
202 Domain: params.Domain,
203 DomainId: params.DomainID,
204 TenantDomain: params.TenantDomain,
205 TenantDomainId: params.TenantDomainID,
206 TrustId: params.TrustID,
207 EndpointType: swift.EndpointType(params.EndpointType),
208 Transport: transport,
209 ConnectTimeout: 60 * time.Second,
210 Timeout: 15 * 60 * time.Second,
211 }
212 err := ct.Authenticate()
213 if err != nil {
214 return nil, fmt.Errorf("swift authentication failed: %s", err)
215 }
216
217 if _, _, err := ct.Container(params.Container); err == swift.ContainerNotFound {
218 if err := ct.ContainerCreate(params.Container, nil); err != nil {
219 return nil, fmt.Errorf("failed to create container %s (%s)", params.Container, err)
220 }
221 } else if err != nil {
222 return nil, fmt.Errorf("failed to retrieve info about container %s (%s)", params.Container, err)
223 }
224
225 d := &driver{
226 Conn: ct,
227 Container: params.Container,
228 Prefix: params.Prefix,
229 ChunkSize: params.ChunkSize,
230 TempURLMethods: make([]string, 0),
231 AccessKey: params.AccessKey,
232 }
233
234 info := swiftInfo{}
235 if config, err := d.Conn.QueryInfo(); err == nil {
236 _, d.BulkDeleteSupport = config["bulk_delete"]
237
238 if err := mapstructure.Decode(config, &info); err == nil {
239 d.TempURLContainerKey = info.Swift.Version >= "2.3.0"
240 d.TempURLMethods = info.Tempurl.Methods
241 if d.BulkDeleteSupport {
242 d.BulkDeleteMaxDeletes = info.BulkDelete.MaxDeletesPerRequest
243 }
244 }
245 } else {
246 d.TempURLContainerKey = params.TempURLContainerKey
247 d.TempURLMethods = params.TempURLMethods
248 }
249
250 if len(d.TempURLMethods) > 0 {
251 secretKey := params.SecretKey
252 if secretKey == "" {
253 secretKey, _ = generateSecret()
254 }
255
256
257
258 if d.TempURLContainerKey {
259 _, containerHeaders, err := d.Conn.Container(d.Container)
260 if err != nil {
261 return nil, fmt.Errorf("failed to fetch container info %s (%s)", d.Container, err)
262 }
263
264 d.SecretKey = containerHeaders["X-Container-Meta-Temp-Url-Key"]
265 if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
266 m := swift.Metadata{}
267 m["temp-url-key"] = secretKey
268 if d.Conn.ContainerUpdate(d.Container, m.ContainerHeaders()); err == nil {
269 d.SecretKey = secretKey
270 }
271 }
272 } else {
273
274 _, accountHeaders, err := d.Conn.Account()
275 if err != nil {
276 return nil, fmt.Errorf("failed to fetch account info (%s)", err)
277 }
278
279 d.SecretKey = accountHeaders["X-Account-Meta-Temp-Url-Key"]
280 if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
281 m := swift.Metadata{}
282 m["temp-url-key"] = secretKey
283 if err := d.Conn.AccountUpdate(m.AccountHeaders()); err == nil {
284 d.SecretKey = secretKey
285 }
286 }
287 }
288 }
289
290 return &Driver{
291 baseEmbed: baseEmbed{
292 Base: base.Base{
293 StorageDriver: d,
294 },
295 },
296 }, nil
297 }
298
299
300
301 func (d *driver) Name() string {
302 return driverName
303 }
304
305
306 func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
307 content, err := d.Conn.ObjectGetBytes(d.Container, d.swiftPath(path))
308 if err == swift.ObjectNotFound {
309 return nil, storagedriver.PathNotFoundError{Path: path}
310 }
311 return content, err
312 }
313
314
315 func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
316 err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, contentType)
317 if err == swift.ObjectNotFound {
318 return storagedriver.PathNotFoundError{Path: path}
319 }
320 return err
321 }
322
323
324
325 func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
326 headers := make(swift.Headers)
327 headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-"
328
329 waitingTime := readAfterWriteWait
330 endTime := time.Now().Add(readAfterWriteTimeout)
331
332 for {
333 file, headers, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
334 if err != nil {
335 if err == swift.ObjectNotFound {
336 return nil, storagedriver.PathNotFoundError{Path: path}
337 }
338 if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == http.StatusRequestedRangeNotSatisfiable {
339 return ioutil.NopCloser(bytes.NewReader(nil)), nil
340 }
341 return file, err
342 }
343
344
345
346 _, isDLO := headers["X-Object-Manifest"]
347 size, err := file.Length()
348 if err != nil {
349 return file, err
350 }
351 if isDLO && size == 0 {
352 if time.Now().Add(waitingTime).After(endTime) {
353 return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
354 }
355 time.Sleep(waitingTime)
356 waitingTime *= 2
357 continue
358 }
359
360
361 return file, nil
362 }
363 }
364
365
366
367 func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
368 var (
369 segments []swift.Object
370 segmentsPath string
371 err error
372 )
373
374 if !append {
375 segmentsPath, err = d.swiftSegmentPath(path)
376 if err != nil {
377 return nil, err
378 }
379 } else {
380 info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
381 if err == swift.ObjectNotFound {
382 return nil, storagedriver.PathNotFoundError{Path: path}
383 } else if err != nil {
384 return nil, err
385 }
386 manifest, ok := headers["X-Object-Manifest"]
387 if !ok {
388 segmentsPath, err = d.swiftSegmentPath(path)
389 if err != nil {
390 return nil, err
391 }
392 if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegmentPath(segmentsPath, len(segments))); err != nil {
393 return nil, err
394 }
395 segments = []swift.Object{info}
396 } else {
397 _, segmentsPath = parseManifest(manifest)
398 if segments, err = d.getAllSegments(segmentsPath); err != nil {
399 return nil, err
400 }
401 }
402 }
403
404 return d.newWriter(path, segmentsPath, segments), nil
405 }
406
407
408
409 func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
410 swiftPath := d.swiftPath(path)
411 opts := &swift.ObjectsOpts{
412 Prefix: swiftPath,
413 Delimiter: '/',
414 }
415
416 objects, err := d.Conn.ObjectsAll(d.Container, opts)
417 if err != nil {
418 if err == swift.ContainerNotFound {
419 return nil, storagedriver.PathNotFoundError{Path: path}
420 }
421 return nil, err
422 }
423
424 fi := storagedriver.FileInfoFields{
425 Path: strings.TrimPrefix(strings.TrimSuffix(swiftPath, "/"), d.swiftPath("/")),
426 }
427
428 for _, obj := range objects {
429 if obj.PseudoDirectory && obj.Name == swiftPath+"/" {
430 fi.IsDir = true
431 return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
432 } else if obj.Name == swiftPath {
433
434
435 break
436 }
437 }
438
439
440
441
442 waitingTime := readAfterWriteWait
443 endTime := time.Now().Add(readAfterWriteTimeout)
444
445 for {
446 info, headers, err := d.Conn.Object(d.Container, swiftPath)
447 if err != nil {
448 if err == swift.ObjectNotFound {
449 return nil, storagedriver.PathNotFoundError{Path: path}
450 }
451 return nil, err
452 }
453
454
455
456 _, isDLO := headers["X-Object-Manifest"]
457 if isDLO && info.Bytes == 0 {
458 if time.Now().Add(waitingTime).After(endTime) {
459 return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
460 }
461 time.Sleep(waitingTime)
462 waitingTime *= 2
463 continue
464 }
465
466
467 fi.IsDir = false
468 fi.Size = info.Bytes
469 fi.ModTime = info.LastModified
470 return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
471 }
472 }
473
474
475 func (d *driver) List(ctx context.Context, path string) ([]string, error) {
476 var files []string
477
478 prefix := d.swiftPath(path)
479 if prefix != "" {
480 prefix += "/"
481 }
482
483 opts := &swift.ObjectsOpts{
484 Prefix: prefix,
485 Delimiter: '/',
486 }
487
488 objects, err := d.Conn.ObjectsAll(d.Container, opts)
489 for _, obj := range objects {
490 files = append(files, strings.TrimPrefix(strings.TrimSuffix(obj.Name, "/"), d.swiftPath("/")))
491 }
492
493 if err == swift.ContainerNotFound || (len(objects) == 0 && path != "/") {
494 return files, storagedriver.PathNotFoundError{Path: path}
495 }
496 return files, err
497 }
498
499
500
501 func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
502 _, headers, err := d.Conn.Object(d.Container, d.swiftPath(sourcePath))
503 if err == nil {
504 if manifest, ok := headers["X-Object-Manifest"]; ok {
505 if err = d.createManifest(destPath, manifest); err != nil {
506 return err
507 }
508 err = d.Conn.ObjectDelete(d.Container, d.swiftPath(sourcePath))
509 } else {
510 err = d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
511 }
512 }
513 if err == swift.ObjectNotFound {
514 return storagedriver.PathNotFoundError{Path: sourcePath}
515 }
516 return err
517 }
518
519
520 func (d *driver) Delete(ctx context.Context, path string) error {
521 opts := swift.ObjectsOpts{
522 Prefix: d.swiftPath(path) + "/",
523 }
524
525 objects, err := d.Conn.ObjectsAll(d.Container, &opts)
526 if err != nil {
527 if err == swift.ContainerNotFound {
528 return storagedriver.PathNotFoundError{Path: path}
529 }
530 return err
531 }
532
533 for _, obj := range objects {
534 if obj.PseudoDirectory {
535 continue
536 }
537 if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
538 manifest, ok := headers["X-Object-Manifest"]
539 if ok {
540 _, prefix := parseManifest(manifest)
541 segments, err := d.getAllSegments(prefix)
542 if err != nil {
543 return err
544 }
545 objects = append(objects, segments...)
546 }
547 } else {
548 if err == swift.ObjectNotFound {
549 return storagedriver.PathNotFoundError{Path: obj.Name}
550 }
551 return err
552 }
553 }
554
555 if d.BulkDeleteSupport && len(objects) > 0 && d.BulkDeleteMaxDeletes > 0 {
556 filenames := make([]string, len(objects))
557 for i, obj := range objects {
558 filenames[i] = obj.Name
559 }
560
561 chunks, err := chunkFilenames(filenames, d.BulkDeleteMaxDeletes)
562 if err != nil {
563 return err
564 }
565 for _, chunk := range chunks {
566 _, err := d.Conn.BulkDelete(d.Container, chunk)
567
568
569 if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
570 if err == swift.ContainerNotFound {
571 return storagedriver.PathNotFoundError{Path: path}
572 }
573 return err
574 }
575 }
576 } else {
577 for _, obj := range objects {
578 if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
579 if err == swift.ObjectNotFound {
580 return storagedriver.PathNotFoundError{Path: obj.Name}
581 }
582 return err
583 }
584 }
585 }
586
587 _, _, err = d.Conn.Object(d.Container, d.swiftPath(path))
588 if err == nil {
589 if err := d.Conn.ObjectDelete(d.Container, d.swiftPath(path)); err != nil {
590 if err == swift.ObjectNotFound {
591 return storagedriver.PathNotFoundError{Path: path}
592 }
593 return err
594 }
595 } else if err == swift.ObjectNotFound {
596 if len(objects) == 0 {
597 return storagedriver.PathNotFoundError{Path: path}
598 }
599 } else {
600 return err
601 }
602 return nil
603 }
604
605
606 func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
607 if d.SecretKey == "" {
608 return "", storagedriver.ErrUnsupportedMethod{}
609 }
610
611 methodString := "GET"
612 method, ok := options["method"]
613 if ok {
614 if methodString, ok = method.(string); !ok {
615 return "", storagedriver.ErrUnsupportedMethod{}
616 }
617 }
618
619 if methodString == "HEAD" {
620
621
622 methodString = "GET"
623 }
624
625 supported := false
626 for _, method := range d.TempURLMethods {
627 if method == methodString {
628 supported = true
629 break
630 }
631 }
632
633 if !supported {
634 return "", storagedriver.ErrUnsupportedMethod{}
635 }
636
637 expiresTime := time.Now().Add(20 * time.Minute)
638 expires, ok := options["expiry"]
639 if ok {
640 et, ok := expires.(time.Time)
641 if ok {
642 expiresTime = et
643 }
644 }
645
646 tempURL := d.Conn.ObjectTempUrl(d.Container, d.swiftPath(path), d.SecretKey, methodString, expiresTime)
647
648 if d.AccessKey != "" {
649
650 url, _ := url.Parse(tempURL)
651 query := url.Query()
652 query.Set("temp_url_sig", fmt.Sprintf("%s:%s:%s", d.Conn.TenantId, d.AccessKey, query.Get("temp_url_sig")))
653 url.RawQuery = query.Encode()
654 tempURL = url.String()
655 }
656
657 return tempURL, nil
658 }
659
660
661
662 func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
663 return storagedriver.WalkFallback(ctx, d, path, f)
664 }
665
666 func (d *driver) swiftPath(path string) string {
667 return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
668 }
669
670 func (d *driver) swiftSegmentPath(path string) (string, error) {
671 checksum := sha1.New()
672 random := make([]byte, 32)
673 if _, err := rand.Read(random); err != nil {
674 return "", err
675 }
676 path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...)))
677 return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
678 }
679
680 func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
681
682 segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
683 if err != nil {
684 if err == swift.ContainerNotFound {
685 return nil, storagedriver.PathNotFoundError{Path: path}
686 }
687 return nil, err
688 }
689
690
691 hasObjectName := make(map[string]struct{})
692 for _, segment := range segments {
693 hasObjectName[segment.Name] = struct{}{}
694 }
695
696
697
698
699 segmentNumber := 0
700 for {
701 segmentNumber++
702 segmentPath := getSegmentPath(path, segmentNumber)
703
704 if _, seen := hasObjectName[segmentPath]; seen {
705 continue
706 }
707
708
709
710
711
712
713 segment, _, err := d.Conn.Object(d.Container, segmentPath)
714 switch err {
715 case nil:
716
717 segments = append(segments, segment)
718 continue
719 case swift.ObjectNotFound:
720
721
722 return segments, nil
723 default:
724 return nil, err
725 }
726 }
727 }
728
729 func (d *driver) createManifest(path string, segments string) error {
730 headers := make(swift.Headers)
731 headers["X-Object-Manifest"] = segments
732 manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", contentType, headers)
733 if err != nil {
734 if err == swift.ObjectNotFound {
735 return storagedriver.PathNotFoundError{Path: path}
736 }
737 return err
738 }
739 if err := manifest.Close(); err != nil {
740 if err == swift.ObjectNotFound {
741 return storagedriver.PathNotFoundError{Path: path}
742 }
743 return err
744 }
745 return nil
746 }
747
748 func chunkFilenames(slice []string, maxSize int) (chunks [][]string, err error) {
749 if maxSize > 0 {
750 for offset := 0; offset < len(slice); offset += maxSize {
751 chunkSize := maxSize
752 if offset+chunkSize > len(slice) {
753 chunkSize = len(slice) - offset
754 }
755 chunks = append(chunks, slice[offset:offset+chunkSize])
756 }
757 } else {
758 return nil, fmt.Errorf("max chunk size must be > 0")
759 }
760 return
761 }
762
763 func parseManifest(manifest string) (container string, prefix string) {
764 components := strings.SplitN(manifest, "/", 2)
765 container = components[0]
766 if len(components) > 1 {
767 prefix = components[1]
768 }
769 return container, prefix
770 }
771
772 func generateSecret() (string, error) {
773 var secretBytes [32]byte
774 if _, err := rand.Read(secretBytes[:]); err != nil {
775 return "", fmt.Errorf("could not generate random bytes for Swift secret key: %v", err)
776 }
777 return hex.EncodeToString(secretBytes[:]), nil
778 }
779
780 func getSegmentPath(segmentsPath string, partNumber int) string {
781 return fmt.Sprintf("%s/%016d", segmentsPath, partNumber)
782 }
783
784 type writer struct {
785 driver *driver
786 path string
787 segmentsPath string
788 size int64
789 bw *bufio.Writer
790 closed bool
791 committed bool
792 cancelled bool
793 }
794
795 func (d *driver) newWriter(path, segmentsPath string, segments []swift.Object) storagedriver.FileWriter {
796 var size int64
797 for _, segment := range segments {
798 size += segment.Bytes
799 }
800 return &writer{
801 driver: d,
802 path: path,
803 segmentsPath: segmentsPath,
804 size: size,
805 bw: bufio.NewWriterSize(&segmentWriter{
806 conn: d.Conn,
807 container: d.Container,
808 segmentsPath: segmentsPath,
809 segmentNumber: len(segments) + 1,
810 maxChunkSize: d.ChunkSize,
811 }, d.ChunkSize),
812 }
813 }
814
815 func (w *writer) Write(p []byte) (int, error) {
816 if w.closed {
817 return 0, fmt.Errorf("already closed")
818 } else if w.committed {
819 return 0, fmt.Errorf("already committed")
820 } else if w.cancelled {
821 return 0, fmt.Errorf("already cancelled")
822 }
823
824 n, err := w.bw.Write(p)
825 w.size += int64(n)
826 return n, err
827 }
828
829 func (w *writer) Size() int64 {
830 return w.size
831 }
832
833 func (w *writer) Close() error {
834 if w.closed {
835 return fmt.Errorf("already closed")
836 }
837
838 if err := w.bw.Flush(); err != nil {
839 return err
840 }
841
842 if !w.committed && !w.cancelled {
843 if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
844 return err
845 }
846 if err := w.waitForSegmentsToShowUp(); err != nil {
847 return err
848 }
849 }
850 w.closed = true
851
852 return nil
853 }
854
855 func (w *writer) Cancel() error {
856 if w.closed {
857 return fmt.Errorf("already closed")
858 } else if w.committed {
859 return fmt.Errorf("already committed")
860 }
861 w.cancelled = true
862 return w.driver.Delete(context.Background(), w.path)
863 }
864
865 func (w *writer) Commit() error {
866 if w.closed {
867 return fmt.Errorf("already closed")
868 } else if w.committed {
869 return fmt.Errorf("already committed")
870 } else if w.cancelled {
871 return fmt.Errorf("already cancelled")
872 }
873
874 if err := w.bw.Flush(); err != nil {
875 return err
876 }
877
878 if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
879 return err
880 }
881
882 w.committed = true
883 return w.waitForSegmentsToShowUp()
884 }
885
886 func (w *writer) waitForSegmentsToShowUp() error {
887 var err error
888 waitingTime := readAfterWriteWait
889 endTime := time.Now().Add(readAfterWriteTimeout)
890
891 for {
892 var info swift.Object
893 if info, _, err = w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path)); err == nil {
894 if info.Bytes == w.size {
895 break
896 }
897 err = fmt.Errorf("timeout expired while waiting for segments of %s to show up", w.path)
898 }
899 if time.Now().Add(waitingTime).After(endTime) {
900 break
901 }
902 time.Sleep(waitingTime)
903 waitingTime *= 2
904 }
905
906 return err
907 }
908
909 type segmentWriter struct {
910 conn *swift.Connection
911 container string
912 segmentsPath string
913 segmentNumber int
914 maxChunkSize int
915 }
916
917 func (sw *segmentWriter) Write(p []byte) (int, error) {
918 n := 0
919 for offset := 0; offset < len(p); offset += sw.maxChunkSize {
920 chunkSize := sw.maxChunkSize
921 if offset+chunkSize > len(p) {
922 chunkSize = len(p) - offset
923 }
924 _, err := sw.conn.ObjectPut(sw.container, getSegmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
925 if err != nil {
926 return n, err
927 }
928
929 sw.segmentNumber++
930 n += chunkSize
931 }
932
933 return n, nil
934 }
935
View as plain text