1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package gcs
16
17 import (
18 "bytes"
19 "context"
20 "encoding/json"
21 "fmt"
22 "io"
23 "io/ioutil"
24 "math/rand"
25 "net/http"
26 "net/url"
27 "reflect"
28 "regexp"
29 "sort"
30 "strconv"
31 "strings"
32 "time"
33
34 storagedriver "github.com/docker/distribution/registry/storage/driver"
35 "github.com/docker/distribution/registry/storage/driver/base"
36 "github.com/docker/distribution/registry/storage/driver/factory"
37 "github.com/sirupsen/logrus"
38 "golang.org/x/oauth2"
39 "golang.org/x/oauth2/google"
40 "golang.org/x/oauth2/jwt"
41 "google.golang.org/api/googleapi"
42 "google.golang.org/cloud"
43 "google.golang.org/cloud/storage"
44 )
45
46 const (
47 driverName = "gcs"
48 dummyProjectID = "<unknown>"
49
50 uploadSessionContentType = "application/x-docker-upload-session"
51 minChunkSize = 256 * 1024
52 defaultChunkSize = 20 * minChunkSize
53 defaultMaxConcurrency = 50
54 minConcurrency = 25
55
56 maxTries = 5
57 )
58
59 var rangeHeader = regexp.MustCompile(`^bytes=([0-9])+-([0-9]+)$`)
60
61
62 type driverParameters struct {
63 bucket string
64 email string
65 privateKey []byte
66 client *http.Client
67 rootDirectory string
68 chunkSize int
69
70
71
72
73
74 maxConcurrency uint64
75 }
76
77 func init() {
78 factory.Register(driverName, &gcsDriverFactory{})
79 }
80
81
82 type gcsDriverFactory struct{}
83
84
85 func (factory *gcsDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
86 return FromParameters(parameters)
87 }
88
89 var _ storagedriver.StorageDriver = &driver{}
90
91
92
93 type driver struct {
94 client *http.Client
95 bucket string
96 email string
97 privateKey []byte
98 rootDirectory string
99 chunkSize int
100 }
101
102
103
104 type Wrapper struct {
105 baseEmbed
106 }
107
108 type baseEmbed struct {
109 base.Base
110 }
111
112
113
114
115 func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
116 bucket, ok := parameters["bucket"]
117 if !ok || fmt.Sprint(bucket) == "" {
118 return nil, fmt.Errorf("No bucket parameter provided")
119 }
120
121 rootDirectory, ok := parameters["rootdirectory"]
122 if !ok {
123 rootDirectory = ""
124 }
125
126 chunkSize := defaultChunkSize
127 chunkSizeParam, ok := parameters["chunksize"]
128 if ok {
129 switch v := chunkSizeParam.(type) {
130 case string:
131 vv, err := strconv.Atoi(v)
132 if err != nil {
133 return nil, fmt.Errorf("chunksize parameter must be an integer, %v invalid", chunkSizeParam)
134 }
135 chunkSize = vv
136 case int, uint, int32, uint32, uint64, int64:
137 chunkSize = int(reflect.ValueOf(v).Convert(reflect.TypeOf(chunkSize)).Int())
138 default:
139 return nil, fmt.Errorf("invalid valud for chunksize: %#v", chunkSizeParam)
140 }
141
142 if chunkSize < minChunkSize {
143 return nil, fmt.Errorf("The chunksize %#v parameter should be a number that is larger than or equal to %d", chunkSize, minChunkSize)
144 }
145
146 if chunkSize%minChunkSize != 0 {
147 return nil, fmt.Errorf("chunksize should be a multiple of %d", minChunkSize)
148 }
149 }
150
151 var ts oauth2.TokenSource
152 jwtConf := new(jwt.Config)
153 if keyfile, ok := parameters["keyfile"]; ok {
154 jsonKey, err := ioutil.ReadFile(fmt.Sprint(keyfile))
155 if err != nil {
156 return nil, err
157 }
158 jwtConf, err = google.JWTConfigFromJSON(jsonKey, storage.ScopeFullControl)
159 if err != nil {
160 return nil, err
161 }
162 ts = jwtConf.TokenSource(context.Background())
163 } else if credentials, ok := parameters["credentials"]; ok {
164 credentialMap, ok := credentials.(map[interface{}]interface{})
165 if !ok {
166 return nil, fmt.Errorf("The credentials were not specified in the correct format")
167 }
168
169 stringMap := map[string]interface{}{}
170 for k, v := range credentialMap {
171 key, ok := k.(string)
172 if !ok {
173 return nil, fmt.Errorf("One of the credential keys was not a string: %s", fmt.Sprint(k))
174 }
175 stringMap[key] = v
176 }
177
178 data, err := json.Marshal(stringMap)
179 if err != nil {
180 return nil, fmt.Errorf("Failed to marshal gcs credentials to json")
181 }
182
183 jwtConf, err = google.JWTConfigFromJSON(data, storage.ScopeFullControl)
184 if err != nil {
185 return nil, err
186 }
187 ts = jwtConf.TokenSource(context.Background())
188 } else {
189 var err error
190 ts, err = google.DefaultTokenSource(context.Background(), storage.ScopeFullControl)
191 if err != nil {
192 return nil, err
193 }
194 }
195
196 maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
197 if err != nil {
198 return nil, fmt.Errorf("maxconcurrency config error: %s", err)
199 }
200
201 params := driverParameters{
202 bucket: fmt.Sprint(bucket),
203 rootDirectory: fmt.Sprint(rootDirectory),
204 email: jwtConf.Email,
205 privateKey: jwtConf.PrivateKey,
206 client: oauth2.NewClient(context.Background(), ts),
207 chunkSize: chunkSize,
208 maxConcurrency: maxConcurrency,
209 }
210
211 return New(params)
212 }
213
214
215 func New(params driverParameters) (storagedriver.StorageDriver, error) {
216 rootDirectory := strings.Trim(params.rootDirectory, "/")
217 if rootDirectory != "" {
218 rootDirectory += "/"
219 }
220 if params.chunkSize <= 0 || params.chunkSize%minChunkSize != 0 {
221 return nil, fmt.Errorf("Invalid chunksize: %d is not a positive multiple of %d", params.chunkSize, minChunkSize)
222 }
223 d := &driver{
224 bucket: params.bucket,
225 rootDirectory: rootDirectory,
226 email: params.email,
227 privateKey: params.privateKey,
228 client: params.client,
229 chunkSize: params.chunkSize,
230 }
231
232 return &Wrapper{
233 baseEmbed: baseEmbed{
234 Base: base.Base{
235 StorageDriver: base.NewRegulator(d, params.maxConcurrency),
236 },
237 },
238 }, nil
239 }
240
241
242
243 func (d *driver) Name() string {
244 return driverName
245 }
246
247
248
249 func (d *driver) GetContent(context context.Context, path string) ([]byte, error) {
250 gcsContext := d.context(context)
251 name := d.pathToKey(path)
252 var rc io.ReadCloser
253 err := retry(func() error {
254 var err error
255 rc, err = storage.NewReader(gcsContext, d.bucket, name)
256 return err
257 })
258 if err == storage.ErrObjectNotExist {
259 return nil, storagedriver.PathNotFoundError{Path: path}
260 }
261 if err != nil {
262 return nil, err
263 }
264 defer rc.Close()
265
266 p, err := ioutil.ReadAll(rc)
267 if err != nil {
268 return nil, err
269 }
270 return p, nil
271 }
272
273
274
275 func (d *driver) PutContent(context context.Context, path string, contents []byte) error {
276 return retry(func() error {
277 wc := storage.NewWriter(d.context(context), d.bucket, d.pathToKey(path))
278 wc.ContentType = "application/octet-stream"
279 return putContentsClose(wc, contents)
280 })
281 }
282
283
284
285
286 func (d *driver) Reader(context context.Context, path string, offset int64) (io.ReadCloser, error) {
287 res, err := getObject(d.client, d.bucket, d.pathToKey(path), offset)
288 if err != nil {
289 if res != nil {
290 if res.StatusCode == http.StatusNotFound {
291 res.Body.Close()
292 return nil, storagedriver.PathNotFoundError{Path: path}
293 }
294
295 if res.StatusCode == http.StatusRequestedRangeNotSatisfiable {
296 res.Body.Close()
297 obj, err := storageStatObject(d.context(context), d.bucket, d.pathToKey(path))
298 if err != nil {
299 return nil, err
300 }
301 if offset == obj.Size {
302 return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
303 }
304 return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
305 }
306 }
307 return nil, err
308 }
309 if res.Header.Get("Content-Type") == uploadSessionContentType {
310 defer res.Body.Close()
311 return nil, storagedriver.PathNotFoundError{Path: path}
312 }
313 return res.Body, nil
314 }
315
316 func getObject(client *http.Client, bucket string, name string, offset int64) (*http.Response, error) {
317
318
319 u := &url.URL{
320 Scheme: "https",
321 Host: "storage.googleapis.com",
322 Path: fmt.Sprintf("/%s/%s", bucket, name),
323 }
324 req, err := http.NewRequest("GET", u.String(), nil)
325 if err != nil {
326 return nil, err
327 }
328 if offset > 0 {
329 req.Header.Set("Range", fmt.Sprintf("bytes=%v-", offset))
330 }
331 var res *http.Response
332 err = retry(func() error {
333 var err error
334 res, err = client.Do(req)
335 return err
336 })
337 if err != nil {
338 return nil, err
339 }
340 return res, googleapi.CheckMediaResponse(res)
341 }
342
343
344
345 func (d *driver) Writer(context context.Context, path string, append bool) (storagedriver.FileWriter, error) {
346 writer := &writer{
347 client: d.client,
348 bucket: d.bucket,
349 name: d.pathToKey(path),
350 buffer: make([]byte, d.chunkSize),
351 }
352
353 if append {
354 err := writer.init(path)
355 if err != nil {
356 return nil, err
357 }
358 }
359 return writer, nil
360 }
361
362 type writer struct {
363 client *http.Client
364 bucket string
365 name string
366 size int64
367 offset int64
368 closed bool
369 sessionURI string
370 buffer []byte
371 buffSize int
372 }
373
374
375 func (w *writer) Cancel() error {
376 w.closed = true
377 err := storageDeleteObject(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
378 if err != nil {
379 if status, ok := err.(*googleapi.Error); ok {
380 if status.Code == http.StatusNotFound {
381 err = nil
382 }
383 }
384 }
385 return err
386 }
387
388 func (w *writer) Close() error {
389 if w.closed {
390 return nil
391 }
392 w.closed = true
393
394 err := w.writeChunk()
395 if err != nil {
396 return err
397 }
398
399
400
401
402
403 if w.buffSize > minChunkSize {
404 w.buffSize = minChunkSize
405 }
406
407
408 err = retry(func() error {
409 wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
410 wc.ContentType = uploadSessionContentType
411 wc.Metadata = map[string]string{
412 "Session-URI": w.sessionURI,
413 "Offset": strconv.FormatInt(w.offset, 10),
414 }
415 return putContentsClose(wc, w.buffer[0:w.buffSize])
416 })
417 if err != nil {
418 return err
419 }
420 w.size = w.offset + int64(w.buffSize)
421 w.buffSize = 0
422 return nil
423 }
424
425 func putContentsClose(wc *storage.Writer, contents []byte) error {
426 size := len(contents)
427 var nn int
428 var err error
429 for nn < size {
430 n, err := wc.Write(contents[nn:size])
431 nn += n
432 if err != nil {
433 break
434 }
435 }
436 if err != nil {
437 return err
438 }
439 return wc.Close()
440 }
441
442
443
444
445 func (w *writer) Commit() error {
446
447 if err := w.checkClosed(); err != nil {
448 return err
449 }
450 w.closed = true
451
452
453 if w.sessionURI == "" {
454 err := retry(func() error {
455 wc := storage.NewWriter(cloud.NewContext(dummyProjectID, w.client), w.bucket, w.name)
456 wc.ContentType = "application/octet-stream"
457 return putContentsClose(wc, w.buffer[0:w.buffSize])
458 })
459 if err != nil {
460 return err
461 }
462 w.size = w.offset + int64(w.buffSize)
463 w.buffSize = 0
464 return nil
465 }
466 size := w.offset + int64(w.buffSize)
467 var nn int
468
469
470 for {
471 n, err := putChunk(w.client, w.sessionURI, w.buffer[nn:w.buffSize], w.offset, size)
472 nn += int(n)
473 w.offset += n
474 w.size = w.offset
475 if err != nil {
476 w.buffSize = copy(w.buffer, w.buffer[nn:w.buffSize])
477 return err
478 }
479 if nn == w.buffSize {
480 break
481 }
482 }
483 w.buffSize = 0
484 return nil
485 }
486
487 func (w *writer) checkClosed() error {
488 if w.closed {
489 return fmt.Errorf("Writer already closed")
490 }
491 return nil
492 }
493
494 func (w *writer) writeChunk() error {
495 var err error
496
497
498 chunkSize := w.buffSize - (w.buffSize % minChunkSize)
499 if chunkSize == 0 {
500 return nil
501 }
502
503 if w.sessionURI == "" {
504 w.sessionURI, err = startSession(w.client, w.bucket, w.name)
505 }
506 if err != nil {
507 return err
508 }
509 nn, err := putChunk(w.client, w.sessionURI, w.buffer[0:chunkSize], w.offset, -1)
510 w.offset += nn
511 if w.offset > w.size {
512 w.size = w.offset
513 }
514
515 w.buffSize = copy(w.buffer, w.buffer[int(nn):w.buffSize])
516
517 return err
518 }
519
520 func (w *writer) Write(p []byte) (int, error) {
521 err := w.checkClosed()
522 if err != nil {
523 return 0, err
524 }
525
526 var nn int
527 for nn < len(p) {
528 n := copy(w.buffer[w.buffSize:], p[nn:])
529 w.buffSize += n
530 if w.buffSize == cap(w.buffer) {
531 err = w.writeChunk()
532 if err != nil {
533 break
534 }
535 }
536 nn += n
537 }
538 return nn, err
539 }
540
541
542 func (w *writer) Size() int64 {
543 return w.size
544 }
545
546 func (w *writer) init(path string) error {
547 res, err := getObject(w.client, w.bucket, w.name, 0)
548 if err != nil {
549 return err
550 }
551 defer res.Body.Close()
552 if res.Header.Get("Content-Type") != uploadSessionContentType {
553 return storagedriver.PathNotFoundError{Path: path}
554 }
555 offset, err := strconv.ParseInt(res.Header.Get("X-Goog-Meta-Offset"), 10, 64)
556 if err != nil {
557 return err
558 }
559 buffer, err := ioutil.ReadAll(res.Body)
560 if err != nil {
561 return err
562 }
563 w.sessionURI = res.Header.Get("X-Goog-Meta-Session-URI")
564 w.buffSize = copy(w.buffer, buffer)
565 w.offset = offset
566 w.size = offset + int64(w.buffSize)
567 return nil
568 }
569
570 type request func() error
571
572 func retry(req request) error {
573 backoff := time.Second
574 var err error
575 for i := 0; i < maxTries; i++ {
576 err = req()
577 if err == nil {
578 return nil
579 }
580
581 status, ok := err.(*googleapi.Error)
582 if !ok || (status.Code != 429 && status.Code < http.StatusInternalServerError) {
583 return err
584 }
585
586 time.Sleep(backoff - time.Second + (time.Duration(rand.Int31n(1000)) * time.Millisecond))
587 if i <= 4 {
588 backoff = backoff * 2
589 }
590 }
591 return err
592 }
593
594
595
596 func (d *driver) Stat(context context.Context, path string) (storagedriver.FileInfo, error) {
597 var fi storagedriver.FileInfoFields
598
599 gcsContext := d.context(context)
600 obj, err := storageStatObject(gcsContext, d.bucket, d.pathToKey(path))
601 if err == nil {
602 if obj.ContentType == uploadSessionContentType {
603 return nil, storagedriver.PathNotFoundError{Path: path}
604 }
605 fi = storagedriver.FileInfoFields{
606 Path: path,
607 Size: obj.Size,
608 ModTime: obj.Updated,
609 IsDir: false,
610 }
611 return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
612 }
613
614 dirpath := d.pathToDirKey(path)
615
616 query := &storage.Query{
617 Prefix: dirpath,
618 MaxResults: 1,
619 }
620
621 objects, err := storageListObjects(gcsContext, d.bucket, query)
622 if err != nil {
623 return nil, err
624 }
625 if len(objects.Results) < 1 {
626 return nil, storagedriver.PathNotFoundError{Path: path}
627 }
628 fi = storagedriver.FileInfoFields{
629 Path: path,
630 IsDir: true,
631 }
632 obj = objects.Results[0]
633 if obj.Name == dirpath {
634 fi.Size = obj.Size
635 fi.ModTime = obj.Updated
636 }
637 return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
638 }
639
640
641
642 func (d *driver) List(context context.Context, path string) ([]string, error) {
643 query := &storage.Query{
644 Delimiter: "/",
645 Prefix: d.pathToDirKey(path),
646 }
647 list := make([]string, 0, 64)
648 for {
649 objects, err := storageListObjects(d.context(context), d.bucket, query)
650 if err != nil {
651 return nil, err
652 }
653 for _, object := range objects.Results {
654
655
656
657 if object.Deleted.IsZero() && object.ContentType != uploadSessionContentType {
658 list = append(list, d.keyToPath(object.Name))
659 }
660 }
661 for _, subpath := range objects.Prefixes {
662 subpath = d.keyToPath(subpath)
663 list = append(list, subpath)
664 }
665 query = objects.Next
666 if query == nil {
667 break
668 }
669 }
670 if path != "/" && len(list) == 0 {
671
672
673 return nil, storagedriver.PathNotFoundError{Path: path}
674 }
675 return list, nil
676 }
677
678
679
680 func (d *driver) Move(context context.Context, sourcePath string, destPath string) error {
681 gcsContext := d.context(context)
682 _, err := storageCopyObject(gcsContext, d.bucket, d.pathToKey(sourcePath), d.bucket, d.pathToKey(destPath), nil)
683 if err != nil {
684 if status, ok := err.(*googleapi.Error); ok {
685 if status.Code == http.StatusNotFound {
686 return storagedriver.PathNotFoundError{Path: sourcePath}
687 }
688 }
689 return err
690 }
691 err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(sourcePath))
692
693
694 if err != nil {
695 logrus.Infof("error deleting file: %v due to %v", sourcePath, err)
696 }
697 return nil
698 }
699
700
701 func (d *driver) listAll(context context.Context, prefix string) ([]string, error) {
702 list := make([]string, 0, 64)
703 query := &storage.Query{}
704 query.Prefix = prefix
705 query.Versions = false
706 for {
707 objects, err := storageListObjects(d.context(context), d.bucket, query)
708 if err != nil {
709 return nil, err
710 }
711 for _, obj := range objects.Results {
712
713
714
715 if obj.Deleted.IsZero() {
716 list = append(list, obj.Name)
717 }
718 }
719 query = objects.Next
720 if query == nil {
721 break
722 }
723 }
724 return list, nil
725 }
726
727
728 func (d *driver) Delete(context context.Context, path string) error {
729 prefix := d.pathToDirKey(path)
730 gcsContext := d.context(context)
731 keys, err := d.listAll(gcsContext, prefix)
732 if err != nil {
733 return err
734 }
735 if len(keys) > 0 {
736 sort.Sort(sort.Reverse(sort.StringSlice(keys)))
737 for _, key := range keys {
738 err := storageDeleteObject(gcsContext, d.bucket, key)
739
740
741
742 if status, ok := err.(*googleapi.Error); ok {
743 if status.Code == http.StatusNotFound {
744 err = nil
745 }
746 }
747 if err != nil {
748 return err
749 }
750 }
751 return nil
752 }
753 err = storageDeleteObject(gcsContext, d.bucket, d.pathToKey(path))
754 if err != nil {
755 if status, ok := err.(*googleapi.Error); ok {
756 if status.Code == http.StatusNotFound {
757 return storagedriver.PathNotFoundError{Path: path}
758 }
759 }
760 }
761 return err
762 }
763
764 func storageDeleteObject(context context.Context, bucket string, name string) error {
765 return retry(func() error {
766 return storage.DeleteObject(context, bucket, name)
767 })
768 }
769
770 func storageStatObject(context context.Context, bucket string, name string) (*storage.Object, error) {
771 var obj *storage.Object
772 err := retry(func() error {
773 var err error
774 obj, err = storage.StatObject(context, bucket, name)
775 return err
776 })
777 return obj, err
778 }
779
780 func storageListObjects(context context.Context, bucket string, q *storage.Query) (*storage.Objects, error) {
781 var objs *storage.Objects
782 err := retry(func() error {
783 var err error
784 objs, err = storage.ListObjects(context, bucket, q)
785 return err
786 })
787 return objs, err
788 }
789
790 func storageCopyObject(context context.Context, srcBucket, srcName string, destBucket, destName string, attrs *storage.ObjectAttrs) (*storage.Object, error) {
791 var obj *storage.Object
792 err := retry(func() error {
793 var err error
794 obj, err = storage.CopyObject(context, srcBucket, srcName, destBucket, destName, attrs)
795 return err
796 })
797 return obj, err
798 }
799
800
801
802
803 func (d *driver) URLFor(context context.Context, path string, options map[string]interface{}) (string, error) {
804 if d.privateKey == nil {
805 return "", storagedriver.ErrUnsupportedMethod{}
806 }
807
808 name := d.pathToKey(path)
809 methodString := "GET"
810 method, ok := options["method"]
811 if ok {
812 methodString, ok = method.(string)
813 if !ok || (methodString != "GET" && methodString != "HEAD") {
814 return "", storagedriver.ErrUnsupportedMethod{}
815 }
816 }
817
818 expiresTime := time.Now().Add(20 * time.Minute)
819 expires, ok := options["expiry"]
820 if ok {
821 et, ok := expires.(time.Time)
822 if ok {
823 expiresTime = et
824 }
825 }
826
827 opts := &storage.SignedURLOptions{
828 GoogleAccessID: d.email,
829 PrivateKey: d.privateKey,
830 Method: methodString,
831 Expires: expiresTime,
832 }
833 return storage.SignedURL(d.bucket, name, opts)
834 }
835
836
837
838 func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
839 return storagedriver.WalkFallback(ctx, d, path, f)
840 }
841
842 func startSession(client *http.Client, bucket string, name string) (uri string, err error) {
843 u := &url.URL{
844 Scheme: "https",
845 Host: "www.googleapis.com",
846 Path: fmt.Sprintf("/upload/storage/v1/b/%v/o", bucket),
847 RawQuery: fmt.Sprintf("uploadType=resumable&name=%v", name),
848 }
849 err = retry(func() error {
850 req, err := http.NewRequest("POST", u.String(), nil)
851 if err != nil {
852 return err
853 }
854 req.Header.Set("X-Upload-Content-Type", "application/octet-stream")
855 req.Header.Set("Content-Length", "0")
856 resp, err := client.Do(req)
857 if err != nil {
858 return err
859 }
860 defer resp.Body.Close()
861 err = googleapi.CheckMediaResponse(resp)
862 if err != nil {
863 return err
864 }
865 uri = resp.Header.Get("Location")
866 return nil
867 })
868 return uri, err
869 }
870
871 func putChunk(client *http.Client, sessionURI string, chunk []byte, from int64, totalSize int64) (int64, error) {
872 bytesPut := int64(0)
873 err := retry(func() error {
874 req, err := http.NewRequest("PUT", sessionURI, bytes.NewReader(chunk))
875 if err != nil {
876 return err
877 }
878 length := int64(len(chunk))
879 to := from + length - 1
880 size := "*"
881 if totalSize >= 0 {
882 size = strconv.FormatInt(totalSize, 10)
883 }
884 req.Header.Set("Content-Type", "application/octet-stream")
885 if from == to+1 {
886 req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", size))
887 } else {
888 req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", from, to, size))
889 }
890 req.Header.Set("Content-Length", strconv.FormatInt(length, 10))
891
892 resp, err := client.Do(req)
893 if err != nil {
894 return err
895 }
896 defer resp.Body.Close()
897 if totalSize < 0 && resp.StatusCode == 308 {
898 groups := rangeHeader.FindStringSubmatch(resp.Header.Get("Range"))
899 end, err := strconv.ParseInt(groups[2], 10, 64)
900 if err != nil {
901 return err
902 }
903 bytesPut = end - from + 1
904 return nil
905 }
906 err = googleapi.CheckMediaResponse(resp)
907 if err != nil {
908 return err
909 }
910 bytesPut = to - from + 1
911 return nil
912 })
913 return bytesPut, err
914 }
915
916 func (d *driver) context(context context.Context) context.Context {
917 return cloud.WithContext(context, dummyProjectID, d.client)
918 }
919
920 func (d *driver) pathToKey(path string) string {
921 return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
922 }
923
924 func (d *driver) pathToDirKey(path string) string {
925 return d.pathToKey(path) + "/"
926 }
927
928 func (d *driver) keyToPath(key string) string {
929 return "/" + strings.Trim(strings.TrimPrefix(key, d.rootDirectory), "/")
930 }
931
View as plain text