...

Source file src/cloud.google.com/go/storage/grpc_client.go

Documentation: cloud.google.com/go/storage

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package storage
    16  
    17  import (
    18  	"context"
    19  	"encoding/base64"
    20  	"errors"
    21  	"fmt"
    22  	"hash/crc32"
    23  	"io"
    24  	"net/url"
    25  	"os"
    26  
    27  	"cloud.google.com/go/iam/apiv1/iampb"
    28  	"cloud.google.com/go/internal/trace"
    29  	gapic "cloud.google.com/go/storage/internal/apiv2"
    30  	"cloud.google.com/go/storage/internal/apiv2/storagepb"
    31  	"github.com/googleapis/gax-go/v2"
    32  	"google.golang.org/api/googleapi"
    33  	"google.golang.org/api/iterator"
    34  	"google.golang.org/api/option"
    35  	"google.golang.org/api/option/internaloption"
    36  	"google.golang.org/grpc"
    37  	"google.golang.org/grpc/codes"
    38  	"google.golang.org/grpc/encoding"
    39  	"google.golang.org/grpc/metadata"
    40  	"google.golang.org/grpc/status"
    41  	"google.golang.org/protobuf/encoding/protowire"
    42  	"google.golang.org/protobuf/proto"
    43  	fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
    44  )
    45  
    46  const (
    47  	// defaultConnPoolSize is the default number of channels
    48  	// to initialize in the GAPIC gRPC connection pool. A larger
    49  	// connection pool may be necessary for jobs that require
    50  	// high throughput and/or leverage many concurrent streams
    51  	// if not running via DirectPath.
    52  	//
    53  	// This is only used for the gRPC client.
    54  	defaultConnPoolSize = 1
    55  
    56  	// maxPerMessageWriteSize is the maximum amount of content that can be sent
    57  	// per WriteObjectRequest message. A buffer reaching this amount will
    58  	// precipitate a flush of the buffer. It is only used by the gRPC Writer
    59  	// implementation.
    60  	maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES)
    61  
    62  	// globalProjectAlias is the project ID alias used for global buckets.
    63  	//
    64  	// This is only used for the gRPC API.
    65  	globalProjectAlias = "_"
    66  
    67  	// msgEntityNotSupported indicates ACL entities using project ID are not currently supported.
    68  	//
    69  	// This is only used for the gRPC API.
    70  	msgEntityNotSupported = "The gRPC API currently does not support ACL entities using project ID, use project numbers instead"
    71  )
    72  
    73  // defaultGRPCOptions returns a set of the default client options
    74  // for gRPC client initialization.
    75  func defaultGRPCOptions() []option.ClientOption {
    76  	defaults := []option.ClientOption{
    77  		option.WithGRPCConnectionPool(defaultConnPoolSize),
    78  	}
    79  
    80  	// Set emulator options for gRPC if an emulator was specified. Note that in a
    81  	// hybrid client, STORAGE_EMULATOR_HOST will set the host to use for HTTP and
    82  	// STORAGE_EMULATOR_HOST_GRPC will set the host to use for gRPC (when using a
    83  	// local emulator, HTTP and gRPC must use different ports, so this is
    84  	// necessary).
    85  	//
    86  	// TODO: When the newHybridClient is no longer used, remove
    87  	// STORAGE_EMULATOR_HOST_GRPC and use STORAGE_EMULATOR_HOST for both the
    88  	// HTTP and gRPC based clients.
    89  	if host := os.Getenv("STORAGE_EMULATOR_HOST_GRPC"); host != "" {
    90  		// Strip the scheme from the emulator host. WithEndpoint does not take a
    91  		// scheme for gRPC.
    92  		host = stripScheme(host)
    93  
    94  		defaults = append(defaults,
    95  			option.WithEndpoint(host),
    96  			option.WithGRPCDialOption(grpc.WithInsecure()),
    97  			option.WithoutAuthentication(),
    98  		)
    99  	} else {
   100  		// Only enable DirectPath when the emulator is not being targeted.
   101  		defaults = append(defaults, internaloption.EnableDirectPath(true))
   102  	}
   103  
   104  	return defaults
   105  }
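
// A minimal caller-side sketch of the emulator wiring above. The host is a
// placeholder, and it assumes the experimental NewGRPCClient veneer
// constructor, which routes through this transport; ctx is assumed in scope.
//
//	os.Setenv("STORAGE_EMULATOR_HOST_GRPC", "localhost:9090")
//	client, err := storage.NewGRPCClient(ctx)
//	if err != nil {
//		// handle error
//	}
//	defer client.Close()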
   106  
   107  // grpcStorageClient is the gRPC API implementation of the transport-agnostic
   108  // storageClient interface.
   109  type grpcStorageClient struct {
   110  	raw      *gapic.Client
   111  	settings *settings
   112  }
   113  
   114  // newGRPCStorageClient initializes a new storageClient that uses the gRPC
   115  // Storage API.
   116  func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
   117  	s := initSettings(opts...)
   118  	s.clientOption = append(defaultGRPCOptions(), s.clientOption...)
   119  	// Disable all gax-level retries in favor of retry logic in the veneer client.
   120  	s.gax = append(s.gax, gax.WithRetry(nil))
   121  
   122  	config := newStorageConfig(s.clientOption...)
   123  	if config.readAPIWasSet {
   124  		return nil, errors.New("storage: GRPC is incompatible with any option that specifies an API for reads")
   125  	}
   126  
   127  	g, err := gapic.NewClient(ctx, s.clientOption...)
   128  	if err != nil {
   129  		return nil, err
   130  	}
   131  
   132  	return &grpcStorageClient{
   133  		raw:      g,
   134  		settings: s,
   135  	}, nil
   136  }
   137  
   138  func (c *grpcStorageClient) Close() error {
   139  	return c.raw.Close()
   140  }
   141  
   142  // Top-level methods.
   143  
   144  func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
   145  	s := callSettings(c.settings, opts...)
   146  	req := &storagepb.GetServiceAccountRequest{
   147  		Project: toProjectResource(project),
   148  	}
   149  	var resp *storagepb.ServiceAccount
   150  	err := run(ctx, func(ctx context.Context) error {
   151  		var err error
   152  		resp, err = c.raw.GetServiceAccount(ctx, req, s.gax...)
   153  		return err
   154  	}, s.retry, s.idempotent)
   155  	if err != nil {
   156  		return "", err
   157  	}
   158  	return resp.EmailAddress, err
   159  }
   160  
   161  func (c *grpcStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
   162  	if enableObjectRetention != nil {
   163  		// TODO: implement ObjectRetention once available - see b/308194853
   164  		return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
   165  	}
   166  
   167  	s := callSettings(c.settings, opts...)
   168  	b := attrs.toProtoBucket()
   169  	b.Project = toProjectResource(project)
   170  	// If there is lifecycle information but no location, explicitly set
   171  	// the location. This is a GCS quirk/bug.
   172  	if b.GetLocation() == "" && b.GetLifecycle() != nil {
   173  		b.Location = "US"
   174  	}
   175  
   176  	req := &storagepb.CreateBucketRequest{
   177  		Parent:   fmt.Sprintf("projects/%s", globalProjectAlias),
   178  		Bucket:   b,
   179  		BucketId: bucket,
   180  	}
   181  	if attrs != nil {
   182  		req.PredefinedAcl = attrs.PredefinedACL
   183  		req.PredefinedDefaultObjectAcl = attrs.PredefinedDefaultObjectACL
   184  	}
   185  
   186  	var battrs *BucketAttrs
   187  	err := run(ctx, func(ctx context.Context) error {
   188  		res, err := c.raw.CreateBucket(ctx, req, s.gax...)
   189  
   190  		battrs = newBucketFromProto(res)
   191  
   192  		return err
   193  	}, s.retry, s.idempotent)
   194  
   195  	return battrs, err
   196  }
   197  
   198  func (c *grpcStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
   199  	s := callSettings(c.settings, opts...)
   200  	it := &BucketIterator{
   201  		ctx:       ctx,
   202  		projectID: project,
   203  	}
   204  
   205  	var gitr *gapic.BucketIterator
   206  	fetch := func(pageSize int, pageToken string) (token string, err error) {
   207  
   208  		var buckets []*storagepb.Bucket
   209  		var next string
   210  		err = run(it.ctx, func(ctx context.Context) error {
   211  			// Initialize GAPIC-based iterator when pageToken is empty, which
   212  			// indicates that this fetch call is attempting to get the first page.
   213  			//
   214  			// Note: Initializing the GAPIC-based iterator lazily is necessary to
   215  			// capture the BucketIterator.Prefix set by the user *after* the
   216  			// BucketIterator is returned to them from the veneer.
   217  			if pageToken == "" {
   218  				req := &storagepb.ListBucketsRequest{
   219  					Parent: toProjectResource(it.projectID),
   220  					Prefix: it.Prefix,
   221  				}
   222  				gitr = c.raw.ListBuckets(ctx, req, s.gax...)
   223  			}
   224  			buckets, next, err = gitr.InternalFetch(pageSize, pageToken)
   225  			return err
   226  		}, s.retry, s.idempotent)
   227  		if err != nil {
   228  			return "", err
   229  		}
   230  
   231  		for _, bkt := range buckets {
   232  			b := newBucketFromProto(bkt)
   233  			it.buckets = append(it.buckets, b)
   234  		}
   235  
   236  		return next, nil
   237  	}
   238  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
   239  		fetch,
   240  		func() int { return len(it.buckets) },
   241  		func() interface{} { b := it.buckets; it.buckets = nil; return b })
   242  
   243  	return it
   244  }
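
// A caller-side sketch of why the GAPIC iterator is initialized lazily: a
// Prefix assigned after Buckets returns is still captured on the first fetch.
// The project ID and prefix are placeholders.
//
//	it := client.Buckets(ctx, "my-project")
//	it.Prefix = "logs-"
//	for {
//		battrs, err := it.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			// handle error
//		}
//		_ = battrs.Name
//	}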
   245  
   246  // Bucket methods.
   247  
   248  func (c *grpcStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
   249  	s := callSettings(c.settings, opts...)
   250  	req := &storagepb.DeleteBucketRequest{
   251  		Name: bucketResourceName(globalProjectAlias, bucket),
   252  	}
   253  	if err := applyBucketCondsProto("grpcStorageClient.DeleteBucket", conds, req); err != nil {
   254  		return err
   255  	}
   256  	if s.userProject != "" {
   257  		ctx = setUserProjectMetadata(ctx, s.userProject)
   258  	}
   259  
   260  	return run(ctx, func(ctx context.Context) error {
   261  		return c.raw.DeleteBucket(ctx, req, s.gax...)
   262  	}, s.retry, s.idempotent)
   263  }
   264  
   265  func (c *grpcStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
   266  	s := callSettings(c.settings, opts...)
   267  	req := &storagepb.GetBucketRequest{
   268  		Name:     bucketResourceName(globalProjectAlias, bucket),
   269  		ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
   270  	}
   271  	if err := applyBucketCondsProto("grpcStorageClient.GetBucket", conds, req); err != nil {
   272  		return nil, err
   273  	}
   274  	if s.userProject != "" {
   275  		ctx = setUserProjectMetadata(ctx, s.userProject)
   276  	}
   277  
   278  	var battrs *BucketAttrs
   279  	err := run(ctx, func(ctx context.Context) error {
   280  		res, err := c.raw.GetBucket(ctx, req, s.gax...)
   281  
   282  		battrs = newBucketFromProto(res)
   283  
   284  		return err
   285  	}, s.retry, s.idempotent)
   286  
   287  	if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
   288  		return nil, ErrBucketNotExist
   289  	}
   290  
   291  	return battrs, err
   292  }
   293  func (c *grpcStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
   294  	s := callSettings(c.settings, opts...)
   295  	b := uattrs.toProtoBucket()
   296  	b.Name = bucketResourceName(globalProjectAlias, bucket)
   297  	req := &storagepb.UpdateBucketRequest{
   298  		Bucket:                     b,
   299  		PredefinedAcl:              uattrs.PredefinedACL,
   300  		PredefinedDefaultObjectAcl: uattrs.PredefinedDefaultObjectACL,
   301  	}
   302  	if err := applyBucketCondsProto("grpcStorageClient.UpdateBucket", conds, req); err != nil {
   303  		return nil, err
   304  	}
   305  	if s.userProject != "" {
   306  		ctx = setUserProjectMetadata(ctx, s.userProject)
   307  	}
   308  
   309  	var paths []string
   310  	fieldMask := &fieldmaskpb.FieldMask{
   311  		Paths: paths,
   312  	}
   313  	if uattrs.CORS != nil {
   314  		fieldMask.Paths = append(fieldMask.Paths, "cors")
   315  	}
   316  	if uattrs.DefaultEventBasedHold != nil {
   317  		fieldMask.Paths = append(fieldMask.Paths, "default_event_based_hold")
   318  	}
   319  	if uattrs.RetentionPolicy != nil {
   320  		fieldMask.Paths = append(fieldMask.Paths, "retention_policy")
   321  	}
   322  	if uattrs.VersioningEnabled != nil {
   323  		fieldMask.Paths = append(fieldMask.Paths, "versioning")
   324  	}
   325  	if uattrs.RequesterPays != nil {
   326  		fieldMask.Paths = append(fieldMask.Paths, "billing")
   327  	}
   328  	if uattrs.BucketPolicyOnly != nil || uattrs.UniformBucketLevelAccess != nil || uattrs.PublicAccessPrevention != PublicAccessPreventionUnknown {
   329  		fieldMask.Paths = append(fieldMask.Paths, "iam_config")
   330  	}
   331  	if uattrs.Encryption != nil {
   332  		fieldMask.Paths = append(fieldMask.Paths, "encryption")
   333  	}
   334  	if uattrs.Lifecycle != nil {
   335  		fieldMask.Paths = append(fieldMask.Paths, "lifecycle")
   336  	}
   337  	if uattrs.Logging != nil {
   338  		fieldMask.Paths = append(fieldMask.Paths, "logging")
   339  	}
   340  	if uattrs.Website != nil {
   341  		fieldMask.Paths = append(fieldMask.Paths, "website")
   342  	}
   343  	if uattrs.PredefinedACL != "" {
   344  		// In cases where PredefinedACL is set, Acl is cleared.
   345  		fieldMask.Paths = append(fieldMask.Paths, "acl")
   346  	}
   347  	if uattrs.PredefinedDefaultObjectACL != "" {
   348  		// In cases where PredefinedDefaultObjectACL is set, DefaultObjectAcl is cleared.
   349  		fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
   350  	}
   351  	// Note: This API currently does not support entities using project ID.
   352  	// Use project numbers in ACL entities. Pending b/233617896.
   353  	if uattrs.acl != nil {
   354  		// In cases where acl is set by UpdateBucketACL method.
   355  		fieldMask.Paths = append(fieldMask.Paths, "acl")
   356  	}
   357  	if uattrs.defaultObjectACL != nil {
   358  		// In cases where defaultObjectACL is set by UpdateBucketACL method.
   359  		fieldMask.Paths = append(fieldMask.Paths, "default_object_acl")
   360  	}
   361  	if uattrs.StorageClass != "" {
   362  		fieldMask.Paths = append(fieldMask.Paths, "storage_class")
   363  	}
   364  	if uattrs.RPO != RPOUnknown {
   365  		fieldMask.Paths = append(fieldMask.Paths, "rpo")
   366  	}
   367  	if uattrs.Autoclass != nil {
   368  		fieldMask.Paths = append(fieldMask.Paths, "autoclass")
   369  	}
   370  	if uattrs.SoftDeletePolicy != nil {
   371  		fieldMask.Paths = append(fieldMask.Paths, "soft_delete_policy")
   372  	}
   373  
   374  	for label := range uattrs.setLabels {
   375  		fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
   376  	}
   377  
   378  	// Delete a label by not including it in Bucket.Labels but adding the key to the update mask.
   379  	for label := range uattrs.deleteLabels {
   380  		fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("labels.%s", label))
   381  	}
   382  
   383  	req.UpdateMask = fieldMask
   384  
   385  	if len(fieldMask.Paths) < 1 {
   386  		// Nothing to update. Send a get request for current attrs instead. This
   387  		// maintains consistency with JSON bucket updates.
   388  		opts = append(opts, idempotent(true))
   389  		return c.GetBucket(ctx, bucket, conds, opts...)
   390  	}
   391  
   392  	var battrs *BucketAttrs
   393  	err := run(ctx, func(ctx context.Context) error {
   394  		res, err := c.raw.UpdateBucket(ctx, req, s.gax...)
   395  		battrs = newBucketFromProto(res)
   396  		return err
   397  	}, s.retry, s.idempotent)
   398  
   399  	return battrs, err
   400  }
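
// A caller-side sketch of the label paths in the update mask above: a key in
// setLabels is sent with its value, while a key in deleteLabels appears only
// in the mask, which the server treats as a delete. Names are placeholders.
//
//	uattrs := storage.BucketAttrsToUpdate{}
//	uattrs.SetLabel("team", "storage")
//	uattrs.DeleteLabel("deprecated-key")
//	battrs, err := client.Bucket("my-bucket").Update(ctx, uattrs)
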
   401  func (c *grpcStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
   402  	s := callSettings(c.settings, opts...)
   403  	req := &storagepb.LockBucketRetentionPolicyRequest{
   404  		Bucket: bucketResourceName(globalProjectAlias, bucket),
   405  	}
   406  	if err := applyBucketCondsProto("grpcStorageClient.LockBucketRetentionPolicy", conds, req); err != nil {
   407  		return err
   408  	}
   409  
   410  	return run(ctx, func(ctx context.Context) error {
   411  		_, err := c.raw.LockBucketRetentionPolicy(ctx, req, s.gax...)
   412  		return err
   413  	}, s.retry, s.idempotent)
   414  
   415  }
   416  func (c *grpcStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
   417  	s := callSettings(c.settings, opts...)
   418  	it := &ObjectIterator{
   419  		ctx: ctx,
   420  	}
   421  	if q != nil {
   422  		it.query = *q
   423  	}
   424  	req := &storagepb.ListObjectsRequest{
   425  		Parent:                   bucketResourceName(globalProjectAlias, bucket),
   426  		Prefix:                   it.query.Prefix,
   427  		Delimiter:                it.query.Delimiter,
   428  		Versions:                 it.query.Versions,
   429  		LexicographicStart:       it.query.StartOffset,
   430  		LexicographicEnd:         it.query.EndOffset,
   431  		IncludeTrailingDelimiter: it.query.IncludeTrailingDelimiter,
   432  		MatchGlob:                it.query.MatchGlob,
   433  		ReadMask:                 q.toFieldMask(), // a nil Query still results in a "*" FieldMask
   434  		SoftDeleted:              it.query.SoftDeleted,
   435  	}
   436  	if s.userProject != "" {
   437  		ctx = setUserProjectMetadata(ctx, s.userProject)
   438  	}
   439  	fetch := func(pageSize int, pageToken string) (token string, err error) {
   440  		// IncludeFoldersAsPrefixes is not supported for gRPC
   441  		// TODO: remove this when support is added in the proto.
   442  		if it.query.IncludeFoldersAsPrefixes {
   443  			return "", status.Errorf(codes.Unimplemented, "storage: IncludeFoldersAsPrefixes is not supported in gRPC")
   444  		}
   445  		var objects []*storagepb.Object
   446  		var gitr *gapic.ObjectIterator
   447  		err = run(it.ctx, func(ctx context.Context) error {
   448  			gitr = c.raw.ListObjects(ctx, req, s.gax...)
   449  			it.ctx = ctx
   450  			objects, token, err = gitr.InternalFetch(pageSize, pageToken)
   451  			return err
   452  		}, s.retry, s.idempotent)
   453  		if err != nil {
   454  			if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
   455  				err = ErrBucketNotExist
   456  			}
   457  			return "", err
   458  		}
   459  
   460  		for _, obj := range objects {
   461  			b := newObjectFromProto(obj)
   462  			it.items = append(it.items, b)
   463  		}
   464  
   465  		// Response is always non-nil after a successful request.
   466  		res := gitr.Response.(*storagepb.ListObjectsResponse)
   467  		for _, prefix := range res.GetPrefixes() {
   468  			it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
   469  		}
   470  
   471  		return token, nil
   472  	}
   473  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
   474  		fetch,
   475  		func() int { return len(it.items) },
   476  		func() interface{} { b := it.items; it.items = nil; return b })
   477  
   478  	return it
   479  }
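
// A caller-side sketch of delimiter-based listing: entries that are pure
// prefixes surface as ObjectAttrs with only the Prefix field set, as appended
// above. Bucket and prefix names are placeholders.
//
//	it := client.Bucket("my-bucket").Objects(ctx, &storage.Query{
//		Prefix:    "photos/",
//		Delimiter: "/",
//	})
//	for {
//		attrs, err := it.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			// handle error
//		}
//		if attrs.Prefix != "" {
//			// a synthetic "directory" such as photos/2023/
//		}
//	}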
   480  
   481  // Object metadata methods.
   482  
   483  func (c *grpcStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
   484  	s := callSettings(c.settings, opts...)
   485  	req := &storagepb.DeleteObjectRequest{
   486  		Bucket: bucketResourceName(globalProjectAlias, bucket),
   487  		Object: object,
   488  	}
   489  	if err := applyCondsProto("grpcStorageClient.DeleteObject", gen, conds, req); err != nil {
   490  		return err
   491  	}
   492  	if s.userProject != "" {
   493  		ctx = setUserProjectMetadata(ctx, s.userProject)
   494  	}
   495  	err := run(ctx, func(ctx context.Context) error {
   496  		return c.raw.DeleteObject(ctx, req, s.gax...)
   497  	}, s.retry, s.idempotent)
   498  	if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
   499  		return ErrObjectNotExist
   500  	}
   501  	return err
   502  }
   503  
   504  func (c *grpcStorageClient) GetObject(ctx context.Context, params *getObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
   505  	s := callSettings(c.settings, opts...)
   506  	req := &storagepb.GetObjectRequest{
   507  		Bucket: bucketResourceName(globalProjectAlias, params.bucket),
   508  		Object: params.object,
   509  		// ProjectionFull by default.
   510  		ReadMask: &fieldmaskpb.FieldMask{Paths: []string{"*"}},
   511  	}
   512  	if err := applyCondsProto("grpcStorageClient.GetObject", params.gen, params.conds, req); err != nil {
   513  		return nil, err
   514  	}
   515  	if s.userProject != "" {
   516  		ctx = setUserProjectMetadata(ctx, s.userProject)
   517  	}
   518  	if params.encryptionKey != nil {
   519  		req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(params.encryptionKey)
   520  	}
   521  	if params.softDeleted {
   522  		req.SoftDeleted = &params.softDeleted
   523  	}
   524  
   525  	var attrs *ObjectAttrs
   526  	err := run(ctx, func(ctx context.Context) error {
   527  		res, err := c.raw.GetObject(ctx, req, s.gax...)
   528  		attrs = newObjectFromProto(res)
   529  
   530  		return err
   531  	}, s.retry, s.idempotent)
   532  
   533  	if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
   534  		return nil, ErrObjectNotExist
   535  	}
   536  
   537  	return attrs, err
   538  }
   539  
   540  func (c *grpcStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
   541  	uattrs := params.uattrs
   542  	if params.overrideRetention != nil || uattrs.Retention != nil {
   543  		// TODO: implement ObjectRetention once available - see b/308194853
   544  		return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
   545  	}
   546  	s := callSettings(c.settings, opts...)
   547  	o := uattrs.toProtoObject(bucketResourceName(globalProjectAlias, params.bucket), params.object)
   548  	// For Update, generation is passed via the object message rather than a field on the request.
   549  	if params.gen >= 0 {
   550  		o.Generation = params.gen
   551  	}
   552  	req := &storagepb.UpdateObjectRequest{
   553  		Object:        o,
   554  		PredefinedAcl: uattrs.PredefinedACL,
   555  	}
   556  	if err := applyCondsProto("grpcStorageClient.UpdateObject", defaultGen, params.conds, req); err != nil {
   557  		return nil, err
   558  	}
   559  	if s.userProject != "" {
   560  		ctx = setUserProjectMetadata(ctx, s.userProject)
   561  	}
   562  	if params.encryptionKey != nil {
   563  		req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(params.encryptionKey)
   564  	}
   565  
   566  	fieldMask := &fieldmaskpb.FieldMask{Paths: nil}
   567  	if uattrs.EventBasedHold != nil {
   568  		fieldMask.Paths = append(fieldMask.Paths, "event_based_hold")
   569  	}
   570  	if uattrs.TemporaryHold != nil {
   571  		fieldMask.Paths = append(fieldMask.Paths, "temporary_hold")
   572  	}
   573  	if uattrs.ContentType != nil {
   574  		fieldMask.Paths = append(fieldMask.Paths, "content_type")
   575  	}
   576  	if uattrs.ContentLanguage != nil {
   577  		fieldMask.Paths = append(fieldMask.Paths, "content_language")
   578  	}
   579  	if uattrs.ContentEncoding != nil {
   580  		fieldMask.Paths = append(fieldMask.Paths, "content_encoding")
   581  	}
   582  	if uattrs.ContentDisposition != nil {
   583  		fieldMask.Paths = append(fieldMask.Paths, "content_disposition")
   584  	}
   585  	if uattrs.CacheControl != nil {
   586  		fieldMask.Paths = append(fieldMask.Paths, "cache_control")
   587  	}
   588  	if !uattrs.CustomTime.IsZero() {
   589  		fieldMask.Paths = append(fieldMask.Paths, "custom_time")
   590  	}
   591  	// Note: This API currently does not support entities using project ID.
   592  	// Use project numbers in ACL entities. Pending b/233617896.
   593  	if uattrs.ACL != nil || len(uattrs.PredefinedACL) > 0 {
   594  		fieldMask.Paths = append(fieldMask.Paths, "acl")
   595  	}
   596  
   597  	if uattrs.Metadata != nil {
   598  		// We don't support deleting a specific metadata key; metadata is deleted
   599  		// as a whole if an empty map is provided, so we do not use dot notation here.
   600  		if len(uattrs.Metadata) == 0 {
   601  			fieldMask.Paths = append(fieldMask.Paths, "metadata")
   602  		} else {
   603  			// We can, however, use dot notation for adding keys
   604  			for key := range uattrs.Metadata {
   605  				fieldMask.Paths = append(fieldMask.Paths, fmt.Sprintf("metadata.%s", key))
   606  			}
   607  		}
   608  	}
   609  
   610  	req.UpdateMask = fieldMask
   611  
   612  	if len(fieldMask.Paths) < 1 {
   613  		// Nothing to update. To maintain consistency with JSON, we must still
   614  		// update the object because metageneration and other fields are
   615  		// updated even on an empty update.
   616  		// gRPC will fail if the fieldmask is empty, so instead we add an
   617  		// output-only field to the update mask. Output-only fields are (and must
   618  		// be - see AIP 161) ignored, but allow us to send an empty update because
   619  		// any mask that is valid for read (as this one is) must be valid for write.
   620  		fieldMask.Paths = append(fieldMask.Paths, "create_time")
   621  	}
   622  
   623  	var attrs *ObjectAttrs
   624  	err := run(ctx, func(ctx context.Context) error {
   625  		res, err := c.raw.UpdateObject(ctx, req, s.gax...)
   626  		attrs = newObjectFromProto(res)
   627  		return err
   628  	}, s.retry, s.idempotent)
   629  	if e, ok := status.FromError(err); ok && e.Code() == codes.NotFound {
   630  		return nil, ErrObjectNotExist
   631  	}
   632  
   633  	return attrs, err
   634  }
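
// A caller-side sketch of the metadata mask rules above: a non-empty map
// updates individual keys via dot notation, while an empty non-nil map clears
// all metadata. Bucket, object, and key names are placeholders.
//
//	o := client.Bucket("my-bucket").Object("my-object")
//	// Adds or overwrites only the "owner" key.
//	_, err := o.Update(ctx, storage.ObjectAttrsToUpdate{
//		Metadata: map[string]string{"owner": "team-a"},
//	})
//	// Clears all metadata for the object.
//	_, err = o.Update(ctx, storage.ObjectAttrsToUpdate{
//		Metadata: map[string]string{},
//	})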
   635  
   636  func (c *grpcStorageClient) RestoreObject(ctx context.Context, params *restoreObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
   637  	s := callSettings(c.settings, opts...)
   638  	req := &storagepb.RestoreObjectRequest{
   639  		Bucket:        bucketResourceName(globalProjectAlias, params.bucket),
   640  		Object:        params.object,
   641  		CopySourceAcl: &params.copySourceACL,
   642  	}
   643  	if err := applyCondsProto("grpcStorageClient.RestoreObject", params.gen, params.conds, req); err != nil {
   644  		return nil, err
   645  	}
   646  	if s.userProject != "" {
   647  		ctx = setUserProjectMetadata(ctx, s.userProject)
   648  	}
   649  
   650  	var attrs *ObjectAttrs
   651  	err := run(ctx, func(ctx context.Context) error {
   652  		res, err := c.raw.RestoreObject(ctx, req, s.gax...)
   653  		attrs = newObjectFromProto(res)
   654  		return err
   655  	}, s.retry, s.idempotent)
   656  	if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
   657  		return nil, ErrObjectNotExist
   658  	}
   659  	return attrs, err
   660  }
   661  
   662  // Default Object ACL methods.
   663  
   664  func (c *grpcStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
   665  	// There is no separate API for PATCH in gRPC.
   666  	// Make a GET call first to retrieve BucketAttrs.
   667  	attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
   668  	if err != nil {
   669  		return err
   670  	}
   671  	// Delete the entity and copy other remaining ACL entities.
   672  	// Note: This API currently does not support entities using project ID.
   673  	// Use project numbers in ACL entities. Pending b/233617896.
   674  	// Return error if entity is not found or a project ID is used.
   675  	invalidEntity := true
   676  	var acl []ACLRule
   677  	for _, a := range attrs.DefaultObjectACL {
   678  		if a.Entity != entity {
   679  			acl = append(acl, a)
   680  		}
   681  		if a.Entity == entity {
   682  			invalidEntity = false
   683  		}
   684  	}
   685  	if invalidEntity {
   686  		return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.DefaultObjectACL, msgEntityNotSupported)
   687  	}
   688  	uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
   689  	// Call UpdateBucket with a MetagenerationMatch precondition set.
   690  	if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
   691  		return err
   692  	}
   693  	return nil
   694  }
   695  
   696  func (c *grpcStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
   697  	attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
   698  	if err != nil {
   699  		return nil, err
   700  	}
   701  	return attrs.DefaultObjectACL, nil
   702  }
   703  
   704  func (c *grpcStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
   705  	// There is no separate API for PATCH in gRPC.
   706  	// Make a GET call first to retrieve BucketAttrs.
   707  	attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
   708  	if err != nil {
   709  		return err
   710  	}
   711  	// Note: This API currently does not support entities using project ID.
   712  	// Use project numbers in ACL entities. Pending b/233617896.
   713  	var acl []ACLRule
   714  	aclRule := ACLRule{Entity: entity, Role: role}
   715  	acl = append(attrs.DefaultObjectACL, aclRule)
   716  	uattrs := &BucketAttrsToUpdate{defaultObjectACL: acl}
   717  	// Call UpdateBucket with a MetagenerationMatch precondition set.
   718  	if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
   719  		return err
   720  	}
   721  	return nil
   722  }
   723  
   724  // Bucket ACL methods.
   725  
   726  func (c *grpcStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
   727  	// There is no separate API for PATCH in gRPC.
   728  	// Make a GET call first to retrieve BucketAttrs.
   729  	attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
   730  	if err != nil {
   731  		return err
   732  	}
   733  	// Delete the entity and copy other remaining ACL entities.
   734  	// Note: This API currently does not support entities using project ID.
   735  	// Use project numbers in ACL entities. Pending b/233617896.
   736  	// Return error if entity is not found or a project ID is used.
   737  	invalidEntity := true
   738  	var acl []ACLRule
   739  	for _, a := range attrs.ACL {
   740  		if a.Entity != entity {
   741  			acl = append(acl, a)
   742  		}
   743  		if a.Entity == entity {
   744  			invalidEntity = false
   745  		}
   746  	}
   747  	if invalidEntity {
   748  		return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
   749  	}
   750  	uattrs := &BucketAttrsToUpdate{acl: acl}
   751  	// Call UpdateBucket with a MetagenerationMatch precondition set.
   752  	if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
   753  		return err
   754  	}
   755  	return nil
   756  }
   757  
   758  func (c *grpcStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
   759  	attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
   760  	if err != nil {
   761  		return nil, err
   762  	}
   763  	return attrs.ACL, nil
   764  }
   765  
   766  func (c *grpcStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
   767  	// There is no separate API for PATCH in gRPC.
   768  	// Make a GET call first to retrieve BucketAttrs.
   769  	attrs, err := c.GetBucket(ctx, bucket, nil, opts...)
   770  	if err != nil {
   771  		return err
   772  	}
   773  	// Note: This API currently does not support entities using project ID.
   774  	// Use project numbers in ACL entities. Pending b/233617896.
   775  	var acl []ACLRule
   776  	aclRule := ACLRule{Entity: entity, Role: role}
   777  	acl = append(attrs.ACL, aclRule)
   778  	uattrs := &BucketAttrsToUpdate{acl: acl}
   779  	// Call UpdateBucket with a MetagenerationMatch precondition set.
   780  	if _, err = c.UpdateBucket(ctx, bucket, uattrs, &BucketConditions{MetagenerationMatch: attrs.MetaGeneration}, opts...); err != nil {
   781  		return err
   782  	}
   783  	return nil
   784  }
   785  
   786  // Object ACL methods.
   787  
   788  func (c *grpcStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
   789  	// There is no separate API for PATCH in gRPC.
   790  	// Make a GET call first to retrieve ObjectAttrs.
   791  	attrs, err := c.GetObject(ctx, &getObjectParams{bucket, object, defaultGen, nil, nil, false}, opts...)
   792  	if err != nil {
   793  		return err
   794  	}
   795  	// Delete the entity and copy other remaining ACL entities.
   796  	// Note: This API currently does not support entities using project ID.
   797  	// Use project numbers in ACL entities. Pending b/233617896.
   798  	// Return error if entity is not found or a project ID is used.
   799  	invalidEntity := true
   800  	var acl []ACLRule
   801  	for _, a := range attrs.ACL {
   802  		if a.Entity != entity {
   803  			acl = append(acl, a)
   804  		}
   805  		if a.Entity == entity {
   806  			invalidEntity = false
   807  		}
   808  	}
   809  	if invalidEntity {
   810  		return fmt.Errorf("storage: entity %v was not found on bucket %v, got %v. %v", entity, bucket, attrs.ACL, msgEntityNotSupported)
   811  	}
   812  	uattrs := &ObjectAttrsToUpdate{ACL: acl}
   813  	// Call UpdateObject with the specified metageneration.
   814  	params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
   815  	if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
   816  		return err
   817  	}
   818  	return nil
   819  }
   820  
   821  // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object.
   822  // Selecting a specific generation of this object is not currently supported by the client.
   823  func (c *grpcStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
   824  	o, err := c.GetObject(ctx, &getObjectParams{bucket, object, defaultGen, nil, nil, false}, opts...)
   825  	if err != nil {
   826  		return nil, err
   827  	}
   828  	return o.ACL, nil
   829  }
   830  
   831  func (c *grpcStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
   832  	// There is no separate API for PATCH in gRPC.
   833  	// Make a GET call first to retrieve ObjectAttrs.
   834  	attrs, err := c.GetObject(ctx, &getObjectParams{bucket, object, defaultGen, nil, nil, false}, opts...)
   835  	if err != nil {
   836  		return err
   837  	}
   838  	// Note: This API currently does not support entities using project ID.
   839  	// Use project numbers in ACL entities. Pending b/233617896.
   840  	var acl []ACLRule
   841  	aclRule := ACLRule{Entity: entity, Role: role}
   842  	acl = append(attrs.ACL, aclRule)
   843  	uattrs := &ObjectAttrsToUpdate{ACL: acl}
   844  	// Call UpdateObject with the specified metageneration.
   845  	params := &updateObjectParams{bucket: bucket, object: object, uattrs: uattrs, gen: defaultGen, conds: &Conditions{MetagenerationMatch: attrs.Metageneration}}
   846  	if _, err = c.UpdateObject(ctx, params, opts...); err != nil {
   847  		return err
   848  	}
   849  	return nil
   850  }
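
// A caller-side sketch of the read-modify-write pattern above, using a
// project number in the entity per the gRPC limitation noted in the comments.
// The bucket name and project number are placeholders.
//
//	acl := client.Bucket("my-bucket").ACL()
//	err := acl.Set(ctx, storage.ACLEntity("project-owners-123456789"), storage.RoleOwner)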
   851  
   852  // Media operations.
   853  
   854  func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
   855  	s := callSettings(c.settings, opts...)
   856  	if s.userProject != "" {
   857  		ctx = setUserProjectMetadata(ctx, s.userProject)
   858  	}
   859  
   860  	dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
   861  	dstObjPb.Name = req.dstObject.name
   862  
   863  	if req.sendCRC32C {
   864  		dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
   865  	}
   866  
   867  	srcs := []*storagepb.ComposeObjectRequest_SourceObject{}
   868  	for _, src := range req.srcs {
   869  		srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name, ObjectPreconditions: &storagepb.ComposeObjectRequest_SourceObject_ObjectPreconditions{}}
   870  		if src.gen >= 0 {
   871  			srcObjPb.Generation = src.gen
   872  		}
   873  		if err := applyCondsProto("ComposeObject source", defaultGen, src.conds, srcObjPb.ObjectPreconditions); err != nil {
   874  			return nil, err
   875  		}
   876  		srcs = append(srcs, srcObjPb)
   877  	}
   878  
   879  	rawReq := &storagepb.ComposeObjectRequest{
   880  		Destination:   dstObjPb,
   881  		SourceObjects: srcs,
   882  	}
   883  	if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, rawReq); err != nil {
   884  		return nil, err
   885  	}
   886  	if req.predefinedACL != "" {
   887  		rawReq.DestinationPredefinedAcl = req.predefinedACL
   888  	}
   889  	if req.dstObject.encryptionKey != nil {
   890  		rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
   891  	}
   892  
   893  	var obj *storagepb.Object
   894  	var err error
   895  	if err := run(ctx, func(ctx context.Context) error {
   896  		obj, err = c.raw.ComposeObject(ctx, rawReq, s.gax...)
   897  		return err
   898  	}, s.retry, s.idempotent); err != nil {
   899  		return nil, err
   900  	}
   901  
   902  	return newObjectFromProto(obj), nil
   903  }
   904  func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
   905  	s := callSettings(c.settings, opts...)
   906  	obj := req.dstObject.attrs.toProtoObject("")
   907  	call := &storagepb.RewriteObjectRequest{
   908  		SourceBucket:              bucketResourceName(globalProjectAlias, req.srcObject.bucket),
   909  		SourceObject:              req.srcObject.name,
   910  		RewriteToken:              req.token,
   911  		DestinationBucket:         bucketResourceName(globalProjectAlias, req.dstObject.bucket),
   912  		DestinationName:           req.dstObject.name,
   913  		Destination:               obj,
   914  		DestinationKmsKey:         req.dstObject.keyName,
   915  		DestinationPredefinedAcl:  req.predefinedACL,
   916  		CommonObjectRequestParams: toProtoCommonObjectRequestParams(req.dstObject.encryptionKey),
   917  	}
   918  
   919  	// The userProject, whether source or destination project, is decided by the code calling the interface.
   920  	if s.userProject != "" {
   921  		ctx = setUserProjectMetadata(ctx, s.userProject)
   922  	}
   923  	if err := applyCondsProto("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
   924  		return nil, err
   925  	}
   926  	if err := applySourceCondsProto(req.srcObject.gen, req.srcObject.conds, call); err != nil {
   927  		return nil, err
   928  	}
   929  
   930  	if len(req.dstObject.encryptionKey) > 0 {
   931  		call.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey)
   932  	}
   933  	if len(req.srcObject.encryptionKey) > 0 {
   934  		srcParams := toProtoCommonObjectRequestParams(req.srcObject.encryptionKey)
   935  		call.CopySourceEncryptionAlgorithm = srcParams.GetEncryptionAlgorithm()
   936  		call.CopySourceEncryptionKeyBytes = srcParams.GetEncryptionKeyBytes()
   937  		call.CopySourceEncryptionKeySha256Bytes = srcParams.GetEncryptionKeySha256Bytes()
   938  	}
   939  
   940  	call.MaxBytesRewrittenPerCall = req.maxBytesRewrittenPerCall
   941  
   942  	var res *storagepb.RewriteResponse
   943  	var err error
   944  
   945  	retryCall := func(ctx context.Context) error { res, err = c.raw.RewriteObject(ctx, call, s.gax...); return err }
   946  
   947  	if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
   948  		return nil, err
   949  	}
   950  
   951  	r := &rewriteObjectResponse{
   952  		done:     res.GetDone(),
   953  		written:  res.GetTotalBytesRewritten(),
   954  		size:     res.GetObjectSize(),
   955  		token:    res.GetRewriteToken(),
   956  		resource: newObjectFromProto(res.GetResource()),
   957  	}
   958  
   959  	return r, nil
   960  }
   961  
   962  // bytesCodec is a gRPC codec which permits receiving messages as either
   963  // protobuf messages or raw byte slices ([]byte).
   964  type bytesCodec struct {
   965  	encoding.Codec
   966  }
   967  
   968  func (bytesCodec) Marshal(v any) ([]byte, error) {
   969  	vv, ok := v.(proto.Message)
   970  	if !ok {
   971  		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
   972  	}
   973  	return proto.Marshal(vv)
   974  }
   975  
   976  func (bytesCodec) Unmarshal(data []byte, v any) error {
   977  	switch v := v.(type) {
   978  	case *[]byte:
   979  		// If gRPC could recycle the data []byte after unmarshaling (through
   980  		// buffer pools), we would need to make a copy here.
   981  		*v = data
   982  		return nil
   983  	case proto.Message:
   984  		return proto.Unmarshal(data, v)
   985  	default:
   986  		return fmt.Errorf("can not unmarshal type %T", v)
   987  	}
   988  }
   989  
   990  func (bytesCodec) Name() string {
   991  	// If this isn't "", then gRPC sets the content-subtype of the call to this
   992  	// value and we get errors.
   993  	return ""
   994  }
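
// A sketch of how this codec is used below: forcing it on a call lets RecvMsg
// target a raw *[]byte and skip proto unmarshaling. rawClient is a placeholder
// for the GAPIC storage client; ctx and req are assumed in scope.
//
//	opts := gax.WithGRPCOptions(grpc.ForceCodec(bytesCodec{}))
//	stream, err := rawClient.ReadObject(ctx, req, opts)
//	if err != nil {
//		// handle error
//	}
//	var buf []byte
//	// buf receives the wire-encoded ReadObjectResponse.
//	err = stream.RecvMsg(&buf)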
   995  
   996  func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
   997  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader")
   998  	defer func() { trace.EndSpan(ctx, err) }()
   999  
  1000  	s := callSettings(c.settings, opts...)
  1001  
  1002  	s.gax = append(s.gax, gax.WithGRPCOptions(
  1003  		grpc.ForceCodec(bytesCodec{}),
  1004  	))
  1005  
  1006  	if s.userProject != "" {
  1007  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1008  	}
  1009  
  1010  	b := bucketResourceName(globalProjectAlias, params.bucket)
  1011  	req := &storagepb.ReadObjectRequest{
  1012  		Bucket:                    b,
  1013  		Object:                    params.object,
  1014  		CommonObjectRequestParams: toProtoCommonObjectRequestParams(params.encryptionKey),
  1015  	}
  1016  	// The default is a negative value, which means latest.
  1017  	if params.gen >= 0 {
  1018  		req.Generation = params.gen
  1019  	}
  1020  
  1021  	var databuf []byte
  1022  
  1023  	// Define a function that initiates a Read with offset and length, assuming
  1024  	// that `seen` bytes have already been read.
  1025  	reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
  1026  		// If the context has already expired, return immediately without making
  1027  		// the call.
  1028  		if err := ctx.Err(); err != nil {
  1029  			return nil, nil, err
  1030  		}
  1031  
  1032  		cc, cancel := context.WithCancel(ctx)
  1033  
  1034  		req.ReadOffset = params.offset + seen
  1035  
  1036  		// Only set a ReadLimit if length is greater than zero, because <= 0 means
  1037  		// to read it all.
  1038  		if params.length > 0 {
  1039  			req.ReadLimit = params.length - seen
  1040  		}
  1041  
  1042  		if err := applyCondsProto("gRPCReader.reopen", params.gen, params.conds, req); err != nil {
  1043  			cancel()
  1044  			return nil, nil, err
  1045  		}
  1046  
  1047  		var stream storagepb.Storage_ReadObjectClient
  1048  		var msg *storagepb.ReadObjectResponse
  1049  		var err error
  1050  
  1051  		err = run(cc, func(ctx context.Context) error {
  1052  			stream, err = c.raw.ReadObject(cc, req, s.gax...)
  1053  			if err != nil {
  1054  				return err
  1055  			}
  1056  
  1057  			// Receive the message into databuf as a wire-encoded message so we can
  1058  			// use a custom decoder to avoid an extra copy at the protobuf layer.
  1059  			err := stream.RecvMsg(&databuf)
  1060  			// These types of errors show up on the Recv call, rather than the
  1061  			// initialization of the stream via ReadObject above.
  1062  			if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
  1063  				return ErrObjectNotExist
  1064  			}
  1065  			if err != nil {
  1066  				return err
  1067  			}
  1068  			// Use a custom decoder that uses protobuf unmarshalling for all
  1069  			// fields except the checksummed data.
  1070  			// Subsequent receives in Read calls will skip all protobuf
  1071  			// unmarshalling and directly read the content from the gRPC []byte
  1072  			// response, since only the first call will contain other fields.
  1073  			msg, err = readFullObjectResponse(databuf)
  1074  
  1075  			return err
  1076  		}, s.retry, s.idempotent)
  1077  		if err != nil {
  1078  			// Close the stream context we just created to ensure we don't leak
  1079  			// resources.
  1080  			cancel()
  1081  			return nil, nil, err
  1082  		}
  1083  
  1084  		return &readStreamResponse{stream, msg}, cancel, nil
  1085  	}
  1086  
  1087  	res, cancel, err := reopen(0)
  1088  	if err != nil {
  1089  		return nil, err
  1090  	}
  1091  
  1092  	// The first message was Recv'd on stream open; use it to populate the
  1093  	// object metadata.
  1094  	msg := res.response
  1095  	obj := msg.GetMetadata()
  1096  	// This is the size of the entire object, even if only a range was requested.
  1097  	size := obj.GetSize()
  1098  
  1099  	// Only support checksums when reading an entire object, not a range.
  1100  	var (
  1101  		wantCRC  uint32
  1102  		checkCRC bool
  1103  	)
  1104  	if checksums := msg.GetObjectChecksums(); checksums != nil && checksums.Crc32C != nil && params.offset == 0 && params.length < 0 {
  1105  		wantCRC = checksums.GetCrc32C()
  1106  		checkCRC = true
  1107  	}
  1108  
  1109  	r = &Reader{
  1110  		Attrs: ReaderObjectAttrs{
  1111  			Size:            size,
  1112  			ContentType:     obj.GetContentType(),
  1113  			ContentEncoding: obj.GetContentEncoding(),
  1114  			CacheControl:    obj.GetCacheControl(),
  1115  			LastModified:    obj.GetUpdateTime().AsTime(),
  1116  			Metageneration:  obj.GetMetageneration(),
  1117  			Generation:      obj.GetGeneration(),
  1118  		},
  1119  		reader: &gRPCReader{
  1120  			stream: res.stream,
  1121  			reopen: reopen,
  1122  			cancel: cancel,
  1123  			size:   size,
  1124  			// Store the content from the first Recv in the
  1125  			// client buffer for reading later.
  1126  			leftovers: msg.GetChecksummedData().GetContent(),
  1127  			settings:  s,
  1128  			zeroRange: params.length == 0,
  1129  			databuf:   databuf,
  1130  			wantCRC:   wantCRC,
  1131  			checkCRC:  checkCRC,
  1132  		},
  1133  		checkCRC: checkCRC,
  1134  	}
  1135  
  1136  	cr := msg.GetContentRange()
  1137  	if cr != nil {
  1138  		r.Attrs.StartOffset = cr.GetStart()
  1139  		r.remain = cr.GetEnd() - cr.GetStart()
  1140  	} else {
  1141  		r.remain = size
  1142  	}
  1143  
  1144  	// For a zero-length request, explicitly close the stream and set remaining
  1145  	// bytes to zero.
  1146  	if params.length == 0 {
  1147  		r.remain = 0
  1148  		r.reader.Close()
  1149  	}
  1150  
  1151  	return r, nil
  1152  }
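
// A caller-side sketch of the reader above. Requesting a sub-range skips CRC
// verification, per the offset/length checks on the first response. Bucket
// and object names are placeholders.
//
//	r, err := client.Bucket("my-bucket").Object("my-object").NewRangeReader(ctx, 100, 50)
//	if err != nil {
//		// handle error
//	}
//	defer r.Close()
//	data, err := io.ReadAll(r)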
  1153  
  1154  func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
  1155  	s := callSettings(c.settings, opts...)
  1156  
  1157  	var offset int64
  1158  	errorf := params.setError
  1159  	progress := params.progress
  1160  	setObj := params.setObj
  1161  
  1162  	pr, pw := io.Pipe()
  1163  	gw := newGRPCWriter(c, params, pr)
  1164  	gw.settings = s
  1165  	if s.userProject != "" {
  1166  		gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
  1167  	}
  1168  
  1169  	// This function reads the data sent to the pipe and sends sets of messages
  1170  	// on the gRPC client-stream as the buffer is filled.
  1171  	go func() {
  1172  		defer close(params.donec)
  1173  
  1174  		// Loop until there is an error or the Object has been finalized.
  1175  		for {
  1176  			// Note: This blocks until either the buffer is full or EOF is read.
  1177  			recvd, doneReading, err := gw.read()
  1178  			if err != nil {
  1179  				err = checkCanceled(err)
  1180  				errorf(err)
  1181  				pr.CloseWithError(err)
  1182  				return
  1183  			}
  1184  
  1185  			if params.attrs.Retention != nil {
  1186  				// TODO: remove once ObjectRetention is available - see b/308194853
  1187  				err = status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
  1188  				errorf(err)
  1189  				pr.CloseWithError(err)
  1190  				return
  1191  			}
  1192  			// The chunk buffer is full, but there is no end in sight. This
  1193  			// means that either:
  1194  			// 1. A resumable upload will need to be used to send
  1195  			// multiple chunks, until we are done reading data. Start a
  1196  			// resumable upload if it has not already been started.
  1197  			// 2. ChunkSize of zero may also have a full buffer, but a resumable
  1198  			// session should not be initiated in this case.
  1199  			if !doneReading && gw.upid == "" && params.chunkSize != 0 {
  1200  				err = gw.startResumableUpload()
  1201  				if err != nil {
  1202  					err = checkCanceled(err)
  1203  					errorf(err)
  1204  					pr.CloseWithError(err)
  1205  					return
  1206  				}
  1207  			}
  1208  
  1209  			o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
  1210  			if err != nil {
  1211  				err = checkCanceled(err)
  1212  				errorf(err)
  1213  				pr.CloseWithError(err)
  1214  				return
  1215  			}
  1216  
  1217  			// At this point, the current buffer has been uploaded. For resumable
  1218  			// uploads and chunkSize = 0, capture the committed offset here in case
  1219  			// the upload was not finalized and another chunk is to be uploaded. Call
  1220  			// the progress function for resumable uploads only.
  1221  			if gw.upid != "" || gw.chunkSize == 0 {
  1222  				offset = off
  1223  			}
  1224  			if gw.upid != "" {
  1225  				progress(offset)
  1226  			}
  1227  
  1228  			// When we are done reading data without errors, set the object and
  1229  			// finish.
  1230  			if doneReading {
  1231  				// Build Object from server's response.
  1232  				setObj(newObjectFromProto(o))
  1233  				return
  1234  			}
  1235  		}
  1236  	}()
  1237  
  1238  	return pw, nil
  1239  }
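
// A caller-side sketch of the chunking decision above: ChunkSize = 0 uploads
// in a single request with no resumable session, while the default chunk size
// starts a resumable session once the first buffer fills. Names are
// placeholders, and data is assumed to be a []byte in scope.
//
//	w := client.Bucket("my-bucket").Object("my-object").NewWriter(ctx)
//	w.ChunkSize = 0 // one-shot upload; no resumable session is created
//	if _, err := w.Write(data); err != nil {
//		// handle error
//	}
//	if err := w.Close(); err != nil {
//		// handle error
//	}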
  1240  
  1241  // IAM methods.
  1242  
  1243  func (c *grpcStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
  1244  	// TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
  1245  	s := callSettings(c.settings, opts...)
  1246  	req := &iampb.GetIamPolicyRequest{
  1247  		Resource: bucketResourceName(globalProjectAlias, resource),
  1248  		Options: &iampb.GetPolicyOptions{
  1249  			RequestedPolicyVersion: version,
  1250  		},
  1251  	}
  1252  	var rp *iampb.Policy
  1253  	err := run(ctx, func(ctx context.Context) error {
  1254  		var err error
  1255  		rp, err = c.raw.GetIamPolicy(ctx, req, s.gax...)
  1256  		return err
  1257  	}, s.retry, s.idempotent)
  1258  
  1259  	return rp, err
  1260  }
  1261  
  1262  func (c *grpcStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
  1263  	// TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
  1264  	s := callSettings(c.settings, opts...)
  1265  
  1266  	req := &iampb.SetIamPolicyRequest{
  1267  		Resource: bucketResourceName(globalProjectAlias, resource),
  1268  		Policy:   policy,
  1269  	}
  1270  
  1271  	return run(ctx, func(ctx context.Context) error {
  1272  		_, err := c.raw.SetIamPolicy(ctx, req, s.gax...)
  1273  		return err
  1274  	}, s.retry, s.idempotent)
  1275  }
  1276  
  1277  func (c *grpcStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
  1278  	// TODO: Need a way to set UserProject, potentially in X-Goog-User-Project system parameter.
  1279  	s := callSettings(c.settings, opts...)
  1280  	req := &iampb.TestIamPermissionsRequest{
  1281  		Resource:    bucketResourceName(globalProjectAlias, resource),
  1282  		Permissions: permissions,
  1283  	}
  1284  	var res *iampb.TestIamPermissionsResponse
  1285  	err := run(ctx, func(ctx context.Context) error {
  1286  		var err error
  1287  		res, err = c.raw.TestIamPermissions(ctx, req, s.gax...)
  1288  		return err
  1289  	}, s.retry, s.idempotent)
  1290  	if err != nil {
  1291  		return nil, err
  1292  	}
  1293  	return res.Permissions, nil
  1294  }
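
// A caller-side sketch: these three RPCs back the veneer's IAM handle. The
// bucket name, member, and role strings are placeholders.
//
//	handle := client.Bucket("my-bucket").IAM()
//	policy, err := handle.Policy(ctx)
//	if err != nil {
//		// handle error
//	}
//	policy.Add("group:admins@example.com", "roles/storage.objectViewer")
//	err = handle.SetPolicy(ctx, policy)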
  1295  
  1296  // HMAC Key methods.
  1297  
  1298  func (c *grpcStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
  1299  	s := callSettings(c.settings, opts...)
  1300  	req := &storagepb.GetHmacKeyRequest{
  1301  		AccessId: accessID,
  1302  		Project:  toProjectResource(project),
  1303  	}
  1304  	if s.userProject != "" {
  1305  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1306  	}
  1307  	var metadata *storagepb.HmacKeyMetadata
  1308  	err := run(ctx, func(ctx context.Context) error {
  1309  		var err error
  1310  		metadata, err = c.raw.GetHmacKey(ctx, req, s.gax...)
  1311  		return err
  1312  	}, s.retry, s.idempotent)
  1313  	if err != nil {
  1314  		return nil, err
  1315  	}
  1316  	return toHMACKeyFromProto(metadata), nil
  1317  }
  1318  
  1319  func (c *grpcStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
  1320  	s := callSettings(c.settings, opts...)
  1321  	req := &storagepb.ListHmacKeysRequest{
  1322  		Project:             toProjectResource(project),
  1323  		ServiceAccountEmail: serviceAccountEmail,
  1324  		ShowDeletedKeys:     showDeletedKeys,
  1325  	}
  1326  	if s.userProject != "" {
  1327  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1328  	}
  1329  	it := &HMACKeysIterator{
  1330  		ctx:       ctx,
  1331  		projectID: project,
  1332  		retry:     s.retry,
  1333  	}
  1334  	fetch := func(pageSize int, pageToken string) (token string, err error) {
  1335  		var hmacKeys []*storagepb.HmacKeyMetadata
  1336  		err = run(it.ctx, func(ctx context.Context) error {
  1337  			gitr := c.raw.ListHmacKeys(ctx, req, s.gax...)
  1338  			hmacKeys, token, err = gitr.InternalFetch(pageSize, pageToken)
  1339  			return err
  1340  		}, s.retry, s.idempotent)
  1341  		if err != nil {
  1342  			return "", err
  1343  		}
  1344  		for _, hkmd := range hmacKeys {
  1345  			hk := toHMACKeyFromProto(hkmd)
  1346  			it.hmacKeys = append(it.hmacKeys, hk)
  1347  		}
  1348  
  1349  		return token, nil
  1350  	}
  1351  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  1352  		fetch,
  1353  		func() int { return len(it.hmacKeys) - it.index },
  1354  		func() interface{} {
  1355  			prev := it.hmacKeys
  1356  			it.hmacKeys = it.hmacKeys[:0]
  1357  			it.index = 0
  1358  			return prev
  1359  		})
  1360  	return it
  1361  }
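
// A caller-side sketch of consuming the iterator above via the veneer. The
// project ID is a placeholder.
//
//	it := client.ListHMACKeys(ctx, "my-project")
//	for {
//		key, err := it.Next()
//		if err == iterator.Done {
//			break
//		}
//		if err != nil {
//			// handle error
//		}
//		_ = key.AccessID
//	}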
  1362  
  1363  func (c *grpcStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
  1364  	s := callSettings(c.settings, opts...)
  1365  	hk := &storagepb.HmacKeyMetadata{
  1366  		AccessId:            accessID,
  1367  		Project:             toProjectResource(project),
  1368  		ServiceAccountEmail: serviceAccountEmail,
  1369  		State:               string(attrs.State),
  1370  		Etag:                attrs.Etag,
  1371  	}
  1372  	var paths []string
  1373  	fieldMask := &fieldmaskpb.FieldMask{
  1374  		Paths: paths,
  1375  	}
  1376  	if attrs.State != "" {
  1377  		fieldMask.Paths = append(fieldMask.Paths, "state")
  1378  	}
  1379  	req := &storagepb.UpdateHmacKeyRequest{
  1380  		HmacKey:    hk,
  1381  		UpdateMask: fieldMask,
  1382  	}
  1383  	if s.userProject != "" {
  1384  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1385  	}
  1386  	var metadata *storagepb.HmacKeyMetadata
  1387  	err := run(ctx, func(ctx context.Context) error {
  1388  		var err error
  1389  		metadata, err = c.raw.UpdateHmacKey(ctx, req, s.gax...)
  1390  		return err
  1391  	}, s.retry, s.idempotent)
  1392  	if err != nil {
  1393  		return nil, err
  1394  	}
  1395  	return toHMACKeyFromProto(metadata), nil
  1396  }
  1397  
  1398  func (c *grpcStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
  1399  	s := callSettings(c.settings, opts...)
  1400  	req := &storagepb.CreateHmacKeyRequest{
  1401  		Project:             toProjectResource(project),
  1402  		ServiceAccountEmail: serviceAccountEmail,
  1403  	}
  1404  	if s.userProject != "" {
  1405  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1406  	}
  1407  	var res *storagepb.CreateHmacKeyResponse
  1408  	err := run(ctx, func(ctx context.Context) error {
  1409  		var err error
  1410  		res, err = c.raw.CreateHmacKey(ctx, req, s.gax...)
  1411  		return err
  1412  	}, s.retry, s.idempotent)
  1413  	if err != nil {
  1414  		return nil, err
  1415  	}
  1416  	key := toHMACKeyFromProto(res.Metadata)
  1417  	key.Secret = base64.StdEncoding.EncodeToString(res.SecretKeyBytes)
  1418  
  1419  	return key, nil
  1420  }
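
        // Illustrative sketch, not part of the original file: calling
        // CreateHMACKey above. Since the method base64-encodes
        // res.SecretKeyBytes into Secret, a caller needing the raw key bytes
        // must decode it again; the project and service account are examples.
        func exampleCreateHMACKey(ctx context.Context, c *grpcStorageClient) {
        	key, err := c.CreateHMACKey(ctx, "my-project", "sa@my-project.iam.gserviceaccount.com")
        	if err != nil {
        		return // handle the error in real code
        	}
        	raw, _ := base64.StdEncoding.DecodeString(key.Secret)
        	fmt.Println(key.AccessID, len(raw)) // raw equals the original SecretKeyBytes
        }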
  1421  
  1422  func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
  1423  	s := callSettings(c.settings, opts...)
  1424  	req := &storagepb.DeleteHmacKeyRequest{
  1425  		AccessId: accessID,
  1426  		Project:  toProjectResource(project),
  1427  	}
  1428  	if s.userProject != "" {
  1429  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1430  	}
  1431  	return run(ctx, func(ctx context.Context) error {
  1432  		return c.raw.DeleteHmacKey(ctx, req, s.gax...)
  1433  	}, s.retry, s.idempotent)
  1434  }
  1435  
  1436  // Notification methods.
  1437  
  1438  func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
  1439  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications")
  1440  	defer func() { trace.EndSpan(ctx, err) }()
  1441  
  1442  	s := callSettings(c.settings, opts...)
  1443  	if s.userProject != "" {
  1444  		ctx = setUserProjectMetadata(ctx, s.userProject)
  1445  	}
  1446  	req := &storagepb.ListNotificationConfigsRequest{
  1447  		Parent: bucketResourceName(globalProjectAlias, bucket),
  1448  	}
  1449  	var notifications []*storagepb.NotificationConfig
  1450  	err = run(ctx, func(ctx context.Context) error {
  1451  		gitr := c.raw.ListNotificationConfigs(ctx, req, s.gax...)
  1452  		for {
  1453  			// PageSize is not set, so the call falls back to the API's default page size of 100.
  1454  			items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken())
  1455  			if err != nil {
  1456  				return err
  1457  			}
  1458  			notifications = append(notifications, items...)
  1459  			// If there are no more results, nextPageToken is empty and err is nil.
  1460  			if nextPageToken == "" {
  1461  				return err
  1462  			}
  1463  			req.PageToken = nextPageToken
  1464  		}
  1465  	}, s.retry, s.idempotent)
  1466  	if err != nil {
  1467  		return nil, err
  1468  	}
  1469  
  1470  	return notificationsToMapFromProto(notifications), nil
  1471  }
  1472  
  1473  func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
  1474  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification")
  1475  	defer func() { trace.EndSpan(ctx, err) }()
  1476  
  1477  	s := callSettings(c.settings, opts...)
  1478  	req := &storagepb.CreateNotificationConfigRequest{
  1479  		Parent:             bucketResourceName(globalProjectAlias, bucket),
  1480  		NotificationConfig: toProtoNotification(n),
  1481  	}
  1482  	var pbn *storagepb.NotificationConfig
  1483  	err = run(ctx, func(ctx context.Context) error {
  1484  		var err error
  1485  		pbn, err = c.raw.CreateNotificationConfig(ctx, req, s.gax...)
  1486  		return err
  1487  	}, s.retry, s.idempotent)
  1488  	if err != nil {
  1489  		return nil, err
  1490  	}
  1491  	return toNotificationFromProto(pbn), err
  1492  }
  1493  
  1494  func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
  1495  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification")
  1496  	defer func() { trace.EndSpan(ctx, err) }()
  1497  
  1498  	s := callSettings(c.settings, opts...)
  1499  	req := &storagepb.DeleteNotificationConfigRequest{Name: id}
  1500  	return run(ctx, func(ctx context.Context) error {
  1501  		return c.raw.DeleteNotificationConfig(ctx, req, s.gax...)
  1502  	}, s.retry, s.idempotent)
  1503  }
  1504  
  1505  // setUserProjectMetadata appends a project ID to the outgoing Context metadata
  1506  // via the x-goog-user-project system parameter defined at
  1507  // https://cloud.google.com/apis/docs/system-parameters. This is only for
  1508  // billing purposes, and is generally optional, except for requester-pays
  1509  // buckets.
  1510  func setUserProjectMetadata(ctx context.Context, project string) context.Context {
  1511  	return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project)
  1512  }
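
        // Illustrative sketch, not part of the original file: the helper above
        // attaches the project as ordinary outgoing gRPC metadata, which can be
        // verified with metadata.FromOutgoingContext.
        func exampleUserProjectMetadata(ctx context.Context) {
        	ctx = setUserProjectMetadata(ctx, "my-billing-project")
        	md, _ := metadata.FromOutgoingContext(ctx)
        	fmt.Println(md.Get("x-goog-user-project")) // [my-billing-project]
        }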
  1513  
  1514  type readStreamResponse struct {
  1515  	stream   storagepb.Storage_ReadObjectClient
  1516  	response *storagepb.ReadObjectResponse
  1517  }
  1518  
  1519  type gRPCReader struct {
  1520  	seen, size int64
  1521  	zeroRange  bool
  1522  	stream     storagepb.Storage_ReadObjectClient
  1523  	reopen     func(seen int64) (*readStreamResponse, context.CancelFunc, error)
  1524  	leftovers  []byte
  1525  	databuf    []byte
  1526  	cancel     context.CancelFunc
  1527  	settings   *settings
  1528  	checkCRC   bool   // should we check the CRC?
  1529  	wantCRC    uint32 // the CRC32c value the server sent in the header
  1530  	gotCRC     uint32 // running crc
  1531  }
  1532  
  1533  // Update the running CRC with the data in the slice, if CRC checking was enabled.
  1534  func (r *gRPCReader) updateCRC(b []byte) {
  1535  	if r.checkCRC {
  1536  		r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, b)
  1537  	}
  1538  }
  1539  
  1540  // Checks whether the CRC matches at the conclusion of a read, if CRC checking was enabled.
  1541  func (r *gRPCReader) runCRCCheck() error {
  1542  	if r.checkCRC && r.gotCRC != r.wantCRC {
  1543  		return fmt.Errorf("storage: bad CRC on read: got %d, want %d", r.gotCRC, r.wantCRC)
  1544  	}
  1545  	return nil
  1546  }
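
        // Illustrative sketch, not part of the original file: the running CRC
        // above depends on crc32.Update being incremental, i.e. feeding chunks
        // one at a time yields the same CRC32C as checksumming the whole
        // payload at once. The Castagnoli table built here corresponds to the
        // package's crc32cTable.
        func exampleIncrementalCRC32C() {
        	table := crc32.MakeTable(crc32.Castagnoli)
        	var got uint32
        	for _, chunk := range [][]byte{[]byte("hello, "), []byte("world")} {
        		got = crc32.Update(got, table, chunk)
        	}
        	want := crc32.Checksum([]byte("hello, world"), table)
        	fmt.Println(got == want) // true
        }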
  1547  
  1548  // Read reads bytes into the user's buffer from an open gRPC stream.
  1549  func (r *gRPCReader) Read(p []byte) (int, error) {
  1550  	// The entire object has been read by this reader, check the checksum if
  1551  	// necessary and return EOF.
  1552  	if r.size == r.seen || r.zeroRange {
  1553  		if err := r.runCRCCheck(); err != nil {
  1554  			return 0, err
  1555  		}
  1556  		return 0, io.EOF
  1557  	}
  1558  
  1559  	// No stream to read from, either never initialized or Close was called.
  1560  	// Note: There is a potential concurrency issue if multiple routines are
  1561  	// using the same reader. One encounters an error and the stream is closed
  1562  	// and then reopened while the other routine attempts to read from it.
  1563  	if r.stream == nil {
  1564  		return 0, fmt.Errorf("storage: reader has been closed")
  1565  	}
  1566  
  1567  	var n int
  1568  	// Read leftovers and return what was available to conform to the Reader
  1569  	// interface: https://pkg.go.dev/io#Reader.
  1570  	if len(r.leftovers) > 0 {
  1571  		n = copy(p, r.leftovers)
  1572  		r.seen += int64(n)
  1573  		r.updateCRC(p[:n])
  1574  		r.leftovers = r.leftovers[n:]
  1575  		return n, nil
  1576  	}
  1577  
  1578  	// Attempt to Recv the next message on the stream.
  1579  	content, err := r.recv()
  1580  	if err != nil {
  1581  		return 0, err
  1582  	}
  1583  
  1584  	// TODO: Determine if we need to capture incremental CRC32C for this
  1585  	// chunk. The Object CRC32C checksum is captured when directed to read
  1586  	// the entire Object. If directed to read a range, we may need to
  1587  	// calculate the range's checksum for verification if the checksum is
  1588  	// present in the response here.
  1589  	// TODO: Figure out if we need to support decompressive transcoding
  1590  	// https://cloud.google.com/storage/docs/transcoding.
  1591  	n = copy(p[n:], content)
  1592  	leftover := len(content) - n
  1593  	if leftover > 0 {
  1594  		// Wasn't able to copy all of the data in the message, store for
  1595  		// future Read calls.
  1596  		r.leftovers = content[n:]
  1597  	}
  1598  	r.seen += int64(n)
  1599  	r.updateCRC(p[:n])
  1600  
  1601  	return n, nil
  1602  }
  1603  
  1604  // WriteTo writes all the data requested by the Reader into w, implementing
  1605  // io.WriterTo.
  1606  func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) {
  1607  	// The entire object has been read by this reader, check the checksum if
  1608  	// necessary and return nil.
  1609  	if r.size == r.seen || r.zeroRange {
  1610  		if err := r.runCRCCheck(); err != nil {
  1611  			return 0, err
  1612  		}
  1613  		return 0, nil
  1614  	}
  1615  
  1616  	// No stream to read from, either never initialized or Close was called.
  1617  	// Note: There is a potential concurrency issue if multiple routines are
  1618  	// using the same reader. One encounters an error and the stream is closed
  1619  	// and then reopened while the other routine attempts to read from it.
  1620  	if r.stream == nil {
  1621  		return 0, fmt.Errorf("storage: reader has been closed")
  1622  	}
  1623  
  1624  	// Track bytes already written before this call.
  1625  	var alreadySeen = r.seen
  1626  
  1627  	// Write any leftovers to w. There will be some leftovers from the
  1628  	// original NewRangeReader call.
  1629  	if len(r.leftovers) > 0 {
  1630  		// Write() will write the entire leftovers slice unless there is an error.
  1631  		written, err := w.Write(r.leftovers)
  1632  		r.seen += int64(written)
  1633  		r.updateCRC(r.leftovers)
  1634  		r.leftovers = nil
  1635  		if err != nil {
  1636  			return r.seen - alreadySeen, err
  1637  		}
  1638  	}
  1639  
  1640  	// Loop and receive additional messages until the entire data is written.
  1641  	for {
  1642  		// Attempt to receive the next message on the stream.
  1643  		// Will terminate with io.EOF once data has all come through.
  1644  		// recv() handles stream reopening and retry logic so no need for retries here.
  1645  		msg, err := r.recv()
  1646  		if err != nil {
  1647  			if err == io.EOF {
  1648  				// We are done; check the checksum if necessary and return.
  1649  				err = r.runCRCCheck()
  1650  			}
  1651  			return r.seen - alreadySeen, err
  1652  		}
  1653  
  1654  		// TODO: Determine if we need to capture incremental CRC32C for this
  1655  		// chunk. The Object CRC32C checksum is captured when directed to read
  1656  		// the entire Object. If directed to read a range, we may need to
  1657  		// calculate the range's checksum for verification if the checksum is
  1658  		// present in the response here.
  1659  		// TODO: Figure out if we need to support decompressive transcoding
  1660  		// https://cloud.google.com/storage/docs/transcoding.
  1661  		written, err := w.Write(msg)
  1662  		r.seen += int64(written)
  1663  		r.updateCRC(msg)
  1664  		if err != nil {
  1665  			return r.seen - alreadySeen, err
  1666  		}
  1667  	}
  1668  
  1669  }
  1670  
  1671  // Close cancels the read stream's context in order for it to be closed and
  1672  // collected.
  1673  func (r *gRPCReader) Close() error {
  1674  	if r.cancel != nil {
  1675  		r.cancel()
  1676  	}
  1677  	r.stream = nil
  1678  	return nil
  1679  }
  1680  
  1681  // recv attempts to Recv the next message on the stream and extract the object
  1682  // data that it contains. In the event that a retryable error is encountered,
  1683  // the stream will be closed and reopened, and RecvMsg will be attempted again.
  1684  // This will attempt to Recv until one of the following is true:
  1685  //
  1686  // * Recv is successful
  1687  // * A non-retryable error is encountered
  1688  // * The Reader's context is canceled
  1689  //
  1690  // The last error received is the one that is returned, which could be from
  1691  // an attempt to reopen the stream.
  1692  func (r *gRPCReader) recv() ([]byte, error) {
  1693  	err := r.stream.RecvMsg(&r.databuf)
  1694  
  1695  	var shouldRetry = ShouldRetry
  1696  	if r.settings.retry != nil && r.settings.retry.shouldRetry != nil {
  1697  		shouldRetry = r.settings.retry.shouldRetry
  1698  	}
  1699  	if err != nil && shouldRetry(err) {
  1700  		// This will "close" the existing stream and immediately attempt to
  1701  		// reopen the stream, but will backoff if further attempts are necessary.
  1702  		// Reopening the stream Recvs the first message, so if retrying is
  1703  		// successful, the next logical chunk will be returned.
  1704  		msg, err := r.reopenStream()
  1705  		return msg.GetChecksummedData().GetContent(), err
  1706  	}
  1707  
  1708  	if err != nil {
  1709  		return nil, err
  1710  	}
  1711  
  1712  	return readObjectResponseContent(r.databuf)
  1713  }
  1714  
  1715  // ReadObjectResponse field and subfield numbers.
  1716  const (
  1717  	checksummedDataField        = protowire.Number(1)
  1718  	checksummedDataContentField = protowire.Number(1)
  1719  	checksummedDataCRC32CField  = protowire.Number(2)
  1720  	objectChecksumsField        = protowire.Number(2)
  1721  	contentRangeField           = protowire.Number(3)
  1722  	metadataField               = protowire.Number(4)
  1723  )
  1724  
  1725  // readObjectResponseContent returns the checksummed_data.content field of a
  1726  // ReadObjectResponse message, or an error if the message is invalid.
  1727  // This can be used on recvs of objects after the first recv, since only the
  1728  // first message will contain non-data fields.
  1729  func readObjectResponseContent(b []byte) ([]byte, error) {
  1730  	checksummedData, err := readProtoBytes(b, checksummedDataField)
  1731  	if err != nil {
  1732  		return b, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", err)
  1733  	}
  1734  	content, err := readProtoBytes(checksummedData, checksummedDataContentField)
  1735  	if err != nil {
  1736  		return content, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", err)
  1737  	}
  1738  
  1739  	return content, nil
  1740  }
  1741  
  1742  // readFullObjectResponse returns the ReadObjectResponse that is encoded in the
  1743  // wire-encoded message buffer b, or an error if the message is invalid.
  1744  // This must be used on the first recv of an object as it may contain all fields
  1745  // of ReadObjectResponse, and we use or pass on those fields to the user.
  1746  // This function is essentially identical to proto.Unmarshal, except it aliases
  1747  // the data in the input []byte. If the proto library adds a feature to
  1748  // Unmarshal that does that, this function can be dropped.
  1749  func readFullObjectResponse(b []byte) (*storagepb.ReadObjectResponse, error) {
  1750  	msg := &storagepb.ReadObjectResponse{}
  1751  
  1752  	// Loop over the entire message, extracting fields as we go. This does not
  1753  	// handle field concatenation, in which the contents of a single field
  1754  	// are split across multiple protobuf tags.
  1755  	off := 0
  1756  	for off < len(b) {
  1757  		// Consume the next tag. This will tell us which field is next in the
  1758  		// buffer, its type, and how much space it takes up.
  1759  		fieldNum, fieldType, fieldLength := protowire.ConsumeTag(b[off:])
  1760  		if fieldLength < 0 {
  1761  			return nil, protowire.ParseError(fieldLength)
  1762  		}
  1763  		off += fieldLength
  1764  
  1765  		// Unmarshal the field according to its type. Only fields that are not
  1766  		// nil will be present.
  1767  		switch {
  1768  		case fieldNum == checksummedDataField && fieldType == protowire.BytesType:
  1769  			// The ChecksummedData field was found. Initialize the struct.
  1770  			msg.ChecksummedData = &storagepb.ChecksummedData{}
  1771  
  1772  			// Get the bytes corresponding to the checksummed data.
  1773  			fieldContent, n := protowire.ConsumeBytes(b[off:])
  1774  			if n < 0 {
  1775  				return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData: %v", protowire.ParseError(n))
  1776  			}
  1777  			off += n
  1778  
  1779  			// Get the nested fields. We need to do this manually as it contains
  1780  			// the object content bytes.
  1781  			contentOff := 0
  1782  			for contentOff < len(fieldContent) {
  1783  				gotNum, gotTyp, n := protowire.ConsumeTag(fieldContent[contentOff:])
  1784  				if n < 0 {
  1785  					return nil, protowire.ParseError(n)
  1786  				}
  1787  				contentOff += n
  1788  
  1789  				switch {
  1790  				case gotNum == checksummedDataContentField && gotTyp == protowire.BytesType:
  1791  					// Get the content bytes.
  1792  					bytes, n := protowire.ConsumeBytes(fieldContent[contentOff:])
  1793  					if n < 0 {
  1794  						return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Content: %v", protowire.ParseError(n))
  1795  					}
  1796  					msg.ChecksummedData.Content = bytes
  1797  					contentOff += n
  1798  				case gotNum == checksummedDataCRC32CField && gotTyp == protowire.Fixed32Type:
  1799  					v, n := protowire.ConsumeFixed32(fieldContent[contentOff:])
  1800  					if n < 0 {
  1801  						return nil, fmt.Errorf("invalid ReadObjectResponse.ChecksummedData.Crc32C: %v", protowire.ParseError(n))
  1802  					}
  1803  					msg.ChecksummedData.Crc32C = &v
  1804  					contentOff += n
  1805  				default:
  1806  					n = protowire.ConsumeFieldValue(gotNum, gotTyp, fieldContent[contentOff:])
  1807  					if n < 0 {
  1808  						return nil, protowire.ParseError(n)
  1809  					}
  1810  					contentOff += n
  1811  				}
  1812  			}
  1813  		case fieldNum == objectChecksumsField && fieldType == protowire.BytesType:
  1814  			// The field was found. Initialize the struct.
  1815  			msg.ObjectChecksums = &storagepb.ObjectChecksums{}
  1816  
  1817  			// Get the bytes corresponding to the checksums.
  1818  			bytes, n := protowire.ConsumeBytes(b[off:])
  1819  			if n < 0 {
  1820  				return nil, fmt.Errorf("invalid ReadObjectResponse.ObjectChecksums: %v", protowire.ParseError(n))
  1821  			}
  1822  			off += n
  1823  
  1824  			// Unmarshal.
  1825  			if err := proto.Unmarshal(bytes, msg.ObjectChecksums); err != nil {
  1826  				return nil, err
  1827  			}
  1828  		case fieldNum == contentRangeField && fieldType == protowire.BytesType:
  1829  			msg.ContentRange = &storagepb.ContentRange{}
  1830  
  1831  			bytes, n := protowire.ConsumeBytes(b[off:])
  1832  			if n < 0 {
  1833  				return nil, fmt.Errorf("invalid ReadObjectResponse.ContentRange: %v", protowire.ParseError(n))
  1834  			}
  1835  			off += n
  1836  
  1837  			if err := proto.Unmarshal(bytes, msg.ContentRange); err != nil {
  1838  				return nil, err
  1839  			}
  1840  		case fieldNum == metadataField && fieldType == protowire.BytesType:
  1841  			msg.Metadata = &storagepb.Object{}
  1842  
  1843  			bytes, n := protowire.ConsumeBytes(b[off:])
  1844  			if n < 0 {
  1845  				return nil, fmt.Errorf("invalid ReadObjectResponse.Metadata: %v", protowire.ParseError(n))
  1846  			}
  1847  			off += n
  1848  
  1849  			if err := proto.Unmarshal(bytes, msg.Metadata); err != nil {
  1850  				return nil, err
  1851  			}
  1852  		default:
  1853  			fieldLength = protowire.ConsumeFieldValue(fieldNum, fieldType, b[off:])
  1854  			if fieldLength < 0 {
  1855  				return nil, fmt.Errorf("default: %v", protowire.ParseError(fieldLength))
  1856  			}
  1857  			off += fieldLength
  1858  		}
  1859  	}
  1860  
  1861  	return msg, nil
  1862  }
  1863  
  1864  // readProtoBytes returns the contents of the protobuf field with number num
  1865  // and type bytes from a wire-encoded message. If the field cannot be found,
  1866  // the returned slice will be nil and no error will be returned.
  1867  //
  1868  // It does not handle field concatenation, in which the contents of a single field
  1869  // are split across multiple protobuf tags. Encoded data containing split fields
  1870  // of this form is technically permissible, but uncommon.
  1871  func readProtoBytes(b []byte, num protowire.Number) ([]byte, error) {
  1872  	off := 0
  1873  	for off < len(b) {
  1874  		gotNum, gotTyp, n := protowire.ConsumeTag(b[off:])
  1875  		if n < 0 {
  1876  			return nil, protowire.ParseError(n)
  1877  		}
  1878  		off += n
  1879  		if gotNum == num && gotTyp == protowire.BytesType {
  1880  			b, n := protowire.ConsumeBytes(b[off:])
  1881  			if n < 0 {
  1882  				return nil, protowire.ParseError(n)
  1883  			}
  1884  			return b, nil
  1885  		}
  1886  		n = protowire.ConsumeFieldValue(gotNum, gotTyp, b[off:])
  1887  		if n < 0 {
  1888  			return nil, protowire.ParseError(n)
  1889  		}
  1890  		off += n
  1891  	}
  1892  	return nil, nil
  1893  }
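
        // Illustrative sketch, not part of the original file: the wire layout
        // readProtoBytes walks. A bytes field is a tag followed by a varint
        // length and the payload; protowire's append helpers build one here and
        // the function above recovers the payload without a full
        // proto.Unmarshal.
        func exampleReadProtoBytes() {
        	var b []byte
        	b = protowire.AppendTag(b, checksummedDataContentField, protowire.BytesType)
        	b = protowire.AppendBytes(b, []byte("object data"))
        	content, err := readProtoBytes(b, checksummedDataContentField)
        	fmt.Println(string(content), err) // object data <nil>
        }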
  1894  
  1895  // reopenStream "closes" the existing stream and attempts to reopen it,
  1896  // setting the Reader's stream and cancel fields in the process.
  1897  func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {
  1898  	// Close existing stream and initialize new stream with updated offset.
  1899  	r.Close()
  1900  
  1901  	res, cancel, err := r.reopen(r.seen)
  1902  	if err != nil {
  1903  		return nil, err
  1904  	}
  1905  	r.stream = res.stream
  1906  	r.cancel = cancel
  1907  	return res.response, nil
  1908  }
  1909  
  1910  func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) *gRPCWriter {
  1911  	size := params.chunkSize
  1912  
  1913  	// Round the chunk size up to the nearest multiple of 256 KiB.
  1914  	if size%googleapi.MinUploadChunkSize != 0 {
  1915  		size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
  1916  	}
  1917  
  1918  	// A completely bufferless upload is not possible as it is in JSON because
  1919  	// the buffer must be provided to the message. However, use the minimum size
  1920  	// possible in this case.
  1921  	if params.chunkSize == 0 {
  1922  		size = googleapi.MinUploadChunkSize
  1923  	}
  1924  
  1925  	return &gRPCWriter{
  1926  		buf:                   make([]byte, size),
  1927  		c:                     c,
  1928  		ctx:                   params.ctx,
  1929  		reader:                r,
  1930  		bucket:                params.bucket,
  1931  		attrs:                 params.attrs,
  1932  		conds:                 params.conds,
  1933  		encryptionKey:         params.encryptionKey,
  1934  		sendCRC32C:            params.sendCRC32C,
  1935  		chunkSize:             params.chunkSize,
  1936  		forceEmptyContentType: params.forceEmptyContentType,
  1937  	}
  1938  }
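
        // Illustrative sketch, not part of the original file: the rounding in
        // newGRPCWriter above. googleapi.MinUploadChunkSize is 256 KiB (262144
        // bytes), so a requested chunk size of 300000 is rounded up to 524288,
        // the next multiple.
        func exampleChunkSizeRounding() {
        	size := 300000
        	if size%googleapi.MinUploadChunkSize != 0 {
        		size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
        	}
        	fmt.Println(size) // 524288
        }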
  1939  
  1940  // gRPCWriter is a wrapper around the gRPC client-stream API that manages
  1941  // sending chunks of data provided by the user over the stream.
  1942  type gRPCWriter struct {
  1943  	c      *grpcStorageClient
  1944  	buf    []byte
  1945  	reader io.Reader
  1946  
  1947  	ctx context.Context
  1948  
  1949  	bucket        string
  1950  	attrs         *ObjectAttrs
  1951  	conds         *Conditions
  1952  	encryptionKey []byte
  1953  	settings      *settings
  1954  
  1955  	sendCRC32C            bool
  1956  	chunkSize             int
  1957  	forceEmptyContentType bool
  1958  
  1959  	// The gRPC client-stream used for sending buffers.
  1960  	stream storagepb.Storage_BidiWriteObjectClient
  1961  
  1962  	// The Resumable Upload ID started by a gRPC-based Writer.
  1963  	upid string
  1964  }
  1965  
  1966  // startResumableUpload initializes a Resumable Upload with gRPC and sets the
  1967  // upload ID on the Writer.
  1968  func (w *gRPCWriter) startResumableUpload() error {
  1969  	spec, err := w.writeObjectSpec()
  1970  	if err != nil {
  1971  		return err
  1972  	}
  1973  	req := &storagepb.StartResumableWriteRequest{
  1974  		WriteObjectSpec:           spec,
  1975  		CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
  1976  	}
  1977  	// TODO: Currently the checksums are only sent on the request to initialize
  1978  	// the upload, but in the future, we must also support sending it
  1979  	// on the *last* message of the stream.
  1980  	req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
  1981  	return run(w.ctx, func(ctx context.Context) error {
  1982  		upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
  1983  		w.upid = upres.GetUploadId()
  1984  		return err
  1985  	}, w.settings.retry, w.settings.idempotent)
  1986  }
  1987  
  1988  // queryProgress is a helper that queries the status of the resumable upload
  1989  // associated with the given upload ID.
  1990  func (w *gRPCWriter) queryProgress() (int64, error) {
  1991  	var persistedSize int64
  1992  	err := run(w.ctx, func(ctx context.Context) error {
  1993  		q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{
  1994  			UploadId: w.upid,
  1995  		})
  1996  		persistedSize = q.GetPersistedSize()
  1997  		return err
  1998  	}, w.settings.retry, true)
  1999  
  2000  	// q.GetPersistedSize() will return 0 if q is nil.
  2001  	return persistedSize, err
  2002  }
  2003  
  2004  // uploadBuffer uploads the buffer at the given offset using a bi-directional
  2005  // Write stream. It will open a new stream if necessary (on the first call or
  2006  // after resuming from failure). The resulting write offset after uploading the
  2007  // buffer is returned, as well as the final Object if the upload is
  2008  // completed.
  2009  //
  2010  // Returns object, persisted size, and any error that is not retriable.
  2011  func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
  2012  	var shouldRetry = ShouldRetry
  2013  	if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
  2014  		shouldRetry = w.settings.retry.shouldRetry
  2015  	}
  2016  
  2017  	var err error
  2018  	var lastWriteOfEntireObject bool
  2019  
  2020  	sent := 0
  2021  	writeOffset := start
  2022  
  2023  	toWrite := w.buf[:recvd]
  2024  
  2025  	// Send a request with as many bytes as possible.
  2026  	// Loop until all bytes are sent.
  2027  sendBytes: // label this loop so that we can use a continue statement from a nested block
  2028  	for {
  2029  		bytesNotYetSent := recvd - sent
  2030  		remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
  2031  
  2032  		if remainingDataFitsInSingleReq && doneReading {
  2033  			lastWriteOfEntireObject = true
  2034  		}
  2035  
  2036  		// Send the maximum amount of bytes we can, unless we don't have that many.
  2037  		bytesToSendInCurrReq := maxPerMessageWriteSize
  2038  		if remainingDataFitsInSingleReq {
  2039  			bytesToSendInCurrReq = bytesNotYetSent
  2040  		}
  2041  
  2042  		// Prepare chunk section for upload.
  2043  		data := toWrite[sent : sent+bytesToSendInCurrReq]
  2044  
  2045  		req := &storagepb.BidiWriteObjectRequest{
  2046  			Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
  2047  				ChecksummedData: &storagepb.ChecksummedData{
  2048  					Content: data,
  2049  				},
  2050  			},
  2051  			WriteOffset: writeOffset,
  2052  			FinishWrite: lastWriteOfEntireObject,
  2053  			Flush:       remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
  2054  			StateLookup: remainingDataFitsInSingleReq && !lastWriteOfEntireObject,
  2055  		}
  2056  
  2057  		// Open a new stream if necessary and set the first_message field on
  2058  		// the request. The first message on the WriteObject stream must either
  2059  		// be the Object or the Resumable Upload ID.
  2060  		if w.stream == nil {
  2061  			hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
  2062  			ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
  2063  
  2064  			w.stream, err = w.c.raw.BidiWriteObject(ctx)
  2065  			if err != nil {
  2066  				return nil, 0, err
  2067  			}
  2068  
  2069  			if w.upid != "" { // resumable upload
  2070  				req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
  2071  			} else { // non-resumable
  2072  				spec, err := w.writeObjectSpec()
  2073  				if err != nil {
  2074  					return nil, 0, err
  2075  				}
  2076  				req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
  2077  					WriteObjectSpec: spec,
  2078  				}
  2079  				req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
  2080  				// For a non-resumable upload, checksums must be sent in this message.
  2081  				// TODO: Currently the checksums are only sent on the first message
  2082  				// of the stream, but in the future, we must also support sending it
  2083  				// on the *last* message of the stream (instead of the first).
  2084  				req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
  2085  			}
  2086  		}
  2087  
  2088  		err = w.stream.Send(req)
  2089  		if err == io.EOF {
  2090  			// err was io.EOF. The client-side of a stream only gets an EOF on Send
  2091  			// when the backend closes the stream and wants to return an error
  2092  			// status.
  2093  
  2094  			// Receive from the stream with Recv() until it returns a non-nil error
  2095  			// to receive the server's status as an error. We may get multiple
  2096  			// messages before the error due to buffering.
  2097  			err = nil
  2098  			for err == nil {
  2099  				_, err = w.stream.Recv()
  2100  			}
  2101  			// Drop the stream reference as a new one will need to be created if
  2102  			// we retry.
  2103  			w.stream = nil
  2104  
  2105  			// Retriable errors mean we should start over and attempt to
  2106  			// resend the entire buffer via a new stream.
  2107  			// If not retriable, falling through will return the error received.
  2108  			if shouldRetry(err) {
  2109  				// TODO: Add test case for failure modes of querying progress.
  2110  				writeOffset, err = w.determineOffset(start)
  2111  				if err != nil {
  2112  					return nil, 0, err
  2113  				}
  2114  				sent = int(writeOffset) - int(start)
  2115  
  2116  				// Continue sending requests, opening a new stream and resending
  2117  				// any bytes not yet persisted as per QueryWriteStatus
  2118  				continue sendBytes
  2119  			}
  2120  		}
  2121  		if err != nil {
  2122  			return nil, 0, err
  2123  		}
  2124  
  2125  		// Update the immediate stream's sent total and the upload offset with
  2126  		// the data sent.
  2127  		sent += len(data)
  2128  		writeOffset += int64(len(data))
  2129  
  2130  		// Not done sending data, do not attempt to commit it yet, loop around
  2131  		// and send more data.
  2132  		if recvd-sent > 0 {
  2133  			continue sendBytes
  2134  		}
  2135  
  2136  		// The buffer has been uploaded and there is still more data to be
  2137  		// uploaded, but this is not a resumable upload session. Therefore,
  2138  		// don't check persisted data.
  2139  		if !lastWriteOfEntireObject && w.chunkSize == 0 {
  2140  			return nil, writeOffset, nil
  2141  		}
  2142  
  2143  		// Done sending the data in the buffer (remainingDataFitsInSingleReq
  2144  		// should == true if we reach this code).
  2145  		// If we are done sending the whole object, close the stream and get the final
  2146  		// object. Otherwise, receive from the stream to confirm the persisted data.
  2147  		if !lastWriteOfEntireObject {
  2148  			resp, err := w.stream.Recv()
  2149  
  2150  			// Retriable errors mean we should start over and attempt to
  2151  			// resend the entire buffer via a new stream.
  2152  			// If not retriable, falling through will return the error received
  2153  			// from closing the stream.
  2154  			if shouldRetry(err) {
  2155  				writeOffset, err = w.determineOffset(start)
  2156  				if err != nil {
  2157  					return nil, 0, err
  2158  				}
  2159  				sent = int(writeOffset) - int(start)
  2160  
  2161  				// Drop the stream reference as a new one will need to be created.
  2162  				w.stream = nil
  2163  
  2164  				continue sendBytes
  2165  			}
  2166  			if err != nil {
  2167  				return nil, 0, err
  2168  			}
  2169  
  2170  			if resp.GetPersistedSize() != writeOffset {
  2171  				// Retry if not all bytes were persisted.
  2172  				writeOffset = resp.GetPersistedSize()
  2173  				sent = int(writeOffset) - int(start)
  2174  				continue sendBytes
  2175  			}
  2176  		} else {
  2177  			// If the object is done uploading, close the send stream to signal
  2178  			// to the server that we are done sending so that we can receive
  2179  			// from the stream without blocking.
  2180  			err = w.stream.CloseSend()
  2181  			if err != nil {
  2182  				// CloseSend() retries the send internally. It never returns an
  2183  				// error in the current implementation, but we check it anyway in
  2184  				// case it does in the future.
  2185  				return nil, 0, err
  2186  			}
  2187  
  2188  			// Stream receives do not block once send is closed, but we may not
  2189  			// receive the response with the object right away; loop until we
  2190  			// receive the object or error out.
  2191  			var obj *storagepb.Object
  2192  			for obj == nil {
  2193  				resp, err := w.stream.Recv()
  2194  				if shouldRetry(err) {
  2195  					writeOffset, err = w.determineOffset(start)
  2196  					if err != nil {
  2197  						return nil, 0, err
  2198  					}
  2199  					sent = int(writeOffset) - int(start)
  2200  					w.stream = nil
  2201  					continue sendBytes
  2202  				}
  2203  				if err != nil {
  2204  					return nil, 0, err
  2205  				}
  2206  
  2207  				obj = resp.GetResource()
  2208  			}
  2209  
  2210  			// Even though we received the object response, continue reading
  2211  			// until we receive a non-nil error, to ensure the stream does not
  2212  			// leak even if the context isn't canceled. See:
  2213  			// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
  2214  			for err == nil {
  2215  				_, err = w.stream.Recv()
  2216  			}
  2217  
  2218  			return obj, writeOffset, nil
  2219  		}
  2220  
  2221  		return nil, writeOffset, nil
  2222  	}
  2223  }
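
        // Summary added for clarity, not part of the original file: the flag
        // combinations uploadBuffer sends, as derived from the logic above.
        //
        //	message within a buffer:     Flush=false StateLookup=false FinishWrite=false
        //	last message of a buffer:    Flush=true  StateLookup=true  FinishWrite=false
        //	last message of the object:  Flush=false StateLookup=false FinishWrite=true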
  2224  
  2225  // determineOffset either returns the offset given to it in the case of a simple
  2226  // upload, or queries the write status when a resumable upload is being
  2227  // used.
  2228  func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
  2229  	// For a Resumable Upload, we must start from however much data
  2230  	// was committed.
  2231  	if w.upid != "" {
  2232  		committed, err := w.queryProgress()
  2233  		if err != nil {
  2234  			return 0, err
  2235  		}
  2236  		offset = committed
  2237  	}
  2238  	return offset, nil
  2239  }
  2240  
  2241  // writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
  2242  // ObjectAttrs and applies its Conditions. This is only used for gRPC.
  2243  func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
  2244  	// To avoid modifying the ObjectAttrs embedded in the calling writer, deref
  2245  	// the ObjectAttrs pointer to make a copy, then assign the desired name to
  2246  	// the attribute.
  2247  	attrs := *w.attrs
  2248  
  2249  	spec := &storagepb.WriteObjectSpec{
  2250  		Resource: attrs.toProtoObject(w.bucket),
  2251  	}
  2252  	// WriteObject doesn't support the generation condition, so use default.
  2253  	if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil {
  2254  		return nil, err
  2255  	}
  2256  	return spec, nil
  2257  }
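
        // Illustrative sketch, not part of the original file: the deref-copy
        // idiom used by writeObjectSpec above. Assigning *ptr yields an
        // independent shallow copy, so mutating the copy leaves the caller's
        // value untouched.
        func exampleDerefCopy() {
        	orig := &ObjectAttrs{Name: "original"}
        	dup := *orig
        	dup.Name = "renamed" // does not modify *orig
        	fmt.Println(orig.Name, dup.Name) // original renamed
        }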
  2258  
  2259  // read copies the data in the reader to the given buffer and reports how much
  2260  // data was read into the buffer and if there is no more data to read (EOF).
  2261  // Furthermore, if the attrs.ContentType is unset, the first bytes of content
  2262  // will be sniffed for a matching content type unless forceEmptyContentType is enabled.
  2263  func (w *gRPCWriter) read() (int, bool, error) {
  2264  	if w.attrs.ContentType == "" && !w.forceEmptyContentType {
  2265  		w.reader, w.attrs.ContentType = gax.DetermineContentType(w.reader)
  2266  	}
  2267  	// Set n to -1 to start the Read loop.
  2268  	var n, recvd int = -1, 0
  2269  	var err error
  2270  	for err == nil && n != 0 {
  2271  		// The routine blocks here until data is received.
  2272  		n, err = w.reader.Read(w.buf[recvd:])
  2273  		recvd += n
  2274  	}
  2275  	var done bool
  2276  	if err == io.EOF {
  2277  		done = true
  2278  		err = nil
  2279  	}
  2280  	return recvd, done, err
  2281  }
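
        // Illustrative sketch, not part of the original file: the loop in
        // read() keeps calling Read until the buffer is full or the source is
        // exhausted, much like io.ReadFull but treating EOF as normal
        // completion. trickleReader is a hypothetical reader returning at most
        // two bytes per call, standing in for a slow source.
        type trickleReader struct{ data []byte }

        func (r *trickleReader) Read(p []byte) (int, error) {
        	if len(r.data) == 0 {
        		return 0, io.EOF
        	}
        	n := 2
        	if n > len(r.data) {
        		n = len(r.data)
        	}
        	if n > len(p) {
        		n = len(p)
        	}
        	n = copy(p, r.data[:n])
        	r.data = r.data[n:]
        	return n, nil
        }

        func exampleReadLoop() {
        	buf := make([]byte, 8)
        	r := &trickleReader{data: []byte("abcdef")}
        	var n, recvd int = -1, 0
        	var err error
        	for err == nil && n != 0 {
        		n, err = r.Read(buf[recvd:])
        		recvd += n
        	}
        	fmt.Println(recvd, err == io.EOF) // 6 true
        }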
  2282  
  2283  func checkCanceled(err error) error {
  2284  	if status.Code(err) == codes.Canceled {
  2285  		return context.Canceled
  2286  	}
  2287  
  2288  	return err
  2289  }
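
        // Illustrative sketch, not part of the original file: checkCanceled
        // maps a gRPC Canceled status onto the standard library sentinel, so
        // callers can test with errors.Is against context.Canceled.
        func exampleCheckCanceled() {
        	grpcErr := status.Error(codes.Canceled, "call canceled")
        	fmt.Println(errors.Is(checkCanceled(grpcErr), context.Canceled)) // true
        }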
  2290  
