1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package storage
16
17 import (
18 "context"
19 "encoding/base64"
20 "errors"
21 "fmt"
22 "hash/crc32"
23 "io"
24 "net/url"
25 "os"
26
27 "cloud.google.com/go/iam/apiv1/iampb"
28 "cloud.google.com/go/internal/trace"
29 gapic "cloud.google.com/go/storage/internal/apiv2"
30 "cloud.google.com/go/storage/internal/apiv2/storagepb"
31 "github.com/googleapis/gax-go/v2"
32 "google.golang.org/api/googleapi"
33 "google.golang.org/api/iterator"
34 "google.golang.org/api/option"
35 "google.golang.org/api/option/internaloption"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/encoding"
39 "google.golang.org/grpc/metadata"
40 "google.golang.org/grpc/status"
41 "google.golang.org/protobuf/encoding/protowire"
42 "google.golang.org/protobuf/proto"
43 fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
44 )
45
46 const (
47
48
49
50
51
52
53
54 defaultConnPoolSize = 1
55
56
57
58
59
60 maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
61
62
63
64
65 globalProjectAlias = "_"
66
67
68
69
70 msgEntityNotSupported = "The gRPC API currently does not support ACL entities using project ID, use project numbers instead"
71 )
72
73
74
75 func defaultGRPCOptions() []option.ClientOption {
76 defaults := []option.ClientOption{
77 option.WithGRPCConnectionPool(defaultConnPoolSize),
78 }
79
80
81
82
83
84
85
86
87
88
89 if host := os.Getenv("STORAGE_EMULATOR_HOST_GRPC"); host != "" {
90
91
92 host = stripScheme(host)
93
94 defaults = append(defaults,
95 option.WithEndpoint(host),
96 option.WithGRPCDialOption(grpc.WithInsecure()),
97 option.WithoutAuthentication(),
98 )
99 } else {
100
101 defaults = append(defaults, internaloption.EnableDirectPath(true))
102 }
103
104 return defaults
105 }
106
107
108
109 type grpcStorageClient struct {
110 raw *gapic.Client
111 settings *settings
112 }
113
114
115
116 func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
117 s := initSettings(opts...)
118 s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
119
120 s.gax = append(s.gax, gax.WithRetry(nil))
121
122 config := newStorageConfig(s.clientOption...)
123 if config.readAPIWasSet {
124 return nil, errors.New("storage: GRPC is incompatible with any option that specifies an API for reads")
125 }
126
127 g, err := gapic.NewClient(ctx, s.clientOption...)
128 if err != nil {
129 return nil, err
130 }
131
132 return &grpcStorageClient{
133 raw: g,
134 settings: s,
135 }, nil
136 }
137
138 func (c *grpcStorageClient) Close() error {
139 return c.raw.Close()
140 }
141
142
143
144 func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
145 s := callSettings(c.settings, opts...)
146 req := &storagepb.GetServiceAccountRequest{
147 Project: toProjectResource(project),
148 }
149 var resp *storagepb.ServiceAccount
150 err := run(ctx, func(ctx context.Context) error {
151 var err error
152 resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...)
153 return err
154 }, s.retry, s.idempotent)
155 if err != nil {
156 return "", err
157 }
158 return resp.EmailAddress, err
159 }
160
161 func (c *grpcStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
162 if enableObjectRetention != nil {
163
164 return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
165 }
166
167 s := callSettings(c.settings, opts...)
168 b := attrs.toProtoBucket()
169 b.Project = toProjectResource(project)
170
171
172 if b.GetLocation() == "" && b.GetLifecycle() != nil {
173 b.Location = "US"
174 }
175
176 req := &storagepb.CreateBucketRequest{
177 Parent: fmt.Sprintf("projects/%s", globalProjectAlias),
178 Bucket: b,
179 BucketId: bucket,
180 }
181 if attrs != nil {
182 req.PredefinedAcl = attrs.PredefinedACL
183 req.PredefinedDefaultObjectAcl = attrs.PredefinedDefaultObjectACL
184 }
185
186 var battrs *BucketAttrs
187 err := run(ctx, func(ctx context.Context) error {
188 res, err := c.raw.CreateBucket(ctx, req, s.gax...)
189
190 battrs = newBucketFromProto(res)
191
192 return err
193 }, s.retry, s.idempotent)
194
195 return battrs, err
196 }
197
198 func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
199 s := callSettings(c.settings, opts...)
200 it := &BucketIterator{
201 ctx: ctx,
202 projectID: project,
203 }
204
205 var gitr *gapic.BucketIterator
206 fetch := func(pageSize int, pageToken string) (token string, err error) {
207
208 var buckets []*storagepb.Bucket
209 var next string
210 err = run(it.ctx, func(ctx context.Context) error {
211
212
213
214
215
216
217 if pageToken == "" {
218 req := &storagepb.ListBucketsRequest{
219 Parent: toProjectResource(it.projectID),
220 Prefix: it.Prefix,
221 }
222 gitr = c.raw.ListBuckets(ctx, req, s.gax...)
223 }
224 buckets, next, err = gitr.InternalFetch(pageSize, pageToken)
225 return err
226 }, s.retry, s.idempotent)
227 if err != nil {
228 return "", err
229 }
230
231 for _, bkt := range buckets {
232 b := newBucketFromProto(bkt)
233 it.buckets = append(it.buckets, b)
234 }
235
236 return next, nil
237 }
238 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
239 fetch,
240 func() int { return len(it.buckets) },
241 func() interface{} { b := it.buckets; it.buckets = nil; return b })
242
243 return it
244 }
245
246
247
248 func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
249 s := callSettings(c.settings, opts...)
250 req := &storagepb.DeleteBucketRequest{
251 Name: bucketResourceName(globalProjectAlias, bucket),
252 }
253 if err := applyBucketCondsProto("grpcStorageClient.DeleteBucket", conds, req); err != nil {
254 return err
255 }
256 if s.userProject != "" {
257 ctx = setUserProjectMetadata(ctx, s.userProject)
258 }
259
260 return run(ctx, func(ctx context.Context) error {
261 return c.raw.DeleteBucket(ctx, req, s.gax...)
262 }, s.retry, s.idempotent)
263 }
264
265 func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
266 s := callSettings(c.settings, opts...)
267 req := &storagepb.GetBucketRequest{
268 Name: bucketResourceName(globalProjectAlias, bucket),
269 ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
270 }
271 if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil {
272 return nil, err
273 }
274 if s.userProject != "" {
275 ctx = setUserProjectMetadata(ctx, s.userProject)
276 }
277
278 var battrs *BucketAttrs
279 err := run(ctx, func(ctx context.Context) error {
280 res, err := c.raw.GetBucket(ctx, req, s.gax...)
281
282 battrs = newBucketFromProto(res)
283
284 return err
285 }, s.retry, s.idempotent)
286
287 if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
288 return nil, ErrBucketNotExist
289 }
290
291 return battrs, err
292 }
293 func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
294 s := callSettings(c.settings, opts...)
295 b := uattrs.toProtoBucket()
296 b.Name = bucketResourceName(globalProjectAlias, bucket)
297 req := &storagepb.UpdateBucketRequest{
298 Bucket: b,
299 PredefinedAcl: uattrs.PredefinedACL,
300 PredefinedDefaultObjectAcl: uattrs.PredefinedDefaultObjectACL,
301 }
302 if err := applyBucketCondsProto("grpcStorageClient.UpdateBucket", conds, req); err != nil {
303 return nil, err
304 }
305 if s.userProject != "" {
306 ctx = setUserProjectMetadata(ctx, s.userProject)
307 }
308
309 var paths []string
310 fieldMask := &fieldmaskpb.FieldMask{
311 Paths: paths,
312 }
313 if uattrs.CORS != nil {
314 fieldMask.Paths = append(fieldMask.Paths, "cors")
315 }
316 if uattrs.DefaultEventBasedHold != nil {
317 fieldMask.Paths = append(fieldMask.Paths, "default_event_based_hold")
318 }
319 if uattrs.RetentionPolicy != nil {
320 fieldMask.Paths = append(fieldMask.Paths, "retention_policy")
321 }
322 if uattrs.VersioningEnabled != nil {
323 fieldMask.Paths = append(fieldMask.Paths, "versioning")
324 }
325 if uattrs.RequesterPays != nil {
326 fieldMask.Paths = append(fieldMask.Paths, "billing")
327 }
328 if uattrs.BucketPolicyOnly != nil || uattrs.UniformBucketLevelAccess != nil || uattrs.PublicAccessPrevention != PublicAccessPreventionUnknown {
329 fieldMask.Paths = append(fieldMask.Paths, "iam_config")
330 }
331 if uattrs.Encryption != nil {
332 fieldMask.Paths = append(fieldMask.Paths, "encryption")
333 }
334 if uattrs.Lifecycle != nil {
335 fieldMask.Paths = append(fieldMask.Paths, "lifecycle")
336 }
337 if uattrs.Logging != nil {
338 fieldMask.Paths = append(fieldMask.Paths, "logging")
339 }
340 if uattrs.Website != nil {
341 fieldMask.Paths = append(fieldMask.Paths, "website")
342 }
343 if uattrs.PredefinedACL != "" {
344
345 fieldMask.Paths = append(fieldMask.Paths, "acl")
346 }
347 if uattrs.PredefinedDefaultObjectACL != "" {
348
349 fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
350 }
351
352
353 if uattrs.acl != nil {
354
355 fieldMask.Paths = append(fieldMask.Paths, "acl")
356 }
357 if uattrs.defaultObjectACL != nil {
358
359 fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
360 }
361 if uattrs.StorageClass != "" {
362 fieldMask.Paths = append(fieldMask.Paths, "storage_class")
363 }
364 if uattrs.RPO != RPOUnknown {
365 fieldMask.Paths = append(fieldMask.Paths, "rpo")
366 }
367 if uattrs.Autoclass != nil {
368 fieldMask.Paths = append(fieldMask.Paths, "autoclass")
369 }
370 if uattrs.SoftDeletePolicy != nil {
371 fieldMask.Paths = append(fieldMask.Paths, "soft_delete_policy")
372 }
373
374 for label := range uattrs.setLabels {
375 fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
376 }
377
378
379 for label := range uattrs.deleteLabels {
380 fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
381 }
382
383 req.UpdateMask = fieldMask
384
385 if len(fieldMask.Paths) < 1 {
386
387
388 opts = append(opts, idempotent(true))
389 return c.GetBucket(ctx, bucket, conds, opts...)
390 }
391
392 var battrs *BucketAttrs
393 err := run(ctx, func(ctx context.Context) error {
394 res, err := c.raw.UpdateBucket(ctx, req, s.gax...)
395 battrs = newBucketFromProto(res)
396 return err
397 }, s.retry, s.idempotent)
398
399 return battrs, err
400 }
401 func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
402 s := callSettings(c.settings, opts...)
403 req := &storagepb.LockBucketRetentionPolicyRequest{
404 Bucket: bucketResourceName(globalProjectAlias, bucket),
405 }
406 if err := applyBucketCondsProto("grpcStorageClient.LockBucketRetentionPolicy", conds, req); err != nil {
407 return err
408 }
409
410 return run(ctx, func(ctx context.Context) error {
411 _, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...)
412 return err
413 }, s.retry, s.idempotent)
414
415 }
416 func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
417 s := callSettings(c.settings, opts...)
418 it := &ObjectIterator{
419 ctx: ctx,
420 }
421 if q != nil {
422 it.query = *q
423 }
424 req := &storagepb.ListObjectsRequest{
425 Parent: bucketResourceName(globalProjectAlias, bucket),
426 Prefix: it.query.Prefix,
427 Delimiter: it.query.Delimiter,
428 Versions: it.query.Versions,
429 LexicographicStart: it.query.StartOffset,
430 LexicographicEnd: it.query.EndOffset,
431 IncludeTrailingDelimiter: it.query.IncludeTrailingDelimiter,
432 MatchGlob: it.query.MatchGlob,
433 ReadMask: q.toFieldMask(),
434 SoftDeleted: it.query.SoftDeleted,
435 }
436 if s.userProject != "" {
437 ctx = setUserProjectMetadata(ctx, s.userProject)
438 }
439 fetch := func(pageSize int, pageToken string) (token string, err error) {
440
441
442 if it.query.IncludeFoldersAsPrefixes {
443 return "", status.Errorf(codes.Unimplemented, "storage: IncludeFoldersAsPrefixes is not supported in gRPC")
444 }
445 var objects []*storagepb.Object
446 var gitr *gapic.ObjectIterator
447 err = run(it.ctx, func(ctx context.Context) error {
448 gitr = c.raw.ListObjects(ctx, req, s.gax...)
449 it.ctx = ctx
450 objects, token, err = gitr.InternalFetch(pageSize, pageToken)
451 return err
452 }, s.retry, s.idempotent)
453 if err != nil {
454 if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
455 err = ErrBucketNotExist
456 }
457 return "", err
458 }
459
460 for _, obj := range objects {
461 b := newObjectFromProto(obj)
462 it.items = append(it.items, b)
463 }
464
465
466 res := gitr.Response.(*storagepb.ListObjectsResponse)
467 for _, prefix := range res.GetPrefixes() {
468 it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
469 }
470
471 return token, nil
472 }
473 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
474 fetch,
475 func() int { return len(it.items) },
476 func() interface{} { b := it.items; it.items = nil; return b })
477
478 return it
479 }
480
481
482
483 func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
484 s := callSettings(c.settings, opts...)
485 req := &storagepb.DeleteObjectRequest{
486 Bucket: bucketResourceName(globalProjectAlias, bucket),
487 Object: object,
488 }
489 if err := applyCondsProto("grpcStorageClient.DeleteObject", gen, conds, req); err != nil {
490 return err
491 }
492 if s.userProject != "" {
493 ctx = setUserProjectMetadata(ctx, s.userProject)
494 }
495 err := run(ctx, func(ctx context.Context) error {
496 return c.raw.DeleteObject(ctx, req, s.gax...)
497 }, s.retry, s.idempotent)
498 if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
499 return ErrObjectNotExist
500 }
501 return err
502 }
503
504 func (c *grpcStorageClient) GetObject(ctx context.Context, params *getObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
505 s := callSettings(c.settings, opts...)
506 req := &storagepb.GetObjectRequest{
507 Bucket: bucketResourceName(globalProjectAlias, params.bucket),
508 Object: params.object,
509
510 ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
511 }
512 if err := applyCondsProto("grpcStorageClient.GetObject", params.gen, params.conds, req); err != nil {
513 return nil, err
514 }
515 if s.userProject != "" {
516 ctx = setUserProjectMetadata(ctx, s.userProject)
517 }
518 if params.encryptionKey != nil {
519 req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(params.encryptionKey)
520 }
521 if params.softDeleted {
522 req.SoftDeleted = ¶ms.softDeleted
523 }
524
525 var attrs *ObjectAttrs
526 err := run(ctx, func(ctx context.Context) error {
527 res, err := c.raw.GetObject(ctx, req, s.gax...)
528 attrs = newObjectFromProto(res)
529
530 return err
531 }, s.retry, s.idempotent)
532
533 if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
534 return nil, ErrObjectNotExist
535 }
536
537 return attrs, err
538 }
539
540 func (c *grpcStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
541 uattrs := params.uattrs
542 if params.overrideRetention != nil || uattrs.Retention != nil {
543
544 return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
545 }
546 s := callSettings(c.settings, opts...)
547 o := uattrs.toProtoObject(bucketResourceName(globalProjectAlias, params.bucket), params.object)
548
549 if params.gen >= 0 {
550 o.Generation = params.gen
551 }
552 req := &storagepb.UpdateObjectRequest{
553 Object: o,
554 PredefinedAcl: uattrs.PredefinedACL,
555 }
556 if err := applyCondsProto("grpcStorageClient.UpdateObject", defaultGen, params.conds, req); err != nil {
557 return nil, err
558 }
559 if s.userProject != "" {
560 ctx = setUserProjectMetadata(ctx, s.userProject)
561 }
562 if params.encryptionKey != nil {
563 req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(params.encryptionKey)
564 }
565
566 fieldMask := &fieldmaskpb.FieldMask{Paths: nil}
567 if uattrs.EventBasedHold != nil {
568 fieldMask.Paths = append(fieldMask.Paths, "event_based_hold")
569 }
570 if uattrs.TemporaryHold != nil {
571 fieldMask.Paths = append(fieldMask.Paths, "temporary_hold")
572 }
573 if uattrs.ContentType != nil {
574 fieldMask.Paths = append(fieldMask.Paths, "content_type")
575 }
576 if uattrs.ContentLanguage != nil {
577 fieldMask.Paths = append(fieldMask.Paths, "content_language")
578 }
579 if uattrs.ContentEncoding != nil {
580 fieldMask.Paths = append(fieldMask.Paths, "content_encoding")
581 }
582 if uattrs.ContentDisposition != nil {
583 fieldMask.Paths = append(fieldMask.Paths, "content_disposition")
584 }
585 if uattrs.CacheControl != nil {
586 fieldMask.Paths = append(fieldMask.Paths, "cache_control")
587 }
588 if !uattrs.CustomTime.IsZero() {
589 fieldMask.Paths = append(fieldMask.Paths, "custom_time")
590 }
591
592
593 if uattrs.ACL != nil || len(uattrs.PredefinedACL) > 0 {
594 fieldMask.Paths = append(fieldMask.Paths, "acl")
595 }
596
597 if uattrs.Metadata != nil {
598
599
600 if len(uattrs.Metadata) == 0 {
601 fieldMask.Paths = append(fieldMask.Paths, "metadata")
602 } else {
603
604 for key := range uattrs.Metadata {
605 fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("metadata.%s", key))
606 }
607 }
608 }
609
610 req.UpdateMask = fieldMask
611
612 if len(fieldMask.Paths) < 1 {
613
614
615
616
617
618
619
620 fieldMask.Paths = append(fieldMask.Paths, "create_time")
621 }
622
623 var attrs *ObjectAttrs
624 err := run(ctx, func(ctx context.Context) error {
625 res, err := c.raw.UpdateObject(ctx, req, s.gax...)
626 attrs = newObjectFromProto(res)
627 return err
628 }, s.retry, s.idempotent)
629 if e, ok := status.FromError(err); ok && e.Code() == codes.NotFound {
630 return nil, ErrObjectNotExist
631 }
632
633 return attrs, err
634 }
635
636 func (c *grpcStorageClient) RestoreObject(ctx context.Context, params *restoreObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
637 s := callSettings(c.settings, opts...)
638 req := &storagepb.RestoreObjectRequest{
639 Bucket: bucketResourceName(globalProjectAlias, params.bucket),
640 Object: params.object,
641 CopySourceAcl: ¶ms.copySourceACL,
642 }
643 if err := applyCondsProto("grpcStorageClient.RestoreObject", params.gen, params.conds, req); err != nil {
644 return nil, err
645 }
646 if s.userProject != "" {
647 ctx = setUserProjectMetadata(ctx, s.userProject)
648 }
649
650 var attrs *ObjectAttrs
651 err := run(ctx, func(ctx context.Context) error {
652 res, err := c.raw.RestoreObject(ctx, req, s.gax...)
653 attrs = newObjectFromProto(res)
654 return err
655 }, s.retry, s.idempotent)
656 if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
657 return nil, ErrObjectNotExist
658 }
659 return attrs, err
660 }
661
662
663
664 func (c *grpcStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
665
666
667 attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
668 if err != nil {
669 return err
670 }
671
672
673
674
675 invalidEntity := true
676 var acl []ACLRule
677 for _, a := range attrs.DefaultObjectACL {
678 if a.Entity != entity {
679 acl = append(acl, a)
680 }
681 if a.Entity == entity {
682 invalidEntity = false
683 }
684 }
685 if invalidEntity {
686 return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.DefaultObjectACL, msgEntityNotSupported)
687 }
688 uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
689
690 if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
691 return err
692 }
693 return nil
694 }
695
696 func (c *grpcStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
697 attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
698 if err != nil {
699 return nil, err
700 }
701 return attrs.DefaultObjectACL, nil
702 }
703
704 func (c *grpcStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
705
706
707 attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
708 if err != nil {
709 return err
710 }
711
712
713 var acl []ACLRule
714 aclRule := ACLRule{Entity: entity, Role: role}
715 acl = append(attrs.DefaultObjectACL, aclRule)
716 uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
717
718 if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
719 return err
720 }
721 return nil
722 }
723
724
725
726 func (c *grpcStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
727
728
729 attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
730 if err != nil {
731 return err
732 }
733
734
735
736
737 invalidEntity := true
738 var acl []ACLRule
739 for _, a := range attrs.ACL {
740 if a.Entity != entity {
741 acl = append(acl, a)
742 }
743 if a.Entity == entity {
744 invalidEntity = false
745 }
746 }
747 if invalidEntity {
748 return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
749 }
750 uattrs := &BucketAttrsToUpdate{acl: acl}
751
752 if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
753 return err
754 }
755 return nil
756 }
757
758 func (c *grpcStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
759 attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
760 if err != nil {
761 return nil, err
762 }
763 return attrs.ACL, nil
764 }
765
766 func (c *grpcStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
767
768
769 attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
770 if err != nil {
771 return err
772 }
773
774
775 var acl []ACLRule
776 aclRule := ACLRule{Entity: entity, Role: role}
777 acl = append(attrs.ACL, aclRule)
778 uattrs := &BucketAttrsToUpdate{acl: acl}
779
780 if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
781 return err
782 }
783 return nil
784 }
785
786
787
788 func (c *grpcStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
789
790
791 attrs, err := c.GetObject(ctx, &getObjectParams{bucket, object, defaultGen, nil, nil, false}, opts...)
792 if err != nil {
793 return err
794 }
795
796
797
798
799 invalidEntity := true
800 var acl []ACLRule
801 for _, a := range attrs.ACL {
802 if a.Entity != entity {
803 acl = append(acl, a)
804 }
805 if a.Entity == entity {
806 invalidEntity = false
807 }
808 }
809 if invalidEntity {
810 return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
811 }
812 uattrs := &ObjectAttrsToUpdate{ACL: acl}
813
814 params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
815 if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
816 return err
817 }
818 return nil
819 }
820
821
822
823 func (c *grpcStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
824 o, err := c.GetObject(ctx, &getObjectParams{bucket, object, defaultGen, nil, nil, false}, opts...)
825 if err != nil {
826 return nil, err
827 }
828 return o.ACL, nil
829 }
830
831 func (c *grpcStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
832
833
834 attrs, err := c.GetObject(ctx, &getObjectParams{bucket, object, defaultGen, nil, nil, false}, opts...)
835 if err != nil {
836 return err
837 }
838
839
840 var acl []ACLRule
841 aclRule := ACLRule{Entity: entity, Role: role}
842 acl = append(attrs.ACL, aclRule)
843 uattrs := &ObjectAttrsToUpdate{ACL: acl}
844
845 params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
846 if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
847 return err
848 }
849 return nil
850 }
851
852
853
854 func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
855 s := callSettings(c.settings, opts...)
856 if s.userProject != "" {
857 ctx = setUserProjectMetadata(ctx, s.userProject)
858 }
859
860 dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
861 dstObjPb.Name = req.dstObject.name
862
863 if req.sendCRC32C {
864 dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
865 }
866
867 srcs := []*storagepb.ComposeObjectRequest_SourceObject{}
868 for _, src := range req.srcs {
869 srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name, ObjectPreconditions: &storagepb.ComposeObjectRequest_SourceObject_ObjectPreconditions{}}
870 if src.gen >= 0 {
871 srcObjPb.Generation = src.gen
872 }
873 if err := applyCondsProto("ComposeObject source", defaultGen, src.conds, srcObjPb.ObjectPreconditions); err != nil {
874 return nil, err
875 }
876 srcs = append(srcs, srcObjPb)
877 }
878
879 rawReq := &storagepb.ComposeObjectRequest{
880 Destination: dstObjPb,
881 SourceObjects: srcs,
882 }
883 if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, rawReq); err != nil {
884 return nil, err
885 }
886 if req.predefinedACL != "" {
887 rawReq.DestinationPredefinedAcl = req.predefinedACL
888 }
889 if req.dstObject.encryptionKey != nil {
890 rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
891 }
892
893 var obj *storagepb.Object
894 var err error
895 if err := run(ctx, func(ctx context.Context) error {
896 obj, err = c.raw.ComposeObject(ctx, rawReq, s.gax...)
897 return err
898 }, s.retry, s.idempotent); err != nil {
899 return nil, err
900 }
901
902 return newObjectFromProto(obj), nil
903 }
904 func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
905 s := callSettings(c.settings, opts...)
906 obj := req.dstObject.attrs.toProtoObject("")
907 call := &storagepb.RewriteObjectRequest{
908 SourceBucket: bucketResourceName(globalProjectAlias, req.srcObject.bucket),
909 SourceObject: req.srcObject.name,
910 RewriteToken: req.token,
911 DestinationBucket: bucketResourceName(globalProjectAlias, req.dstObject.bucket),
912 DestinationName: req.dstObject.name,
913 Destination: obj,
914 DestinationKmsKey: req.dstObject.keyName,
915 DestinationPredefinedAcl: req.predefinedACL,
916 CommonObjectRequestParams: toProtoCommonObjectRequestParams(req.dstObject.encryptionKey),
917 }
918
919
920 if s.userProject != "" {
921 ctx = setUserProjectMetadata(ctx, s.userProject)
922 }
923 if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
924 return nil, err
925 }
926 if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil {
927 return nil, err
928 }
929
930 if len(req.dstObject.encryptionKey) > 0 {
931 call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
932 }
933 if len(req.srcObject.encryptionKey) > 0 {
934 srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey)
935 call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm()
936 call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes()
937 call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes()
938 }
939
940 call.MaxBytesRewrittenPerCall = req.maxBytesRewrittenPerCall
941
942 var res *storagepb.RewriteResponse
943 var err error
944
945 retryCall := func(ctx context.Context) error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err }
946
947 if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
948 return nil, err
949 }
950
951 r := &rewriteObjectResponse{
952 done: res.GetDone(),
953 written: res.GetTotalBytesRewritten(),
954 size: res.GetObjectSize(),
955 token: res.GetRewriteToken(),
956 resource: newObjectFromProto(res.GetResource()),
957 }
958
959 return r, nil
960 }
961
962
963
964 type bytesCodec struct {
965 encoding.Codec
966 }
967
968 func (bytesCodec) Marshal(v any) ([]byte, error) {
969 vv, ok := v.(proto.Message)
970 if !ok {
971 return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
972 }
973 return proto.Marshal(vv)
974 }
975
976 func (bytesCodec) Unmarshal(data []byte, v any) error {
977 switch v := v.(type) {
978 case *[]byte:
979
980
981 *v = data
982 return nil
983 case proto.Message:
984 return proto.Unmarshal(data, v)
985 default:
986 return fmt.Errorf("can not unmarshal type %T", v)
987 }
988 }
989
990 func (bytesCodec) Name() string {
991
992
993 return ""
994 }
995
996 func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
997 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
998 defer func() { trace.EndSpan(ctx, err) }()
999
1000 s := callSettings(c.settings, opts...)
1001
1002 s.gax = append(s.gax, gax.WithGRPCOptions(
1003 grpc.ForceCodec(bytesCodec{}),
1004 ))
1005
1006 if s.userProject != "" {
1007 ctx = setUserProjectMetadata(ctx, s.userProject)
1008 }
1009
1010 b := bucketResourceName(globalProjectAlias, params.bucket)
1011 req := &storagepb.ReadObjectRequest{
1012 Bucket: b,
1013 Object: params.object,
1014 CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey),
1015 }
1016
1017 if params.gen >= 0 {
1018 req.Generation = params.gen
1019 }
1020
1021 var databuf []byte
1022
1023
1024
1025 reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
1026
1027
1028 if err := ctx.Err(); err != nil {
1029 return nil, nil, err
1030 }
1031
1032 cc, cancel := context.WithCancel(ctx)
1033
1034 req.ReadOffset = params.offset + seen
1035
1036
1037
1038 if params.length > 0 {
1039 req.ReadLimit = params.length - seen
1040 }
1041
1042 if err := applyCondsProto("gRPCReader.reopen", params.gen, params.conds, req); err != nil {
1043 cancel()
1044 return nil, nil, err
1045 }
1046
1047 var stream storagepb.Storage_ReadObjectClient
1048 var msg *storagepb.ReadObjectResponse
1049 var err error
1050
1051 err = run(cc, func(ctx context.Context) error {
1052 stream, err = c.raw.ReadObject(cc, req, s.gax...)
1053 if err != nil {
1054 return err
1055 }
1056
1057
1058
1059 err := stream.RecvMsg(&databuf)
1060
1061
1062 if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
1063 return ErrObjectNotExist
1064 }
1065 if err != nil {
1066 return err
1067 }
1068
1069
1070
1071
1072
1073 msg, err = readFullObjectResponse(databuf)
1074
1075 return err
1076 }, s.retry, s.idempotent)
1077 if err != nil {
1078
1079
1080 cancel()
1081 return nil, nil, err
1082 }
1083
1084 return &readStreamResponse{stream, msg}, cancel, nil
1085 }
1086
1087 res, cancel, err := reopen(0)
1088 if err != nil {
1089 return nil, err
1090 }
1091
1092
1093
1094 msg := res.response
1095 obj := msg.GetMetadata()
1096
1097 size := obj.GetSize()
1098
1099
1100 var (
1101 wantCRC uint32
1102 checkCRC bool
1103 )
1104 if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
1105 wantCRC = checksums.GetCrc32C()
1106 checkCRC = true
1107 }
1108
1109 r = &Reader{
1110 Attrs: ReaderObjectAttrs{
1111 Size: size,
1112 ContentType: obj.GetContentType(),
1113 ContentEncoding: obj.GetContentEncoding(),
1114 CacheControl: obj.GetCacheControl(),
1115 LastModified: obj.GetUpdateTime().AsTime(),
1116 Metageneration: obj.GetMetageneration(),
1117 Generation: obj.GetGeneration(),
1118 },
1119 reader: &gRPCReader{
1120 stream: res.stream,
1121 reopen: reopen,
1122 cancel: cancel,
1123 size: size,
1124
1125
1126 leftovers: msg.GetChecksummedData().GetContent(),
1127 settings: s,
1128 zeroRange: params.length == 0,
1129 databuf: databuf,
1130 wantCRC: wantCRC,
1131 checkCRC: checkCRC,
1132 },
1133 checkCRC: checkCRC,
1134 }
1135
1136 cr := msg.GetContentRange()
1137 if cr != nil {
1138 r.Attrs.StartOffset = cr.GetStart()
1139 r.remain = cr.GetEnd() - cr.GetStart()
1140 } else {
1141 r.remain = size
1142 }
1143
1144
1145
1146 if params.length == 0 {
1147 r.remain = 0
1148 r.reader.Close()
1149 }
1150
1151 return r, nil
1152 }
1153
1154 func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
1155 s := callSettings(c.settings, opts...)
1156
1157 var offset int64
1158 errorf := params.setError
1159 progress := params.progress
1160 setObj := params.setObj
1161
1162 pr, pw := io.Pipe()
1163 gw := newGRPCWriter(c, params, pr)
1164 gw.settings = s
1165 if s.userProject != "" {
1166 gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
1167 }
1168
1169
1170
1171 go func() {
1172 defer close(params.donec)
1173
1174
1175 for {
1176
1177 recvd, doneReading, err := gw.read()
1178 if err != nil {
1179 err = checkCanceled(err)
1180 errorf(err)
1181 pr.CloseWithError(err)
1182 return
1183 }
1184
1185 if params.attrs.Retention != nil {
1186
1187 err = status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
1188 errorf(err)
1189 pr.CloseWithError(err)
1190 return
1191 }
1192
1193
1194
1195
1196
1197
1198
1199 if !doneReading && gw.upid == "" && params.chunkSize != 0 {
1200 err = gw.startResumableUpload()
1201 if err != nil {
1202 err = checkCanceled(err)
1203 errorf(err)
1204 pr.CloseWithError(err)
1205 return
1206 }
1207 }
1208
1209 o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
1210 if err != nil {
1211 err = checkCanceled(err)
1212 errorf(err)
1213 pr.CloseWithError(err)
1214 return
1215 }
1216
1217
1218
1219
1220
1221 if gw.upid != "" || gw.chunkSize == 0 {
1222 offset = off
1223 }
1224 if gw.upid != "" {
1225 progress(offset)
1226 }
1227
1228
1229
1230 if doneReading {
1231
1232 setObj(newObjectFromProto(o))
1233 return
1234 }
1235 }
1236 }()
1237
1238 return pw, nil
1239 }
1240
1241
1242
1243 func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
1244
1245 s := callSettings(c.settings, opts...)
1246 req := &iampb.GetIamPolicyRequest{
1247 Resource: bucketResourceName(globalProjectAlias, resource),
1248 Options: &iampb.GetPolicyOptions{
1249 RequestedPolicyVersion: version,
1250 },
1251 }
1252 var rp *iampb.Policy
1253 err := run(ctx, func(ctx context.Context) error {
1254 var err error
1255 rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...)
1256 return err
1257 }, s.retry, s.idempotent)
1258
1259 return rp, err
1260 }
1261
1262 func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
1263
1264 s := callSettings(c.settings, opts...)
1265
1266 req := &iampb.SetIamPolicyRequest{
1267 Resource: bucketResourceName(globalProjectAlias, resource),
1268 Policy: policy,
1269 }
1270
1271 return run(ctx, func(ctx context.Context) error {
1272 _, err := c.raw.SetIamPolicy(ctx, req, s.gax...)
1273 return err
1274 }, s.retry, s.idempotent)
1275 }
1276
1277 func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
1278
1279 s := callSettings(c.settings, opts...)
1280 req := &iampb.TestIamPermissionsRequest{
1281 Resource: bucketResourceName(globalProjectAlias, resource),
1282 Permissions: permissions,
1283 }
1284 var res *iampb.TestIamPermissionsResponse
1285 err := run(ctx, func(ctx context.Context) error {
1286 var err error
1287 res, err = c.raw.TestIamPermissions(ctx, req, s.gax...)
1288 return err
1289 }, s.retry, s.idempotent)
1290 if err != nil {
1291 return nil, err
1292 }
1293 return res.Permissions, nil
1294 }
1295
1296
1297
1298 func (c *grpcStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
1299 s := callSettings(c.settings, opts...)
1300 req := &storagepb.GetHmacKeyRequest{
1301 AccessId: accessID,
1302 Project: toProjectResource(project),
1303 }
1304 if s.userProject != "" {
1305 ctx = setUserProjectMetadata(ctx, s.userProject)
1306 }
1307 var metadata *storagepb.HmacKeyMetadata
1308 err := run(ctx, func(ctx context.Context) error {
1309 var err error
1310 metadata, err = c.raw.GetHmacKey(ctx, req, s.gax...)
1311 return err
1312 }, s.retry, s.idempotent)
1313 if err != nil {
1314 return nil, err
1315 }
1316 return toHMACKeyFromProto(metadata), nil
1317 }
1318
1319 func (c *grpcStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
1320 s := callSettings(c.settings, opts...)
1321 req := &storagepb.ListHmacKeysRequest{
1322 Project: toProjectResource(project),
1323 ServiceAccountEmail: serviceAccountEmail,
1324 ShowDeletedKeys: showDeletedKeys,
1325 }
1326 if s.userProject != "" {
1327 ctx = setUserProjectMetadata(ctx, s.userProject)
1328 }
1329 it := &HMACKeysIterator{
1330 ctx: ctx,
1331 projectID: project,
1332 retry: s.retry,
1333 }
1334 fetch := func(pageSize int, pageToken string) (token string, err error) {
1335 var hmacKeys []*storagepb.HmacKeyMetadata
1336 err = run(it.ctx, func(ctx context.Context) error {
1337 gitr := c.raw.ListHmacKeys(ctx, req, s.gax...)
1338 hmacKeys, token, err = gitr.InternalFetch(pageSize, pageToken)
1339 return err
1340 }, s.retry, s.idempotent)
1341 if err != nil {
1342 return "", err
1343 }
1344 for _, hkmd := range hmacKeys {
1345 hk := toHMACKeyFromProto(hkmd)
1346 it.hmacKeys = append(it.hmacKeys, hk)
1347 }
1348
1349 return token, nil
1350 }
1351 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
1352 fetch,
1353 func() int { return len(it.hmacKeys) - it.index },
1354 func() interface{} {
1355 prev := it.hmacKeys
1356 it.hmacKeys = it.hmacKeys[:0]
1357 it.index = 0
1358 return prev
1359 })
1360 return it
1361 }
1362
1363 func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
1364 s := callSettings(c.settings, opts...)
1365 hk := &storagepb.HmacKeyMetadata{
1366 AccessId: accessID,
1367 Project: toProjectResource(project),
1368 ServiceAccountEmail: serviceAccountEmail,
1369 State: string(attrs.State),
1370 Etag: attrs.Etag,
1371 }
1372 var paths []string
1373 fieldMask := &fieldmaskpb.FieldMask{
1374 Paths: paths,
1375 }
1376 if attrs.State != "" {
1377 fieldMask.Paths = append(fieldMask.Paths, "state")
1378 }
1379 req := &storagepb.UpdateHmacKeyRequest{
1380 HmacKey: hk,
1381 UpdateMask: fieldMask,
1382 }
1383 if s.userProject != "" {
1384 ctx = setUserProjectMetadata(ctx, s.userProject)
1385 }
1386 var metadata *storagepb.HmacKeyMetadata
1387 err := run(ctx, func(ctx context.Context) error {
1388 var err error
1389 metadata, err = c.raw.UpdateHmacKey(ctx, req, s.gax...)
1390 return err
1391 }, s.retry, s.idempotent)
1392 if err != nil {
1393 return nil, err
1394 }
1395 return toHMACKeyFromProto(metadata), nil
1396 }
1397
1398 func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
1399 s := callSettings(c.settings, opts...)
1400 req := &storagepb.CreateHmacKeyRequest{
1401 Project: toProjectResource(project),
1402 ServiceAccountEmail: serviceAccountEmail,
1403 }
1404 if s.userProject != "" {
1405 ctx = setUserProjectMetadata(ctx, s.userProject)
1406 }
1407 var res *storagepb.CreateHmacKeyResponse
1408 err := run(ctx, func(ctx context.Context) error {
1409 var err error
1410 res, err = c.raw.CreateHmacKey(ctx, req, s.gax...)
1411 return err
1412 }, s.retry, s.idempotent)
1413 if err != nil {
1414 return nil, err
1415 }
1416 key := toHMACKeyFromProto(res.Metadata)
1417 key.Secret = base64.StdEncoding.EncodeToString(res.SecretKeyBytes)
1418
1419 return key, nil
1420 }
1421
1422 func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
1423 s := callSettings(c.settings, opts...)
1424 req := &storagepb.DeleteHmacKeyRequest{
1425 AccessId: accessID,
1426 Project: toProjectResource(project),
1427 }
1428 if s.userProject != "" {
1429 ctx = setUserProjectMetadata(ctx, s.userProject)
1430 }
1431 return run(ctx, func(ctx context.Context) error {
1432 return c.raw.DeleteHmacKey(ctx, req, s.gax...)
1433 }, s.retry, s.idempotent)
1434 }
1435
1436
1437
1438 func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
1439 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications")
1440 defer func() { trace.EndSpan(ctx, err) }()
1441
1442 s := callSettings(c.settings, opts...)
1443 if s.userProject != "" {
1444 ctx = setUserProjectMetadata(ctx, s.userProject)
1445 }
1446 req := &storagepb.ListNotificationConfigsRequest{
1447 Parent: bucketResourceName(globalProjectAlias, bucket),
1448 }
1449 var notifications []*storagepb.NotificationConfig
1450 err = run(ctx, func(ctx context.Context) error {
1451 gitr := c.raw.ListNotificationConfigs(ctx, req, s.gax...)
1452 for {
1453
1454 items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken())
1455 if err != nil {
1456 return err
1457 }
1458 notifications = append(notifications, items...)
1459
1460 if nextPageToken == "" {
1461 return err
1462 }
1463 req.PageToken = nextPageToken
1464 }
1465 }, s.retry, s.idempotent)
1466 if err != nil {
1467 return nil, err
1468 }
1469
1470 return notificationsToMapFromProto(notifications), nil
1471 }
1472
1473 func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
1474 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification")
1475 defer func() { trace.EndSpan(ctx, err) }()
1476
1477 s := callSettings(c.settings, opts...)
1478 req := &storagepb.CreateNotificationConfigRequest{
1479 Parent: bucketResourceName(globalProjectAlias, bucket),
1480 NotificationConfig: toProtoNotification(n),
1481 }
1482 var pbn *storagepb.NotificationConfig
1483 err = run(ctx, func(ctx context.Context) error {
1484 var err error
1485 pbn, err = c.raw.CreateNotificationConfig(ctx, req, s.gax...)
1486 return err
1487 }, s.retry, s.idempotent)
1488 if err != nil {
1489 return nil, err
1490 }
1491 return toNotificationFromProto(pbn), err
1492 }
1493
1494 func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
1495 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification")
1496 defer func() { trace.EndSpan(ctx, err) }()
1497
1498 s := callSettings(c.settings, opts...)
1499 req := &storagepb.DeleteNotificationConfigRequest{Name: id}
1500 return run(ctx, func(ctx context.Context) error {
1501 return c.raw.DeleteNotificationConfig(ctx, req, s.gax...)
1502 }, s.retry, s.idempotent)
1503 }
1504
1505
1506
1507
1508
1509
1510 func setUserProjectMetadata(ctx context.Context, project string) context.Context {
1511 return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project)
1512 }
1513
// readStreamResponse pairs a freshly opened ReadObject stream with the first
// response message decoded from it.
//
// The struct is built with a positional literal elsewhere in this file, so the
// field order must be kept.
type readStreamResponse struct {
	// stream is the open ReadObject stream.
	stream storagepb.Storage_ReadObjectClient
	// response is the first message received on stream.
	response *storagepb.ReadObjectResponse
}
1518
// gRPCReader reads object content from a ReadObject stream. Its Read and
// WriteTo methods consume the stream, and recv reopens the stream when a
// receive fails with a retryable error.
type gRPCReader struct {
	// seen counts the bytes handed to the caller so far; size is the object
	// size taken from the metadata of the first response.
	seen, size int64
	// zeroRange is true when the requested read length was zero.
	zeroRange bool
	// stream is the current ReadObject stream; Close sets it to nil.
	stream storagepb.Storage_ReadObjectClient
	// reopen opens a new stream that resumes after the given number of
	// already-seen bytes.
	reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
	// leftovers holds received content not yet delivered to the caller.
	leftovers []byte
	// databuf is the buffer passed to stream.RecvMsg for each message.
	databuf []byte
	// cancel cancels the context of the current stream.
	cancel context.CancelFunc
	// settings supplies the retry configuration consulted by recv.
	settings *settings
	// checkCRC enables checksum accumulation and verification.
	checkCRC bool
	// wantCRC is the expected checksum of the content.
	wantCRC uint32
	// gotCRC is the running checksum of the content delivered so far.
	gotCRC uint32
}
1532
1533
1534 func (r *gRPCReader) updateCRC(b []byte) {
1535 if r.checkCRC {
1536 r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, b)
1537 }
1538 }
1539
1540
1541 func (r *gRPCReader) runCRCCheck() error {
1542 if r.checkCRC && r.gotCRC != r.wantCRC {
1543 return fmt.Errorf("storage: bad CRC on read: got %d, want %d", r.gotCRC, r.wantCRC)
1544 }
1545 return nil
1546 }
1547
1548
1549 func (r *gRPCReader) Read(p []byte) (int, error) {
1550
1551
1552 if r.size == r.seen || r.zeroRange {
1553 if err := r.runCRCCheck(); err != nil {
1554 return 0, err
1555 }
1556 return 0, io.EOF
1557 }
1558
1559
1560
1561
1562
1563 if r.stream == nil {
1564 return 0, fmt.Errorf("storage: reader has been closed")
1565 }
1566
1567 var n int
1568
1569
1570 if len(r.leftovers) > 0 {
1571 n = copy(p, r.leftovers)
1572 r.seen += int64(n)
1573 r.updateCRC(p[:n])
1574 r.leftovers = r.leftovers[n:]
1575 return n, nil
1576 }
1577
1578
1579 content, err := r.recv()
1580 if err != nil {
1581 return 0, err
1582 }
1583
1584
1585
1586
1587
1588
1589
1590
1591 n = copy(p[n:], content)
1592 leftover := len(content) - n
1593 if leftover > 0 {
1594
1595
1596 r.leftovers = content[n:]
1597 }
1598 r.seen += int64(n)
1599 r.updateCRC(p[:n])
1600
1601 return n, nil
1602 }
1603
1604
1605
1606 func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) {
1607
1608
1609 if r.size == r.seen || r.zeroRange {
1610 if err := r.runCRCCheck(); err != nil {
1611 return 0, err
1612 }
1613 return 0, nil
1614 }
1615
1616
1617
1618
1619
1620 if r.stream == nil {
1621 return 0, fmt.Errorf("storage: reader has been closed")
1622 }
1623
1624
1625 var alreadySeen = r.seen
1626
1627
1628
1629 if len(r.leftovers) > 0 {
1630
1631 written, err := w.Write(r.leftovers)
1632 r.seen += int64(written)
1633 r.updateCRC(r.leftovers)
1634 r.leftovers = nil
1635 if err != nil {
1636 return r.seen - alreadySeen, err
1637 }
1638 }
1639
1640
1641 for {
1642
1643
1644
1645 msg, err := r.recv()
1646 if err != nil {
1647 if err == io.EOF {
1648
1649 err = r.runCRCCheck()
1650 }
1651 return r.seen - alreadySeen, err
1652 }
1653
1654
1655
1656
1657
1658
1659
1660
1661 written, err := w.Write(msg)
1662 r.seen += int64(written)
1663 r.updateCRC(msg)
1664 if err != nil {
1665 return r.seen - alreadySeen, err
1666 }
1667 }
1668
1669 }
1670
1671
1672
1673 func (r *gRPCReader) Close() error {
1674 if r.cancel != nil {
1675 r.cancel()
1676 }
1677 r.stream = nil
1678 return nil
1679 }
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692 func (r *gRPCReader) recv() ([]byte, error) {
1693 err := r.stream.RecvMsg(&r.databuf)
1694
1695 var shouldRetry = ShouldRetry
1696 if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
1697 shouldRetry = r.settings.retry.shouldRetry
1698 }
1699 if err != nil && shouldRetry(err) {
1700
1701
1702
1703
1704 msg, err := r.reopenStream()
1705 return msg.GetChecksummedData().GetContent(), err
1706 }
1707
1708 if err != nil {
1709 return nil, err
1710 }
1711
1712 return readObjectResponseContent(r.databuf)
1713 }
1714
1715
// Protobuf field numbers used by the hand-written wire decoders in this file.
const (
	// checksummedDataField is the ChecksummedData field of ReadObjectResponse.
	checksummedDataField = protowire.Number(1)
	// checksummedDataContentField is the Content field of ChecksummedData.
	checksummedDataContentField = protowire.Number(1)
	// checksummedDataCRC32CField is the Crc32C field of ChecksummedData.
	checksummedDataCRC32CField = protowire.Number(2)
	// objectChecksumsField is the ObjectChecksums field of ReadObjectResponse.
	objectChecksumsField = protowire.Number(2)
	// contentRangeField is the ContentRange field of ReadObjectResponse.
	contentRangeField = protowire.Number(3)
	// metadataField is the Metadata field of ReadObjectResponse.
	metadataField = protowire.Number(4)
)
1724
1725
1726
1727
1728
1729 func readObjectResponseContent(b []byte) ([]byte, error) {
1730 checksummedData, err := readProtoBytes(b, checksummedDataField)
1731 if err != nil {
1732 return b, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", err)
1733 }
1734 content, err := readProtoBytes(checksummedData, checksummedDataContentField)
1735 if err != nil {
1736 return content, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", err)
1737 }
1738
1739 return content, nil
1740 }
1741
1742
1743
1744
1745
1746
1747
1748
1749 func readFullObjectResponse(b []byte) (*storagepb.ReadObjectResponse, error) {
1750 msg := &storagepb.ReadObjectResponse{}
1751
1752
1753
1754
1755 off := 0
1756 for off < len(b) {
1757
1758
1759 fieldNum, fieldType, fieldLength := protowire.ConsumeTag(b[off:])
1760 if fieldLength < 0 {
1761 return nil, protowire.ParseError(fieldLength)
1762 }
1763 off += fieldLength
1764
1765
1766
1767 switch {
1768 case fieldNum == checksummedDataField && fieldType == protowire.BytesType:
1769
1770 msg.ChecksummedData = &storagepb.ChecksummedData{}
1771
1772
1773 fieldContent, n := protowire.ConsumeBytes(b[off:])
1774 if n < 0 {
1775 return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", protowire.ParseError(n))
1776 }
1777 off += n
1778
1779
1780
1781 contentOff := 0
1782 for contentOff < len(fieldContent) {
1783 gotNum, gotTyp, n := protowire.ConsumeTag(fieldContent[contentOff:])
1784 if n < 0 {
1785 return nil, protowire.ParseError(n)
1786 }
1787 contentOff += n
1788
1789 switch {
1790 case gotNum == checksummedDataContentField && gotTyp == protowire.BytesType:
1791
1792 bytes, n := protowire.ConsumeBytes(fieldContent[contentOff:])
1793 if n < 0 {
1794 return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", protowire.ParseError(n))
1795 }
1796 msg.ChecksummedData.Content = bytes
1797 contentOff += n
1798 case gotNum == checksummedDataCRC32CField && gotTyp == protowire.Fixed32Type:
1799 v, n := protowire.ConsumeFixed32(fieldContent[contentOff:])
1800 if n < 0 {
1801 return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Crc32C: %v", protowire.ParseError(n))
1802 }
1803 msg.ChecksummedData.Crc32C = &v
1804 contentOff += n
1805 default:
1806 n = protowire.ConsumeFieldValue(gotNum, gotTyp, fieldContent[contentOff:])
1807 if n < 0 {
1808 return nil, protowire.ParseError(n)
1809 }
1810 contentOff += n
1811 }
1812 }
1813 case fieldNum == objectChecksumsField && fieldType == protowire.BytesType:
1814
1815 msg.ObjectChecksums = &storagepb.ObjectChecksums{}
1816
1817
1818 bytes, n := protowire.ConsumeBytes(b[off:])
1819 if n < 0 {
1820 return nil, fmt.Errorf("invalid ReadObjectResponse.ObjectChecksums: %v", protowire.ParseError(n))
1821 }
1822 off += n
1823
1824
1825 if err := proto.Unmarshal(bytes, msg.ObjectChecksums); err != nil {
1826 return nil, err
1827 }
1828 case fieldNum == contentRangeField && fieldType == protowire.BytesType:
1829 msg.ContentRange = &storagepb.ContentRange{}
1830
1831 bytes, n := protowire.ConsumeBytes(b[off:])
1832 if n < 0 {
1833 return nil, fmt.Errorf("invalid ReadObjectResponse.ContentRange: %v", protowire.ParseError(n))
1834 }
1835 off += n
1836
1837 if err := proto.Unmarshal(bytes, msg.ContentRange); err != nil {
1838 return nil, err
1839 }
1840 case fieldNum == metadataField && fieldType == protowire.BytesType:
1841 msg.Metadata = &storagepb.Object{}
1842
1843 bytes, n := protowire.ConsumeBytes(b[off:])
1844 if n < 0 {
1845 return nil, fmt.Errorf("invalid ReadObjectResponse.Metadata: %v", protowire.ParseError(n))
1846 }
1847 off += n
1848
1849 if err := proto.Unmarshal(bytes, msg.Metadata); err != nil {
1850 return nil, err
1851 }
1852 default:
1853 fieldLength = protowire.ConsumeFieldValue(fieldNum, fieldType, b[off:])
1854 if fieldLength < 0 {
1855 return nil, fmt.Errorf("default: %v", protowire.ParseError(fieldLength))
1856 }
1857 off += fieldLength
1858 }
1859 }
1860
1861 return msg, nil
1862 }
1863
1864
1865
1866
1867
1868
1869
1870
1871 func readProtoBytes(b []byte, num protowire.Number) ([]byte, error) {
1872 off := 0
1873 for off < len(b) {
1874 gotNum, gotTyp, n := protowire.ConsumeTag(b[off:])
1875 if n < 0 {
1876 return nil, protowire.ParseError(n)
1877 }
1878 off += n
1879 if gotNum == num && gotTyp == protowire.BytesType {
1880 b, n := protowire.ConsumeBytes(b[off:])
1881 if n < 0 {
1882 return nil, protowire.ParseError(n)
1883 }
1884 return b, nil
1885 }
1886 n = protowire.ConsumeFieldValue(gotNum, gotTyp, b[off:])
1887 if n < 0 {
1888 return nil, protowire.ParseError(n)
1889 }
1890 off += n
1891 }
1892 return nil, nil
1893 }
1894
1895
1896
1897 func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
1898
1899 r.Close()
1900
1901 res, cancel, err := r.reopen(r.seen)
1902 if err != nil {
1903 return nil, err
1904 }
1905 r.stream = res.stream
1906 r.cancel = cancel
1907 return res.response, nil
1908 }
1909
1910 func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) *gRPCWriter {
1911 size := params.chunkSize
1912
1913
1914 if size%googleapi.MinUploadChunkSize != 0 {
1915 size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
1916 }
1917
1918
1919
1920
1921 if params.chunkSize == 0 {
1922 size = googleapi.MinUploadChunkSize
1923 }
1924
1925 return &gRPCWriter{
1926 buf: make([]byte, size),
1927 c: c,
1928 ctx: params.ctx,
1929 reader: r,
1930 bucket: params.bucket,
1931 attrs: params.attrs,
1932 conds: params.conds,
1933 encryptionKey: params.encryptionKey,
1934 sendCRC32C: params.sendCRC32C,
1935 chunkSize: params.chunkSize,
1936 forceEmptyContentType: params.forceEmptyContentType,
1937 }
1938 }
1939
1940
1941
// gRPCWriter uploads data read from reader to an object over a
// BidiWriteObject stream, one buffer at a time.
type gRPCWriter struct {
	// c is the client used to issue the upload RPCs.
	c *grpcStorageClient
	// buf stages data read from reader before it is uploaded.
	buf []byte
	// reader is the source of the object content.
	reader io.Reader

	// ctx is the context used for the upload RPCs.
	ctx context.Context

	// bucket is the name of the destination bucket.
	bucket string
	// attrs describes the object being written.
	attrs *ObjectAttrs
	// conds holds the preconditions applied to the write.
	conds *Conditions
	// encryptionKey is passed along as common object request parameters.
	encryptionKey []byte
	// settings supplies the retry configuration for the upload RPCs.
	settings *settings

	// sendCRC32C is forwarded to toProtoChecksums when building requests.
	sendCRC32C bool
	// chunkSize is the caller-requested chunk size; zero disables chunking.
	chunkSize int
	// forceEmptyContentType disables content-type detection in read.
	forceEmptyContentType bool

	// stream is the current write stream; nil means a new one must be opened.
	stream storagepb.Storage_BidiWriteObjectClient

	// upid is the resumable upload ID; it stays empty until
	// startResumableUpload has stored one.
	upid string
}
1965
1966
1967
1968 func (w *gRPCWriter) startResumableUpload() error {
1969 spec, err := w.writeObjectSpec()
1970 if err != nil {
1971 return err
1972 }
1973 req := &storagepb.StartResumableWriteRequest{
1974 WriteObjectSpec: spec,
1975 CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
1976 }
1977
1978
1979
1980 req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
1981 return run(w.ctx, func(ctx context.Context) error {
1982 upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
1983 w.upid = upres.GetUploadId()
1984 return err
1985 }, w.settings.retry, w.settings.idempotent)
1986 }
1987
1988
1989
1990 func (w *gRPCWriter) queryProgress() (int64, error) {
1991 var persistedSize int64
1992 err := run(w.ctx, func(ctx context.Context) error {
1993 q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{
1994 UploadId: w.upid,
1995 })
1996 persistedSize = q.GetPersistedSize()
1997 return err
1998 }, w.settings.retry, true)
1999
2000
2001 return persistedSize, err
2002 }
2003
2004
2005
2006
2007
2008
2009
2010
// uploadBuffer sends the first recvd bytes of w.buf over the write stream,
// starting at object offset start, split into messages of at most
// maxPerMessageWriteSize bytes.
//
// doneReading indicates that this buffer holds the final bytes of the object;
// the last message then carries FinishWrite and the resulting object is read
// from the stream. A stream is opened lazily when w.stream is nil. On a
// retryable failure the offset to resume from is recomputed with
// determineOffset and sending restarts from there.
//
// It returns the finished object (nil unless the upload completed), the write
// offset reached, and any error.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
	// Prefer a caller-supplied retry predicate over the package default.
	var shouldRetry = ShouldRetry
	if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
		shouldRetry = w.settings.retry.shouldRetry
	}

	var err error
	var lastWriteOfEntireObject bool

	// sent counts bytes of this buffer already sent; writeOffset is the
	// corresponding offset within the object.
	sent := 0
	writeOffset := start

	toWrite := w.buf[:recvd]

	// Each iteration sends one message. Retries re-enter the loop via
	// "continue sendBytes" after adjusting sent and writeOffset.
sendBytes:
	for {
		bytesNotYetSent := recvd - sent
		remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize

		// NOTE(review): lastWriteOfEntireObject is only ever set to true. If a
		// retry rewinds sent so that more than one message remains, the flag
		// appears to stay set and FinishWrite would be sent on a non-final
		// message — confirm whether it should be recomputed per iteration.
		if remainingDataFitsInSingleReq && doneReading {
			lastWriteOfEntireObject = true
		}

		// Send a full-size message unless the remainder is smaller.
		bytesToSendInCurrReq := maxPerMessageWriteSize
		if remainingDataFitsInSingleReq {
			bytesToSendInCurrReq = bytesNotYetSent
		}

		data := toWrite[sent : sent+bytesToSendInCurrReq]

		// Flush and StateLookup are requested on the last message of a buffer
		// that does not finish the object.
		req := &storagepb.BidiWriteObjectRequest{
			Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
				ChecksummedData: &storagepb.ChecksummedData{
					Content: data,
				},
			},
			WriteOffset: writeOffset,
			FinishWrite: lastWriteOfEntireObject,
			Flush:       remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
			StateLookup: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
		}

		// Open a stream if none exists. The first message on a new stream
		// must identify the upload: by upload ID when resumable, otherwise by
		// a full write spec with encryption parameters and checksums.
		if w.stream == nil {
			hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
			ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)

			w.stream, err = w.c.raw.BidiWriteObject(ctx)
			if err != nil {
				return nil, 0, err
			}

			if w.upid != "" {
				req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
			} else {
				spec, err := w.writeObjectSpec()
				if err != nil {
					return nil, 0, err
				}
				req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
					WriteObjectSpec: spec,
				}
				req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
				req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
			}
		}

		err = w.stream.Send(req)
		if err == io.EOF {
			// Send reported io.EOF: drain Recv until it fails to obtain the
			// error that ended the stream.
			err = nil
			for err == nil {
				_, err = w.stream.Recv()
			}

			// Drop the stream so the next iteration opens a new one.
			w.stream = nil

			if shouldRetry(err) {
				// Find out where to resume and resend from there.
				writeOffset, err = w.determineOffset(start)
				if err != nil {
					return nil, 0, err
				}
				sent = int(writeOffset) - int(start)
				continue sendBytes
			}
		}
		if err != nil {
			return nil, 0, err
		}

		// The message went out; account for its bytes.
		sent += len(data)
		writeOffset += int64(len(data))

		// More of this buffer remains to be sent.
		if recvd-sent > 0 {
			continue sendBytes
		}

		// With chunking disabled and more data still to come, return without
		// waiting for a response.
		if !lastWriteOfEntireObject && w.chunkSize == 0 {
			return nil, writeOffset, nil
		}

		if !lastWriteOfEntireObject {
			// Wait for the service's response to this buffer.
			resp, err := w.stream.Recv()
			if shouldRetry(err) {
				writeOffset, err = w.determineOffset(start)
				if err != nil {
					return nil, 0, err
				}
				sent = int(writeOffset) - int(start)
				w.stream = nil
				continue sendBytes
			}
			if err != nil {
				return nil, 0, err
			}

			// The service persisted a different amount than was sent; resume
			// from the persisted size.
			if resp.GetPersistedSize() != writeOffset {
				writeOffset = resp.GetPersistedSize()
				sent = int(writeOffset) - int(start)
				continue sendBytes
			}
		} else {
			// Final message sent: close the send side and wait for the object.
			err = w.stream.CloseSend()
			if err != nil {
				return nil, 0, err
			}

			// Receive until a response carries the finished object.
			var obj *storagepb.Object
			for obj == nil {
				resp, err := w.stream.Recv()
				if shouldRetry(err) {
					writeOffset, err = w.determineOffset(start)
					if err != nil {
						return nil, 0, err
					}
					sent = int(writeOffset) - int(start)
					w.stream = nil
					continue sendBytes
				}
				if err != nil {
					return nil, 0, err
				}

				obj = resp.GetResource()
			}

			// Drain the stream until Recv fails; the resulting error is
			// deliberately not returned.
			for err == nil {
				_, err = w.stream.Recv()
			}

			return obj, writeOffset, nil
		}

		return nil, writeOffset, nil
	}
}
2224
2225
2226
2227
2228 func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
2229
2230
2231 if w.upid != "" {
2232 committed, err := w.queryProgress()
2233 if err != nil {
2234 return 0, err
2235 }
2236 offset = committed
2237 }
2238 return offset, nil
2239 }
2240
2241
2242
2243 func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
2244
2245
2246
2247 attrs := *w.attrs
2248
2249 spec := &storagepb.WriteObjectSpec{
2250 Resource: attrs.toProtoObject(w.bucket),
2251 }
2252
2253 if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil {
2254 return nil, err
2255 }
2256 return spec, nil
2257 }
2258
2259
2260
2261
2262
2263 func (w *gRPCWriter) read() (int, bool, error) {
2264 if w.attrs.ContentType == "" && !w.forceEmptyContentType {
2265 w.reader, w.attrs.ContentType = gax.DetermineContentType(w.reader)
2266 }
2267
2268 var n, recvd int = -1, 0
2269 var err error
2270 for err == nil && n != 0 {
2271
2272 n, err = w.reader.Read(w.buf[recvd:])
2273 recvd += n
2274 }
2275 var done bool
2276 if err == io.EOF {
2277 done = true
2278 err = nil
2279 }
2280 return recvd, done, err
2281 }
2282
2283 func checkCanceled(err error) error {
2284 if status.Code(err) == codes.Canceled {
2285 return context.Canceled
2286 }
2287
2288 return err
2289 }
2290
View as plain text