...

Source file src/cloud.google.com/go/storage/http_client.go

Documentation: cloud.google.com/go/storage

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package storage
    16  
    17  import (
    18  	"context"
    19  	"encoding/base64"
    20  	"errors"
    21  	"fmt"
    22  	"hash/crc32"
    23  	"io"
    24  	"io/ioutil"
    25  	"net/http"
    26  	"net/url"
    27  	"os"
    28  	"reflect"
    29  	"strconv"
    30  	"strings"
    31  	"time"
    32  
    33  	"cloud.google.com/go/iam/apiv1/iampb"
    34  	"cloud.google.com/go/internal/optional"
    35  	"cloud.google.com/go/internal/trace"
    36  	"github.com/googleapis/gax-go/v2/callctx"
    37  	"golang.org/x/oauth2/google"
    38  	"google.golang.org/api/googleapi"
    39  	"google.golang.org/api/iterator"
    40  	"google.golang.org/api/option"
    41  	"google.golang.org/api/option/internaloption"
    42  	raw "google.golang.org/api/storage/v1"
    43  	"google.golang.org/api/transport"
    44  	htransport "google.golang.org/api/transport/http"
    45  )
    46  
    47  // httpStorageClient is the HTTP-JSON API implementation of the transport-agnostic
    48  // storageClient interface.
    49  type httpStorageClient struct {
    50  	creds    *google.Credentials
    51  	hc       *http.Client
    52  	xmlHost  string
    53  	raw      *raw.Service
    54  	scheme   string
    55  	settings *settings
    56  	config   *storageConfig
    57  }
    58  
    59  // newHTTPStorageClient initializes a new storageClient that uses the HTTP-JSON
    60  // Storage API.
    61  func newHTTPStorageClient(ctx context.Context, opts ...storageOption) (storageClient, error) {
    62  	s := initSettings(opts...)
    63  	o := s.clientOption
    64  	config := newStorageConfig(o...)
    65  
    66  	var creds *google.Credentials
    67  	// In general, it is recommended to use raw.NewService instead of htransport.NewClient
    68  	// since raw.NewService configures the correct default endpoints when initializing the
    69  	// internal http client. However, in our case, "NewRangeReader" in reader.go needs to
    70  	// access the http client directly to make requests, so we create the client manually
    71  	// here so it can be re-used by both reader.go and raw.NewService. This means we need to
    72  	// manually configure the default endpoint options on the http client. Furthermore, we
    73  	// need to account for STORAGE_EMULATOR_HOST override when setting the default endpoints.
    74  	if host := os.Getenv("STORAGE_EMULATOR_HOST"); host == "" {
    75  		// Prepend default options to avoid overriding options passed by the user.
    76  		o = append([]option.ClientOption{option.WithScopes(ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"), option.WithUserAgent(userAgent)}, o...)
    77  
    78  		o = append(o, internaloption.WithDefaultEndpointTemplate("https://storage.UNIVERSE_DOMAIN/storage/v1/"),
    79  			internaloption.WithDefaultMTLSEndpoint("https://storage.mtls.googleapis.com/storage/v1/"),
    80  			internaloption.WithDefaultUniverseDomain("googleapis.com"),
    81  		)
    82  		// Don't error out here. The user may have passed in their own HTTP
    83  		// client which does not auth with ADC or other common conventions.
    84  		c, err := transport.Creds(ctx, o...)
    85  		if err == nil {
    86  			creds = c
    87  			o = append(o, internaloption.WithCredentials(creds))
    88  		}
    89  	} else {
    90  		var hostURL *url.URL
    91  
    92  		if strings.Contains(host, "://") {
    93  			h, err := url.Parse(host)
    94  			if err != nil {
    95  				return nil, err
    96  			}
    97  			hostURL = h
    98  		} else {
    99  			// Add scheme for user if not supplied in STORAGE_EMULATOR_HOST
   100  			// URL is only parsed correctly if it has a scheme, so we build it ourselves
   101  			hostURL = &url.URL{Scheme: "http", Host: host}
   102  		}
   103  
   104  		hostURL.Path = "storage/v1/"
   105  		endpoint := hostURL.String()
   106  
   107  		// Append the emulator host as default endpoint for the user
   108  		o = append([]option.ClientOption{option.WithoutAuthentication()}, o...)
   109  
   110  		o = append(o, internaloption.WithDefaultEndpointTemplate(endpoint))
   111  		o = append(o, internaloption.WithDefaultMTLSEndpoint(endpoint))
   112  	}
   113  	s.clientOption = o
   114  
   115  	// htransport selects the correct endpoint among WithEndpoint (user override), WithDefaultEndpointTemplate, and WithDefaultMTLSEndpoint.
   116  	hc, ep, err := htransport.NewClient(ctx, s.clientOption...)
   117  	if err != nil {
   118  		return nil, fmt.Errorf("dialing: %w", err)
   119  	}
   120  	// RawService should be created with the chosen endpoint to take account of user override.
   121  	rawService, err := raw.NewService(ctx, option.WithEndpoint(ep), option.WithHTTPClient(hc))
   122  	if err != nil {
   123  		return nil, fmt.Errorf("storage client: %w", err)
   124  	}
   125  	// Update xmlHost and scheme with the chosen endpoint.
   126  	u, err := url.Parse(ep)
   127  	if err != nil {
   128  		return nil, fmt.Errorf("supplied endpoint %q is not valid: %w", ep, err)
   129  	}
   130  
   131  	return &httpStorageClient{
   132  		creds:    creds,
   133  		hc:       hc,
   134  		xmlHost:  u.Host,
   135  		raw:      rawService,
   136  		scheme:   u.Scheme,
   137  		settings: s,
   138  		config:   &config,
   139  	}, nil
   140  }
   141  
   142  func (c *httpStorageClient) Close() error {
   143  	c.hc.CloseIdleConnections()
   144  	return nil
   145  }
   146  
   147  // Top-level methods.
   148  
   149  func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) {
   150  	s := callSettings(c.settings, opts...)
   151  	call := c.raw.Projects.ServiceAccount.Get(project)
   152  	var res *raw.ServiceAccount
   153  	err := run(ctx, func(ctx context.Context) error {
   154  		var err error
   155  		res, err = call.Context(ctx).Do()
   156  		return err
   157  	}, s.retry, s.idempotent)
   158  	if err != nil {
   159  		return "", err
   160  	}
   161  	return res.EmailAddress, nil
   162  }
   163  
   164  func (c *httpStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, enableObjectRetention *bool, opts ...storageOption) (*BucketAttrs, error) {
   165  	s := callSettings(c.settings, opts...)
   166  	var bkt *raw.Bucket
   167  	if attrs != nil {
   168  		bkt = attrs.toRawBucket()
   169  	} else {
   170  		bkt = &raw.Bucket{}
   171  	}
   172  	bkt.Name = bucket
   173  	// If there is lifecycle information but no location, explicitly set
   174  	// the location. This is a GCS quirk/bug.
   175  	if bkt.Location == "" && bkt.Lifecycle != nil {
   176  		bkt.Location = "US"
   177  	}
   178  	req := c.raw.Buckets.Insert(project, bkt)
   179  	setClientHeader(req.Header())
   180  	if attrs != nil && attrs.PredefinedACL != "" {
   181  		req.PredefinedAcl(attrs.PredefinedACL)
   182  	}
   183  	if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
   184  		req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
   185  	}
   186  	if enableObjectRetention != nil {
   187  		req.EnableObjectRetention(*enableObjectRetention)
   188  	}
   189  	var battrs *BucketAttrs
   190  	err := run(ctx, func(ctx context.Context) error {
   191  		b, err := req.Context(ctx).Do()
   192  		if err != nil {
   193  			return err
   194  		}
   195  		battrs, err = newBucket(b)
   196  		return err
   197  	}, s.retry, s.idempotent)
   198  	return battrs, err
   199  }
   200  
   201  func (c *httpStorageClient) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator {
   202  	s := callSettings(c.settings, opts...)
   203  	it := &BucketIterator{
   204  		ctx:       ctx,
   205  		projectID: project,
   206  	}
   207  
   208  	fetch := func(pageSize int, pageToken string) (token string, err error) {
   209  		req := c.raw.Buckets.List(it.projectID)
   210  		setClientHeader(req.Header())
   211  		req.Projection("full")
   212  		req.Prefix(it.Prefix)
   213  		req.PageToken(pageToken)
   214  		if pageSize > 0 {
   215  			req.MaxResults(int64(pageSize))
   216  		}
   217  		var resp *raw.Buckets
   218  		err = run(it.ctx, func(ctx context.Context) error {
   219  			resp, err = req.Context(ctx).Do()
   220  			return err
   221  		}, s.retry, s.idempotent)
   222  		if err != nil {
   223  			return "", err
   224  		}
   225  		for _, item := range resp.Items {
   226  			b, err := newBucket(item)
   227  			if err != nil {
   228  				return "", err
   229  			}
   230  			it.buckets = append(it.buckets, b)
   231  		}
   232  		return resp.NextPageToken, nil
   233  	}
   234  
   235  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
   236  		fetch,
   237  		func() int { return len(it.buckets) },
   238  		func() interface{} { b := it.buckets; it.buckets = nil; return b })
   239  
   240  	return it
   241  }
   242  
   243  // Bucket methods.
   244  
   245  func (c *httpStorageClient) DeleteBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
   246  	s := callSettings(c.settings, opts...)
   247  	req := c.raw.Buckets.Delete(bucket)
   248  	setClientHeader(req.Header())
   249  	if err := applyBucketConds("httpStorageClient.DeleteBucket", conds, req); err != nil {
   250  		return err
   251  	}
   252  	if s.userProject != "" {
   253  		req.UserProject(s.userProject)
   254  	}
   255  
   256  	return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
   257  }
   258  
   259  func (c *httpStorageClient) GetBucket(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
   260  	s := callSettings(c.settings, opts...)
   261  	req := c.raw.Buckets.Get(bucket).Projection("full")
   262  	setClientHeader(req.Header())
   263  	err := applyBucketConds("httpStorageClient.GetBucket", conds, req)
   264  	if err != nil {
   265  		return nil, err
   266  	}
   267  	if s.userProject != "" {
   268  		req.UserProject(s.userProject)
   269  	}
   270  
   271  	var resp *raw.Bucket
   272  	err = run(ctx, func(ctx context.Context) error {
   273  		resp, err = req.Context(ctx).Do()
   274  		return err
   275  	}, s.retry, s.idempotent)
   276  
   277  	var e *googleapi.Error
   278  	if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
   279  		return nil, ErrBucketNotExist
   280  	}
   281  	if err != nil {
   282  		return nil, err
   283  	}
   284  	return newBucket(resp)
   285  }
   286  func (c *httpStorageClient) UpdateBucket(ctx context.Context, bucket string, uattrs *BucketAttrsToUpdate, conds *BucketConditions, opts ...storageOption) (*BucketAttrs, error) {
   287  	s := callSettings(c.settings, opts...)
   288  	rb := uattrs.toRawBucket()
   289  	req := c.raw.Buckets.Patch(bucket, rb).Projection("full")
   290  	setClientHeader(req.Header())
   291  	err := applyBucketConds("httpStorageClient.UpdateBucket", conds, req)
   292  	if err != nil {
   293  		return nil, err
   294  	}
   295  	if s.userProject != "" {
   296  		req.UserProject(s.userProject)
   297  	}
   298  	if uattrs != nil && uattrs.PredefinedACL != "" {
   299  		req.PredefinedAcl(uattrs.PredefinedACL)
   300  	}
   301  	if uattrs != nil && uattrs.PredefinedDefaultObjectACL != "" {
   302  		req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL)
   303  	}
   304  
   305  	var rawBucket *raw.Bucket
   306  	err = run(ctx, func(ctx context.Context) error {
   307  		rawBucket, err = req.Context(ctx).Do()
   308  		return err
   309  	}, s.retry, s.idempotent)
   310  	if err != nil {
   311  		return nil, err
   312  	}
   313  	return newBucket(rawBucket)
   314  }
   315  
   316  func (c *httpStorageClient) LockBucketRetentionPolicy(ctx context.Context, bucket string, conds *BucketConditions, opts ...storageOption) error {
   317  	s := callSettings(c.settings, opts...)
   318  
   319  	var metageneration int64
   320  	if conds != nil {
   321  		metageneration = conds.MetagenerationMatch
   322  	}
   323  	req := c.raw.Buckets.LockRetentionPolicy(bucket, metageneration)
   324  
   325  	return run(ctx, func(ctx context.Context) error {
   326  		_, err := req.Context(ctx).Do()
   327  		return err
   328  	}, s.retry, s.idempotent)
   329  }
   330  func (c *httpStorageClient) ListObjects(ctx context.Context, bucket string, q *Query, opts ...storageOption) *ObjectIterator {
   331  	s := callSettings(c.settings, opts...)
   332  	it := &ObjectIterator{
   333  		ctx: ctx,
   334  	}
   335  	if q != nil {
   336  		it.query = *q
   337  	}
   338  	fetch := func(pageSize int, pageToken string) (string, error) {
   339  		req := c.raw.Objects.List(bucket)
   340  		if it.query.SoftDeleted {
   341  			req.SoftDeleted(it.query.SoftDeleted)
   342  		}
   343  		setClientHeader(req.Header())
   344  		projection := it.query.Projection
   345  		if projection == ProjectionDefault {
   346  			projection = ProjectionFull
   347  		}
   348  		req.Projection(projection.String())
   349  		req.Delimiter(it.query.Delimiter)
   350  		req.Prefix(it.query.Prefix)
   351  		req.StartOffset(it.query.StartOffset)
   352  		req.EndOffset(it.query.EndOffset)
   353  		req.Versions(it.query.Versions)
   354  		req.IncludeTrailingDelimiter(it.query.IncludeTrailingDelimiter)
   355  		req.MatchGlob(it.query.MatchGlob)
   356  		req.IncludeFoldersAsPrefixes(it.query.IncludeFoldersAsPrefixes)
   357  		if selection := it.query.toFieldSelection(); selection != "" {
   358  			req.Fields("nextPageToken", googleapi.Field(selection))
   359  		}
   360  		req.PageToken(pageToken)
   361  		if s.userProject != "" {
   362  			req.UserProject(s.userProject)
   363  		}
   364  		if pageSize > 0 {
   365  			req.MaxResults(int64(pageSize))
   366  		}
   367  		var resp *raw.Objects
   368  		var err error
   369  		err = run(it.ctx, func(ctx context.Context) error {
   370  			resp, err = req.Context(ctx).Do()
   371  			return err
   372  		}, s.retry, s.idempotent)
   373  		if err != nil {
   374  			var e *googleapi.Error
   375  			if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
   376  				err = ErrBucketNotExist
   377  			}
   378  			return "", err
   379  		}
   380  		for _, item := range resp.Items {
   381  			it.items = append(it.items, newObject(item))
   382  		}
   383  		for _, prefix := range resp.Prefixes {
   384  			it.items = append(it.items, &ObjectAttrs{Prefix: prefix})
   385  		}
   386  		return resp.NextPageToken, nil
   387  	}
   388  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
   389  		fetch,
   390  		func() int { return len(it.items) },
   391  		func() interface{} { b := it.items; it.items = nil; return b })
   392  
   393  	return it
   394  }
   395  
   396  // Object metadata methods.
   397  
   398  func (c *httpStorageClient) DeleteObject(ctx context.Context, bucket, object string, gen int64, conds *Conditions, opts ...storageOption) error {
   399  	s := callSettings(c.settings, opts...)
   400  	req := c.raw.Objects.Delete(bucket, object).Context(ctx)
   401  	if err := applyConds("Delete", gen, conds, req); err != nil {
   402  		return err
   403  	}
   404  	if s.userProject != "" {
   405  		req.UserProject(s.userProject)
   406  	}
   407  	err := run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
   408  	var e *googleapi.Error
   409  	if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
   410  		return ErrObjectNotExist
   411  	}
   412  	return err
   413  }
   414  
   415  func (c *httpStorageClient) GetObject(ctx context.Context, params *getObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
   416  	s := callSettings(c.settings, opts...)
   417  	req := c.raw.Objects.Get(params.bucket, params.object).Projection("full").Context(ctx)
   418  	if err := applyConds("Attrs", params.gen, params.conds, req); err != nil {
   419  		return nil, err
   420  	}
   421  	if s.userProject != "" {
   422  		req.UserProject(s.userProject)
   423  	}
   424  	if err := setEncryptionHeaders(req.Header(), params.encryptionKey, false); err != nil {
   425  		return nil, err
   426  	}
   427  	if params.softDeleted {
   428  		req.SoftDeleted(params.softDeleted)
   429  	}
   430  
   431  	var obj *raw.Object
   432  	var err error
   433  	err = run(ctx, func(ctx context.Context) error {
   434  		obj, err = req.Context(ctx).Do()
   435  		return err
   436  	}, s.retry, s.idempotent)
   437  	var e *googleapi.Error
   438  	if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
   439  		return nil, ErrObjectNotExist
   440  	}
   441  	if err != nil {
   442  		return nil, err
   443  	}
   444  	return newObject(obj), nil
   445  }
   446  
   447  func (c *httpStorageClient) UpdateObject(ctx context.Context, params *updateObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
   448  	uattrs := params.uattrs
   449  	s := callSettings(c.settings, opts...)
   450  
   451  	var attrs ObjectAttrs
   452  	// Lists of fields to send, and set to null, in the JSON.
   453  	var forceSendFields, nullFields []string
   454  	if uattrs.ContentType != nil {
   455  		attrs.ContentType = optional.ToString(uattrs.ContentType)
   456  		// For ContentType, sending the empty string is a no-op.
   457  		// Instead we send a null.
   458  		if attrs.ContentType == "" {
   459  			nullFields = append(nullFields, "ContentType")
   460  		} else {
   461  			forceSendFields = append(forceSendFields, "ContentType")
   462  		}
   463  	}
   464  	if uattrs.ContentLanguage != nil {
   465  		attrs.ContentLanguage = optional.ToString(uattrs.ContentLanguage)
   466  		// For ContentLanguage it's an error to send the empty string.
   467  		// Instead we send a null.
   468  		if attrs.ContentLanguage == "" {
   469  			nullFields = append(nullFields, "ContentLanguage")
   470  		} else {
   471  			forceSendFields = append(forceSendFields, "ContentLanguage")
   472  		}
   473  	}
   474  	if uattrs.ContentEncoding != nil {
   475  		attrs.ContentEncoding = optional.ToString(uattrs.ContentEncoding)
   476  		forceSendFields = append(forceSendFields, "ContentEncoding")
   477  	}
   478  	if uattrs.ContentDisposition != nil {
   479  		attrs.ContentDisposition = optional.ToString(uattrs.ContentDisposition)
   480  		forceSendFields = append(forceSendFields, "ContentDisposition")
   481  	}
   482  	if uattrs.CacheControl != nil {
   483  		attrs.CacheControl = optional.ToString(uattrs.CacheControl)
   484  		forceSendFields = append(forceSendFields, "CacheControl")
   485  	}
   486  	if uattrs.EventBasedHold != nil {
   487  		attrs.EventBasedHold = optional.ToBool(uattrs.EventBasedHold)
   488  		forceSendFields = append(forceSendFields, "EventBasedHold")
   489  	}
   490  	if uattrs.TemporaryHold != nil {
   491  		attrs.TemporaryHold = optional.ToBool(uattrs.TemporaryHold)
   492  		forceSendFields = append(forceSendFields, "TemporaryHold")
   493  	}
   494  	if !uattrs.CustomTime.IsZero() {
   495  		attrs.CustomTime = uattrs.CustomTime
   496  		forceSendFields = append(forceSendFields, "CustomTime")
   497  	}
   498  	if uattrs.Metadata != nil {
   499  		attrs.Metadata = uattrs.Metadata
   500  		if len(attrs.Metadata) == 0 {
   501  			// Sending the empty map is a no-op. We send null instead.
   502  			nullFields = append(nullFields, "Metadata")
   503  		} else {
   504  			forceSendFields = append(forceSendFields, "Metadata")
   505  		}
   506  	}
   507  	if uattrs.ACL != nil {
   508  		attrs.ACL = uattrs.ACL
   509  		// It's an error to attempt to delete the ACL, so
   510  		// we don't append to nullFields here.
   511  		forceSendFields = append(forceSendFields, "Acl")
   512  	}
   513  	if uattrs.Retention != nil {
   514  		// For ObjectRetention it's an error to send empty fields.
   515  		// Instead we send a null as the user's intention is to remove.
   516  		if uattrs.Retention.Mode == "" && uattrs.Retention.RetainUntil.IsZero() {
   517  			nullFields = append(nullFields, "Retention")
   518  		} else {
   519  			attrs.Retention = uattrs.Retention
   520  			forceSendFields = append(forceSendFields, "Retention")
   521  		}
   522  	}
   523  	rawObj := attrs.toRawObject(params.bucket)
   524  	rawObj.ForceSendFields = forceSendFields
   525  	rawObj.NullFields = nullFields
   526  	call := c.raw.Objects.Patch(params.bucket, params.object, rawObj).Projection("full")
   527  	if err := applyConds("Update", params.gen, params.conds, call); err != nil {
   528  		return nil, err
   529  	}
   530  	if s.userProject != "" {
   531  		call.UserProject(s.userProject)
   532  	}
   533  	if uattrs.PredefinedACL != "" {
   534  		call.PredefinedAcl(uattrs.PredefinedACL)
   535  	}
   536  	if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil {
   537  		return nil, err
   538  	}
   539  
   540  	if params.overrideRetention != nil {
   541  		call.OverrideUnlockedRetention(*params.overrideRetention)
   542  	}
   543  
   544  	var obj *raw.Object
   545  	var err error
   546  	err = run(ctx, func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }, s.retry, s.idempotent)
   547  	var e *googleapi.Error
   548  	if errors.As(err, &e) && e.Code == http.StatusNotFound {
   549  		return nil, ErrObjectNotExist
   550  	}
   551  	if err != nil {
   552  		return nil, err
   553  	}
   554  	return newObject(obj), nil
   555  }
   556  
   557  func (c *httpStorageClient) RestoreObject(ctx context.Context, params *restoreObjectParams, opts ...storageOption) (*ObjectAttrs, error) {
   558  	s := callSettings(c.settings, opts...)
   559  	req := c.raw.Objects.Restore(params.bucket, params.object, params.gen).Context(ctx)
   560  	// Do not set the generation here since it's not an optional condition; it gets set above.
   561  	if err := applyConds("RestoreObject", defaultGen, params.conds, req); err != nil {
   562  		return nil, err
   563  	}
   564  	if s.userProject != "" {
   565  		req.UserProject(s.userProject)
   566  	}
   567  	if params.copySourceACL {
   568  		req.CopySourceAcl(params.copySourceACL)
   569  	}
   570  	if err := setEncryptionHeaders(req.Header(), params.encryptionKey, false); err != nil {
   571  		return nil, err
   572  	}
   573  
   574  	var obj *raw.Object
   575  	var err error
   576  	err = run(ctx, func(ctx context.Context) error { obj, err = req.Context(ctx).Do(); return err }, s.retry, s.idempotent)
   577  	var e *googleapi.Error
   578  	if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound {
   579  		return nil, ErrObjectNotExist
   580  	}
   581  	return newObject(obj), err
   582  }
   583  
   584  // Default Object ACL methods.
   585  
   586  func (c *httpStorageClient) DeleteDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
   587  	s := callSettings(c.settings, opts...)
   588  	req := c.raw.DefaultObjectAccessControls.Delete(bucket, string(entity))
   589  	configureACLCall(ctx, s.userProject, req)
   590  	return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
   591  }
   592  
   593  func (c *httpStorageClient) ListDefaultObjectACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
   594  	s := callSettings(c.settings, opts...)
   595  	var acls *raw.ObjectAccessControls
   596  	var err error
   597  	req := c.raw.DefaultObjectAccessControls.List(bucket)
   598  	configureACLCall(ctx, s.userProject, req)
   599  	err = run(ctx, func(ctx context.Context) error {
   600  		acls, err = req.Context(ctx).Do()
   601  		return err
   602  	}, s.retry, true)
   603  	if err != nil {
   604  		return nil, err
   605  	}
   606  	return toObjectACLRules(acls.Items), nil
   607  }
   608  func (c *httpStorageClient) UpdateDefaultObjectACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
   609  	s := callSettings(c.settings, opts...)
   610  	type setRequest interface {
   611  		Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error)
   612  		Header() http.Header
   613  	}
   614  	acl := &raw.ObjectAccessControl{
   615  		Bucket: bucket,
   616  		Entity: string(entity),
   617  		Role:   string(role),
   618  	}
   619  	var err error
   620  	req := c.raw.DefaultObjectAccessControls.Update(bucket, string(entity), acl)
   621  	configureACLCall(ctx, s.userProject, req)
   622  	return run(ctx, func(ctx context.Context) error {
   623  		_, err = req.Context(ctx).Do()
   624  		return err
   625  	}, s.retry, s.idempotent)
   626  }
   627  
   628  // Bucket ACL methods.
   629  
   630  func (c *httpStorageClient) DeleteBucketACL(ctx context.Context, bucket string, entity ACLEntity, opts ...storageOption) error {
   631  	s := callSettings(c.settings, opts...)
   632  	req := c.raw.BucketAccessControls.Delete(bucket, string(entity))
   633  	configureACLCall(ctx, s.userProject, req)
   634  	return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
   635  }
   636  
   637  func (c *httpStorageClient) ListBucketACLs(ctx context.Context, bucket string, opts ...storageOption) ([]ACLRule, error) {
   638  	s := callSettings(c.settings, opts...)
   639  	var acls *raw.BucketAccessControls
   640  	var err error
   641  	req := c.raw.BucketAccessControls.List(bucket)
   642  	configureACLCall(ctx, s.userProject, req)
   643  	err = run(ctx, func(ctx context.Context) error {
   644  		acls, err = req.Context(ctx).Do()
   645  		return err
   646  	}, s.retry, true)
   647  	if err != nil {
   648  		return nil, err
   649  	}
   650  	return toBucketACLRules(acls.Items), nil
   651  }
   652  
   653  func (c *httpStorageClient) UpdateBucketACL(ctx context.Context, bucket string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
   654  	s := callSettings(c.settings, opts...)
   655  	acl := &raw.BucketAccessControl{
   656  		Bucket: bucket,
   657  		Entity: string(entity),
   658  		Role:   string(role),
   659  	}
   660  	req := c.raw.BucketAccessControls.Update(bucket, string(entity), acl)
   661  	configureACLCall(ctx, s.userProject, req)
   662  	var err error
   663  	return run(ctx, func(ctx context.Context) error {
   664  		_, err = req.Context(ctx).Do()
   665  		return err
   666  	}, s.retry, s.idempotent)
   667  }
   668  
   669  // configureACLCall sets the context, user project and headers on the apiary library call.
   670  // This will panic if the call does not have the correct methods.
   671  func configureACLCall(ctx context.Context, userProject string, call interface{ Header() http.Header }) {
   672  	vc := reflect.ValueOf(call)
   673  	vc.MethodByName("Context").Call([]reflect.Value{reflect.ValueOf(ctx)})
   674  	if userProject != "" {
   675  		vc.MethodByName("UserProject").Call([]reflect.Value{reflect.ValueOf(userProject)})
   676  	}
   677  	setClientHeader(call.Header())
   678  }
   679  
   680  // Object ACL methods.
   681  
   682  func (c *httpStorageClient) DeleteObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, opts ...storageOption) error {
   683  	s := callSettings(c.settings, opts...)
   684  	req := c.raw.ObjectAccessControls.Delete(bucket, object, string(entity))
   685  	configureACLCall(ctx, s.userProject, req)
   686  	return run(ctx, func(ctx context.Context) error { return req.Context(ctx).Do() }, s.retry, s.idempotent)
   687  }
   688  
   689  // ListObjectACLs retrieves object ACL entries. By default, it operates on the latest generation of this object.
   690  // Selecting a specific generation of this object is not currently supported by the client.
   691  func (c *httpStorageClient) ListObjectACLs(ctx context.Context, bucket, object string, opts ...storageOption) ([]ACLRule, error) {
   692  	s := callSettings(c.settings, opts...)
   693  	var acls *raw.ObjectAccessControls
   694  	var err error
   695  	req := c.raw.ObjectAccessControls.List(bucket, object)
   696  	configureACLCall(ctx, s.userProject, req)
   697  	err = run(ctx, func(ctx context.Context) error {
   698  		acls, err = req.Context(ctx).Do()
   699  		return err
   700  	}, s.retry, s.idempotent)
   701  	if err != nil {
   702  		return nil, err
   703  	}
   704  	return toObjectACLRules(acls.Items), nil
   705  }
   706  
   707  func (c *httpStorageClient) UpdateObjectACL(ctx context.Context, bucket, object string, entity ACLEntity, role ACLRole, opts ...storageOption) error {
   708  	s := callSettings(c.settings, opts...)
   709  	type setRequest interface {
   710  		Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error)
   711  		Header() http.Header
   712  	}
   713  
   714  	acl := &raw.ObjectAccessControl{
   715  		Bucket: bucket,
   716  		Entity: string(entity),
   717  		Role:   string(role),
   718  	}
   719  	var err error
   720  	req := c.raw.ObjectAccessControls.Update(bucket, object, string(entity), acl)
   721  	configureACLCall(ctx, s.userProject, req)
   722  	return run(ctx, func(ctx context.Context) error {
   723  		_, err = req.Context(ctx).Do()
   724  		return err
   725  	}, s.retry, s.idempotent)
   726  }
   727  
   728  // Media operations.
   729  
   730  func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjectRequest, opts ...storageOption) (*ObjectAttrs, error) {
   731  	s := callSettings(c.settings, opts...)
   732  	rawReq := &raw.ComposeRequest{}
   733  	// Compose requires a non-empty Destination, so we always set it,
   734  	// even if the caller-provided ObjectAttrs is the zero value.
   735  	rawReq.Destination = req.dstObject.attrs.toRawObject(req.dstBucket)
   736  	if req.sendCRC32C {
   737  		rawReq.Destination.Crc32c = encodeUint32(req.dstObject.attrs.CRC32C)
   738  	}
   739  	for _, src := range req.srcs {
   740  		srcObj := &raw.ComposeRequestSourceObjects{
   741  			Name: src.name,
   742  		}
   743  		if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil {
   744  			return nil, err
   745  		}
   746  		rawReq.SourceObjects = append(rawReq.SourceObjects, srcObj)
   747  	}
   748  
   749  	call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq)
   750  	if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil {
   751  		return nil, err
   752  	}
   753  	if s.userProject != "" {
   754  		call.UserProject(s.userProject)
   755  	}
   756  	if req.predefinedACL != "" {
   757  		call.DestinationPredefinedAcl(req.predefinedACL)
   758  	}
   759  	if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
   760  		return nil, err
   761  	}
   762  	var obj *raw.Object
   763  	setClientHeader(call.Header())
   764  
   765  	var err error
   766  	retryCall := func(ctx context.Context) error { obj, err = call.Context(ctx).Do(); return err }
   767  
   768  	if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
   769  		return nil, err
   770  	}
   771  	return newObject(obj), nil
   772  }
   773  func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error) {
   774  	s := callSettings(c.settings, opts...)
   775  	rawObject := req.dstObject.attrs.toRawObject("")
   776  	call := c.raw.Objects.Rewrite(req.srcObject.bucket, req.srcObject.name, req.dstObject.bucket, req.dstObject.name, rawObject)
   777  
   778  	call.Projection("full")
   779  	if req.token != "" {
   780  		call.RewriteToken(req.token)
   781  	}
   782  	if req.dstObject.keyName != "" {
   783  		call.DestinationKmsKeyName(req.dstObject.keyName)
   784  	}
   785  	if req.predefinedACL != "" {
   786  		call.DestinationPredefinedAcl(req.predefinedACL)
   787  	}
   788  	if err := applyConds("Copy destination", defaultGen, req.dstObject.conds, call); err != nil {
   789  		return nil, err
   790  	}
   791  	if err := applySourceConds(req.srcObject.gen, req.srcObject.conds, call); err != nil {
   792  		return nil, err
   793  	}
   794  	if s.userProject != "" {
   795  		call.UserProject(s.userProject)
   796  	}
   797  	// Set destination encryption headers.
   798  	if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil {
   799  		return nil, err
   800  	}
   801  	// Set source encryption headers.
   802  	if err := setEncryptionHeaders(call.Header(), req.srcObject.encryptionKey, true); err != nil {
   803  		return nil, err
   804  	}
   805  
   806  	if req.maxBytesRewrittenPerCall != 0 {
   807  		call.MaxBytesRewrittenPerCall(req.maxBytesRewrittenPerCall)
   808  	}
   809  
   810  	var res *raw.RewriteResponse
   811  	var err error
   812  	setClientHeader(call.Header())
   813  
   814  	retryCall := func(ctx context.Context) error { res, err = call.Context(ctx).Do(); return err }
   815  
   816  	if err := run(ctx, retryCall, s.retry, s.idempotent); err != nil {
   817  		return nil, err
   818  	}
   819  
   820  	r := &rewriteObjectResponse{
   821  		done:     res.Done,
   822  		written:  res.TotalBytesRewritten,
   823  		size:     res.ObjectSize,
   824  		token:    res.RewriteToken,
   825  		resource: newObject(res.Resource),
   826  	}
   827  
   828  	return r, nil
   829  }
   830  
   831  func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (r *Reader, err error) {
   832  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.NewRangeReader")
   833  	defer func() { trace.EndSpan(ctx, err) }()
   834  
   835  	s := callSettings(c.settings, opts...)
   836  
   837  	if c.config.useJSONforReads {
   838  		return c.newRangeReaderJSON(ctx, params, s)
   839  	}
   840  	return c.newRangeReaderXML(ctx, params, s)
   841  }
   842  
   843  func (c *httpStorageClient) newRangeReaderXML(ctx context.Context, params *newRangeReaderParams, s *settings) (r *Reader, err error) {
   844  	u := &url.URL{
   845  		Scheme:  c.scheme,
   846  		Host:    c.xmlHost,
   847  		Path:    fmt.Sprintf("/%s/%s", params.bucket, params.object),
   848  		RawPath: fmt.Sprintf("/%s/%s", params.bucket, url.PathEscape(params.object)),
   849  	}
   850  	verb := "GET"
   851  	if params.length == 0 {
   852  		verb = "HEAD"
   853  	}
   854  	req, err := http.NewRequest(verb, u.String(), nil)
   855  	if err != nil {
   856  		return nil, err
   857  	}
   858  
   859  	if s.userProject != "" {
   860  		req.Header.Set("X-Goog-User-Project", s.userProject)
   861  	}
   862  
   863  	if err := setRangeReaderHeaders(req.Header, params); err != nil {
   864  		return nil, err
   865  	}
   866  
   867  	// Set custom headers passed in via the context. This is only required for XML;
   868  	// for gRPC & JSON this is handled in the GAPIC and Apiary layers respectively.
   869  	ctxHeaders := callctx.HeadersFromContext(ctx)
   870  	for k, vals := range ctxHeaders {
   871  		for _, v := range vals {
   872  			req.Header.Add(k, v)
   873  		}
   874  	}
   875  
   876  	reopen := readerReopen(ctx, req.Header, params, s,
   877  		func(ctx context.Context) (*http.Response, error) { return c.hc.Do(req.WithContext(ctx)) },
   878  		func() error { return setConditionsHeaders(req.Header, params.conds) },
   879  		func() { req.URL.RawQuery = fmt.Sprintf("generation=%d", params.gen) })
   880  
   881  	res, err := reopen(0)
   882  	if err != nil {
   883  		return nil, err
   884  	}
   885  	return parseReadResponse(res, params, reopen)
   886  }
   887  
   888  func (c *httpStorageClient) newRangeReaderJSON(ctx context.Context, params *newRangeReaderParams, s *settings) (r *Reader, err error) {
   889  	call := c.raw.Objects.Get(params.bucket, params.object)
   890  
   891  	setClientHeader(call.Header())
   892  	call.Projection("full")
   893  
   894  	if s.userProject != "" {
   895  		call.UserProject(s.userProject)
   896  	}
   897  
   898  	if err := setRangeReaderHeaders(call.Header(), params); err != nil {
   899  		return nil, err
   900  	}
   901  
   902  	reopen := readerReopen(ctx, call.Header(), params, s, func(ctx context.Context) (*http.Response, error) { return call.Context(ctx).Download() },
   903  		func() error { return applyConds("NewReader", params.gen, params.conds, call) },
   904  		func() { call.Generation(params.gen) })
   905  
   906  	res, err := reopen(0)
   907  	if err != nil {
   908  		return nil, err
   909  	}
   910  	return parseReadResponse(res, params, reopen)
   911  }
   912  
   913  func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
   914  	s := callSettings(c.settings, opts...)
   915  	errorf := params.setError
   916  	setObj := params.setObj
   917  	progress := params.progress
   918  	attrs := params.attrs
   919  
   920  	mediaOpts := []googleapi.MediaOption{
   921  		googleapi.ChunkSize(params.chunkSize),
   922  	}
   923  	if c := attrs.ContentType; c != "" || params.forceEmptyContentType {
   924  		mediaOpts = append(mediaOpts, googleapi.ContentType(c))
   925  	}
   926  	if params.chunkRetryDeadline != 0 {
   927  		mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(params.chunkRetryDeadline))
   928  	}
   929  
   930  	pr, pw := io.Pipe()
   931  
   932  	go func() {
   933  		defer close(params.donec)
   934  
   935  		rawObj := attrs.toRawObject(params.bucket)
   936  		if params.sendCRC32C {
   937  			rawObj.Crc32c = encodeUint32(attrs.CRC32C)
   938  		}
   939  		if attrs.MD5 != nil {
   940  			rawObj.Md5Hash = base64.StdEncoding.EncodeToString(attrs.MD5)
   941  		}
   942  		call := c.raw.Objects.Insert(params.bucket, rawObj).
   943  			Media(pr, mediaOpts...).
   944  			Projection("full").
   945  			Context(params.ctx).
   946  			Name(params.attrs.Name)
   947  		call.ProgressUpdater(func(n, _ int64) { progress(n) })
   948  
   949  		if attrs.KMSKeyName != "" {
   950  			call.KmsKeyName(attrs.KMSKeyName)
   951  		}
   952  		if attrs.PredefinedACL != "" {
   953  			call.PredefinedAcl(attrs.PredefinedACL)
   954  		}
   955  		if err := setEncryptionHeaders(call.Header(), params.encryptionKey, false); err != nil {
   956  			errorf(err)
   957  			pr.CloseWithError(err)
   958  			return
   959  		}
   960  		var resp *raw.Object
   961  		err := applyConds("NewWriter", defaultGen, params.conds, call)
   962  		if err == nil {
   963  			if s.userProject != "" {
   964  				call.UserProject(s.userProject)
   965  			}
   966  			// TODO(tritone): Remove this code when Uploads begin to support
   967  			// retry attempt header injection with "client header" injection.
   968  			setClientHeader(call.Header())
   969  
   970  			// The internals that perform call.Do automatically retry both the initial
   971  			// call to set up the upload as well as calls to upload individual chunks
   972  			// for a resumable upload (as long as the chunk size is non-zero). Hence
   973  			// there is no need to add retries here.
   974  
   975  			// Retry only when the operation is idempotent or the retry policy is RetryAlways.
   976  			var useRetry bool
   977  			if (s.retry == nil || s.retry.policy == RetryIdempotent) && s.idempotent {
   978  				useRetry = true
   979  			} else if s.retry != nil && s.retry.policy == RetryAlways {
   980  				useRetry = true
   981  			}
   982  			if useRetry {
   983  				if s.retry != nil {
   984  					call.WithRetry(s.retry.backoff, s.retry.shouldRetry)
   985  				} else {
   986  					call.WithRetry(nil, nil)
   987  				}
   988  			}
   989  			resp, err = call.Do()
   990  		}
   991  		if err != nil {
   992  			errorf(err)
   993  			pr.CloseWithError(err)
   994  			return
   995  		}
   996  		setObj(newObject(resp))
   997  	}()
   998  
   999  	return pw, nil
  1000  }
  1001  
  1002  // IAM methods.
  1003  
  1004  func (c *httpStorageClient) GetIamPolicy(ctx context.Context, resource string, version int32, opts ...storageOption) (*iampb.Policy, error) {
  1005  	s := callSettings(c.settings, opts...)
  1006  	call := c.raw.Buckets.GetIamPolicy(resource).OptionsRequestedPolicyVersion(int64(version))
  1007  	setClientHeader(call.Header())
  1008  	if s.userProject != "" {
  1009  		call.UserProject(s.userProject)
  1010  	}
  1011  	var rp *raw.Policy
  1012  	err := run(ctx, func(ctx context.Context) error {
  1013  		var err error
  1014  		rp, err = call.Context(ctx).Do()
  1015  		return err
  1016  	}, s.retry, s.idempotent)
  1017  	if err != nil {
  1018  		return nil, err
  1019  	}
  1020  	return iamFromStoragePolicy(rp), nil
  1021  }
  1022  
  1023  func (c *httpStorageClient) SetIamPolicy(ctx context.Context, resource string, policy *iampb.Policy, opts ...storageOption) error {
  1024  	s := callSettings(c.settings, opts...)
  1025  
  1026  	rp := iamToStoragePolicy(policy)
  1027  	call := c.raw.Buckets.SetIamPolicy(resource, rp)
  1028  	setClientHeader(call.Header())
  1029  	if s.userProject != "" {
  1030  		call.UserProject(s.userProject)
  1031  	}
  1032  
  1033  	return run(ctx, func(ctx context.Context) error {
  1034  		_, err := call.Context(ctx).Do()
  1035  		return err
  1036  	}, s.retry, s.idempotent)
  1037  }
  1038  
  1039  func (c *httpStorageClient) TestIamPermissions(ctx context.Context, resource string, permissions []string, opts ...storageOption) ([]string, error) {
  1040  	s := callSettings(c.settings, opts...)
  1041  	call := c.raw.Buckets.TestIamPermissions(resource, permissions)
  1042  	setClientHeader(call.Header())
  1043  	if s.userProject != "" {
  1044  		call.UserProject(s.userProject)
  1045  	}
  1046  	var res *raw.TestIamPermissionsResponse
  1047  	err := run(ctx, func(ctx context.Context) error {
  1048  		var err error
  1049  		res, err = call.Context(ctx).Do()
  1050  		return err
  1051  	}, s.retry, s.idempotent)
  1052  	if err != nil {
  1053  		return nil, err
  1054  	}
  1055  	return res.Permissions, nil
  1056  }
  1057  
  1058  // HMAC Key methods.
  1059  
  1060  func (c *httpStorageClient) GetHMACKey(ctx context.Context, project, accessID string, opts ...storageOption) (*HMACKey, error) {
  1061  	s := callSettings(c.settings, opts...)
  1062  	call := c.raw.Projects.HmacKeys.Get(project, accessID)
  1063  	if s.userProject != "" {
  1064  		call = call.UserProject(s.userProject)
  1065  	}
  1066  
  1067  	var metadata *raw.HmacKeyMetadata
  1068  	var err error
  1069  	if err := run(ctx, func(ctx context.Context) error {
  1070  		metadata, err = call.Context(ctx).Do()
  1071  		return err
  1072  	}, s.retry, s.idempotent); err != nil {
  1073  		return nil, err
  1074  	}
  1075  	hk := &raw.HmacKey{
  1076  		Metadata: metadata,
  1077  	}
  1078  	return toHMACKeyFromRaw(hk, false)
  1079  }
  1080  
  1081  func (c *httpStorageClient) ListHMACKeys(ctx context.Context, project, serviceAccountEmail string, showDeletedKeys bool, opts ...storageOption) *HMACKeysIterator {
  1082  	s := callSettings(c.settings, opts...)
  1083  	it := &HMACKeysIterator{
  1084  		ctx:       ctx,
  1085  		raw:       c.raw.Projects.HmacKeys,
  1086  		projectID: project,
  1087  		retry:     s.retry,
  1088  	}
  1089  	fetch := func(pageSize int, pageToken string) (token string, err error) {
  1090  		call := c.raw.Projects.HmacKeys.List(project)
  1091  		setClientHeader(call.Header())
  1092  		if pageToken != "" {
  1093  			call = call.PageToken(pageToken)
  1094  		}
  1095  		if pageSize > 0 {
  1096  			call = call.MaxResults(int64(pageSize))
  1097  		}
  1098  		if showDeletedKeys {
  1099  			call = call.ShowDeletedKeys(true)
  1100  		}
  1101  		if s.userProject != "" {
  1102  			call = call.UserProject(s.userProject)
  1103  		}
  1104  		if serviceAccountEmail != "" {
  1105  			call = call.ServiceAccountEmail(serviceAccountEmail)
  1106  		}
  1107  
  1108  		var resp *raw.HmacKeysMetadata
  1109  		err = run(it.ctx, func(ctx context.Context) error {
  1110  			resp, err = call.Context(ctx).Do()
  1111  			return err
  1112  		}, s.retry, s.idempotent)
  1113  		if err != nil {
  1114  			return "", err
  1115  		}
  1116  
  1117  		for _, metadata := range resp.Items {
  1118  			hk := &raw.HmacKey{
  1119  				Metadata: metadata,
  1120  			}
  1121  			hkey, err := toHMACKeyFromRaw(hk, true)
  1122  			if err != nil {
  1123  				return "", err
  1124  			}
  1125  			it.hmacKeys = append(it.hmacKeys, hkey)
  1126  		}
  1127  		return resp.NextPageToken, nil
  1128  	}
  1129  
  1130  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
  1131  		fetch,
  1132  		func() int { return len(it.hmacKeys) - it.index },
  1133  		func() interface{} {
  1134  			prev := it.hmacKeys
  1135  			it.hmacKeys = it.hmacKeys[:0]
  1136  			it.index = 0
  1137  			return prev
  1138  		})
  1139  	return it
  1140  }
  1141  
  1142  func (c *httpStorageClient) UpdateHMACKey(ctx context.Context, project, serviceAccountEmail, accessID string, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error) {
  1143  	s := callSettings(c.settings, opts...)
  1144  	call := c.raw.Projects.HmacKeys.Update(project, accessID, &raw.HmacKeyMetadata{
  1145  		Etag:  attrs.Etag,
  1146  		State: string(attrs.State),
  1147  	})
  1148  	if s.userProject != "" {
  1149  		call = call.UserProject(s.userProject)
  1150  	}
  1151  
  1152  	var metadata *raw.HmacKeyMetadata
  1153  	var err error
  1154  	if err := run(ctx, func(ctx context.Context) error {
  1155  		metadata, err = call.Context(ctx).Do()
  1156  		return err
  1157  	}, s.retry, s.idempotent); err != nil {
  1158  		return nil, err
  1159  	}
  1160  	hk := &raw.HmacKey{
  1161  		Metadata: metadata,
  1162  	}
  1163  	return toHMACKeyFromRaw(hk, false)
  1164  }
  1165  
  1166  func (c *httpStorageClient) CreateHMACKey(ctx context.Context, project, serviceAccountEmail string, opts ...storageOption) (*HMACKey, error) {
  1167  	s := callSettings(c.settings, opts...)
  1168  	call := c.raw.Projects.HmacKeys.Create(project, serviceAccountEmail)
  1169  	if s.userProject != "" {
  1170  		call = call.UserProject(s.userProject)
  1171  	}
  1172  
  1173  	var hk *raw.HmacKey
  1174  	if err := run(ctx, func(ctx context.Context) error {
  1175  		h, err := call.Context(ctx).Do()
  1176  		hk = h
  1177  		return err
  1178  	}, s.retry, s.idempotent); err != nil {
  1179  		return nil, err
  1180  	}
  1181  	return toHMACKeyFromRaw(hk, true)
  1182  }
  1183  
  1184  func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, project string, accessID string, opts ...storageOption) error {
  1185  	s := callSettings(c.settings, opts...)
  1186  	call := c.raw.Projects.HmacKeys.Delete(project, accessID)
  1187  	if s.userProject != "" {
  1188  		call = call.UserProject(s.userProject)
  1189  	}
  1190  	return run(ctx, func(ctx context.Context) error {
  1191  		return call.Context(ctx).Do()
  1192  	}, s.retry, s.idempotent)
  1193  }
  1194  
  1195  // Notification methods.
  1196  
  1197  // ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID.
  1198  //
  1199  // Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket,
  1200  // so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets.
  1201  func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
  1202  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications")
  1203  	defer func() { trace.EndSpan(ctx, err) }()
  1204  
  1205  	s := callSettings(c.settings, opts...)
  1206  	call := c.raw.Notifications.List(bucket)
  1207  	if s.userProject != "" {
  1208  		call.UserProject(s.userProject)
  1209  	}
  1210  	var res *raw.Notifications
  1211  	err = run(ctx, func(ctx context.Context) error {
  1212  		res, err = call.Context(ctx).Do()
  1213  		return err
  1214  	}, s.retry, true)
  1215  	if err != nil {
  1216  		return nil, err
  1217  	}
  1218  	return notificationsToMap(res.Items), nil
  1219  }
  1220  
  1221  func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
  1222  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification")
  1223  	defer func() { trace.EndSpan(ctx, err) }()
  1224  
  1225  	s := callSettings(c.settings, opts...)
  1226  	call := c.raw.Notifications.Insert(bucket, toRawNotification(n))
  1227  	if s.userProject != "" {
  1228  		call.UserProject(s.userProject)
  1229  	}
  1230  	var rn *raw.Notification
  1231  	err = run(ctx, func(ctx context.Context) error {
  1232  		rn, err = call.Context(ctx).Do()
  1233  		return err
  1234  	}, s.retry, s.idempotent)
  1235  	if err != nil {
  1236  		return nil, err
  1237  	}
  1238  	return toNotification(rn), nil
  1239  }
  1240  
  1241  func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
  1242  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification")
  1243  	defer func() { trace.EndSpan(ctx, err) }()
  1244  
  1245  	s := callSettings(c.settings, opts...)
  1246  	call := c.raw.Notifications.Delete(bucket, id)
  1247  	if s.userProject != "" {
  1248  		call.UserProject(s.userProject)
  1249  	}
  1250  	return run(ctx, func(ctx context.Context) error {
  1251  		return call.Context(ctx).Do()
  1252  	}, s.retry, s.idempotent)
  1253  }
  1254  
  1255  type httpReader struct {
  1256  	body     io.ReadCloser
  1257  	seen     int64
  1258  	reopen   func(seen int64) (*http.Response, error)
  1259  	checkCRC bool   // should we check the CRC?
  1260  	wantCRC  uint32 // the CRC32c value the server sent in the header
  1261  	gotCRC   uint32 // running crc
  1262  }
  1263  
  1264  func (r *httpReader) Read(p []byte) (int, error) {
  1265  	n := 0
  1266  	for len(p[n:]) > 0 {
  1267  		m, err := r.body.Read(p[n:])
  1268  		n += m
  1269  		r.seen += int64(m)
  1270  		if r.checkCRC {
  1271  			r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
  1272  		}
  1273  		if err == nil {
  1274  			return n, nil
  1275  		}
  1276  		if err == io.EOF {
  1277  			// Check CRC here. It would be natural to check it in Close, but
  1278  			// everybody defers Close on the assumption that it doesn't return
  1279  			// anything worth looking at.
  1280  			if r.checkCRC {
  1281  				if r.gotCRC != r.wantCRC {
  1282  					return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
  1283  						r.gotCRC, r.wantCRC)
  1284  				}
  1285  			}
  1286  			return n, err
  1287  		}
  1288  		// Read failed (likely due to connection issues), but we will try to reopen
  1289  		// the pipe and continue. Send a ranged read request that takes into account
  1290  		// the number of bytes we've already seen.
  1291  		res, err := r.reopen(r.seen)
  1292  		if err != nil {
  1293  			// reopen already retries
  1294  			return n, err
  1295  		}
  1296  		r.body.Close()
  1297  		r.body = res.Body
  1298  	}
  1299  	return n, nil
  1300  }
  1301  
  1302  func (r *httpReader) Close() error {
  1303  	return r.body.Close()
  1304  }
  1305  
  1306  func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error {
  1307  	if params.readCompressed {
  1308  		h.Set("Accept-Encoding", "gzip")
  1309  	}
  1310  	if err := setEncryptionHeaders(h, params.encryptionKey, false); err != nil {
  1311  		return err
  1312  	}
  1313  	return nil
  1314  }
  1315  
  1316  // readerReopen initiates a Read with offset and length, assuming we
  1317  // have already read seen bytes.
  1318  func readerReopen(ctx context.Context, header http.Header, params *newRangeReaderParams, s *settings,
  1319  	doDownload func(context.Context) (*http.Response, error), applyConditions func() error, setGeneration func()) func(int64) (*http.Response, error) {
  1320  	return func(seen int64) (*http.Response, error) {
  1321  		// If the context has already expired, return immediately without making a
  1322  		// call.
  1323  		if err := ctx.Err(); err != nil {
  1324  			return nil, err
  1325  		}
  1326  		start := params.offset + seen
  1327  		if params.length < 0 && start < 0 {
  1328  			header.Set("Range", fmt.Sprintf("bytes=%d", start))
  1329  		} else if params.length < 0 && start > 0 {
  1330  			header.Set("Range", fmt.Sprintf("bytes=%d-", start))
  1331  		} else if params.length > 0 {
  1332  			// The end character isn't affected by how many bytes we've seen.
  1333  			header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, params.offset+params.length-1))
  1334  		}
  1335  		// We wait to assign conditions here because the generation number can change in between reopen() runs.
  1336  		if err := applyConditions(); err != nil {
  1337  			return nil, err
  1338  		}
  1339  		// If an object generation is specified, include generation as query string parameters.
  1340  		if params.gen >= 0 {
  1341  			setGeneration()
  1342  		}
  1343  
  1344  		var err error
  1345  		var res *http.Response
  1346  		err = run(ctx, func(ctx context.Context) error {
  1347  			res, err = doDownload(ctx)
  1348  			if err != nil {
  1349  				var e *googleapi.Error
  1350  				if errors.As(err, &e) {
  1351  					if e.Code == http.StatusNotFound {
  1352  						return ErrObjectNotExist
  1353  					}
  1354  				}
  1355  				return err
  1356  			}
  1357  
  1358  			if res.StatusCode == http.StatusNotFound {
  1359  				// this check is necessary only for XML
  1360  				res.Body.Close()
  1361  				return ErrObjectNotExist
  1362  			}
  1363  			if res.StatusCode < 200 || res.StatusCode > 299 {
  1364  				body, _ := ioutil.ReadAll(res.Body)
  1365  				res.Body.Close()
  1366  				return &googleapi.Error{
  1367  					Code:   res.StatusCode,
  1368  					Header: res.Header,
  1369  					Body:   string(body),
  1370  				}
  1371  			}
  1372  
  1373  			partialContentNotSatisfied :=
  1374  				!decompressiveTranscoding(res) &&
  1375  					start > 0 && params.length != 0 &&
  1376  					res.StatusCode != http.StatusPartialContent
  1377  
  1378  			if partialContentNotSatisfied {
  1379  				res.Body.Close()
  1380  				return errors.New("storage: partial request not satisfied")
  1381  			}
  1382  
  1383  			// With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves
  1384  			// back the whole file regardless of the range count passed in as per:
  1385  			//      https://cloud.google.com/storage/docs/transcoding#range,
  1386  			// thus we have to manually move the body forward by seen bytes.
  1387  			if decompressiveTranscoding(res) && seen > 0 {
  1388  				_, _ = io.CopyN(ioutil.Discard, res.Body, seen)
  1389  			}
  1390  
  1391  			// If a generation hasn't been specified, and this is the first response we get, let's record the
  1392  			// generation. In future requests we'll use this generation as a precondition to avoid data races.
  1393  			if params.gen < 0 && res.Header.Get("X-Goog-Generation") != "" {
  1394  				gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64)
  1395  				if err != nil {
  1396  					return err
  1397  				}
  1398  				params.gen = gen64
  1399  			}
  1400  			return nil
  1401  		}, s.retry, s.idempotent)
  1402  		if err != nil {
  1403  			return nil, err
  1404  		}
  1405  		return res, nil
  1406  	}
  1407  }
  1408  
  1409  func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen func(int64) (*http.Response, error)) (*Reader, error) {
  1410  	var err error
  1411  	var (
  1412  		size        int64 // total size of object, even if a range was requested.
  1413  		checkCRC    bool
  1414  		crc         uint32
  1415  		startOffset int64 // non-zero if range request.
  1416  	)
  1417  	if res.StatusCode == http.StatusPartialContent {
  1418  		cr := strings.TrimSpace(res.Header.Get("Content-Range"))
  1419  		if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
  1420  			return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
  1421  		}
  1422  		// Content range is formatted <first byte>-<last byte>/<total size>. We take
  1423  		// the total size.
  1424  		size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
  1425  		if err != nil {
  1426  			return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
  1427  		}
  1428  
  1429  		dashIndex := strings.Index(cr, "-")
  1430  		if dashIndex >= 0 {
  1431  			startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64)
  1432  			if err != nil {
  1433  				return nil, fmt.Errorf("storage: invalid Content-Range %q: %w", cr, err)
  1434  			}
  1435  		}
  1436  	} else {
  1437  		size = res.ContentLength
  1438  		// Check the CRC iff all of the following hold:
  1439  		// - We asked for content (length != 0).
  1440  		// - We got all the content (status != PartialContent).
  1441  		// - The server sent a CRC header.
  1442  		// - The Go http stack did not uncompress the file.
  1443  		// - We were not served compressed data that was uncompressed on download.
  1444  		// The problem with the last two cases is that the CRC will not match -- GCS
  1445  		// computes it on the compressed contents, but we compute it on the
  1446  		// uncompressed contents.
  1447  		if params.length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
  1448  			crc, checkCRC = parseCRC32c(res)
  1449  		}
  1450  	}
  1451  
  1452  	remain := res.ContentLength
  1453  	body := res.Body
  1454  	// If the user requested zero bytes, explicitly close and remove the request
  1455  	// body.
  1456  	if params.length == 0 {
  1457  		remain = 0
  1458  		body.Close()
  1459  		body = emptyBody
  1460  	}
  1461  	var metaGen int64
  1462  	if res.Header.Get("X-Goog-Metageneration") != "" {
  1463  		metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64)
  1464  		if err != nil {
  1465  			return nil, err
  1466  		}
  1467  	}
  1468  
  1469  	var lm time.Time
  1470  	if res.Header.Get("Last-Modified") != "" {
  1471  		lm, err = http.ParseTime(res.Header.Get("Last-Modified"))
  1472  		if err != nil {
  1473  			return nil, err
  1474  		}
  1475  	}
  1476  
  1477  	attrs := ReaderObjectAttrs{
  1478  		Size:            size,
  1479  		ContentType:     res.Header.Get("Content-Type"),
  1480  		ContentEncoding: res.Header.Get("Content-Encoding"),
  1481  		CacheControl:    res.Header.Get("Cache-Control"),
  1482  		LastModified:    lm,
  1483  		StartOffset:     startOffset,
  1484  		Generation:      params.gen,
  1485  		Metageneration:  metaGen,
  1486  	}
  1487  	return &Reader{
  1488  		Attrs:    attrs,
  1489  		size:     size,
  1490  		remain:   remain,
  1491  		checkCRC: checkCRC,
  1492  		reader: &httpReader{
  1493  			reopen:   reopen,
  1494  			body:     body,
  1495  			wantCRC:  crc,
  1496  			checkCRC: checkCRC,
  1497  		},
  1498  	}, nil
  1499  }
  1500  

View as plain text