...

Source file src/cloud.google.com/go/storage/retry_conformance_test.go

Documentation: cloud.google.com/go/storage

     1  // Copyright 2019 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package storage
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"crypto/rand"
    21  	"encoding/json"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"net/url"
    26  	"os"
    27  	"strings"
    28  	"testing"
    29  	"time"
    30  
    31  	"cloud.google.com/go/internal/uid"
    32  	storage_v1_tests "cloud.google.com/go/storage/internal/test/conformance"
    33  	"github.com/googleapis/gax-go/v2"
    34  	"github.com/googleapis/gax-go/v2/callctx"
    35  	"google.golang.org/api/iterator"
    36  )
    37  
    38  const (
    39  	projectID           = "my-project-id"
    40  	serviceAccountEmail = "my-sevice-account@my-project-id.iam.gserviceaccount.com"
    41  	MiB                 = 1 << 10 << 10
    42  )
    43  
    44  var (
    45  	// Resource vars for retry tests
    46  	bucketIDs       = uid.NewSpace("bucket", nil)
    47  	objectIDs       = uid.NewSpace("object", nil)
    48  	notificationIDs = uid.NewSpace("notification", nil)
    49  
    50  	size9MiB           = 9 * MiB
    51  	randomBytesToWrite = []byte("abcdef")
    52  	// A 3 MiB object is large enough to span several messages and trigger a
    53  	// resumable upload in the Go library (with chunksize set to 2MiB). We use
    54  	// this in tests that require a larger object that can be less than 9MiB.
    55  	randomBytes3MiB = generateRandomBytes(3 * MiB)
    56  	// A 9 MiB object is required for "storage.resumable.upload"-specific tests,
    57  	// because there is a test that test errors after the first 8MiB.
    58  	randomBytes9MiB = generateRandomBytes(size9MiB)
    59  )
    60  
    61  type retryFunc func(ctx context.Context, c *Client, fs *resources, preconditions bool) error
    62  
    63  // Methods to retry. This is a map whose keys are a string describing a standard
    64  // API call (e.g. storage.objects.get) and values are a list of functions which
    65  // wrap library methods that implement these calls. There may be multiple values
    66  // because multiple library methods may use the same call (e.g. get could be a
    67  // read or just a metadata get).
    68  //
    69  // There may be missing methods with respect to the json API as not all methods
    70  // are used in the client library. The following are not used:
    71  // storage.bucket_acl.get
    72  // storage.bucket_acl.insert
    73  // storage.bucket_acl.patch
    74  // storage.buckets.update
    75  // storage.default_object_acl.get
    76  // storage.default_object_acl.insert
    77  // storage.default_object_acl.patch
    78  // storage.notifications.get
    79  // storage.object_acl.get
    80  // storage.object_acl.insert
    81  // storage.object_acl.patch
    82  // storage.objects.copy
    83  // storage.objects.update
    84  var methods = map[string][]retryFunc{
    85  	// Idempotent operations
    86  	"storage.bucket_acl.list": {
    87  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
    88  			_, err := c.Bucket(fs.bucket.Name).ACL().List(ctx)
    89  			return err
    90  		},
    91  	},
    92  	"storage.buckets.delete": {
    93  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
    94  			// Delete files from bucket before deleting bucket
    95  			it := c.Bucket(fs.bucket.Name).Objects(ctx, nil)
    96  			for {
    97  				attrs, err := it.Next()
    98  				if err == iterator.Done {
    99  					break
   100  				}
   101  				if err != nil {
   102  					return err
   103  				}
   104  				if err := c.Bucket(fs.bucket.Name).Object(attrs.Name).Delete(ctx); err != nil {
   105  					return err
   106  				}
   107  			}
   108  			return c.Bucket(fs.bucket.Name).Delete(ctx)
   109  		},
   110  	},
   111  	"storage.buckets.get": {
   112  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   113  			_, err := c.Bucket(fs.bucket.Name).Attrs(ctx)
   114  			return err
   115  		},
   116  	},
   117  	"storage.buckets.getIamPolicy": {
   118  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   119  			_, err := c.Bucket(fs.bucket.Name).IAM().Policy(ctx)
   120  			return err
   121  		},
   122  	},
   123  	"storage.buckets.insert": {
   124  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   125  			b := bucketIDs.New()
   126  			return c.Bucket(b).Create(ctx, projectID, nil)
   127  		},
   128  	},
   129  	"storage.buckets.list": {
   130  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   131  			it := c.Buckets(ctx, projectID)
   132  			for {
   133  				_, err := it.Next()
   134  				if err == iterator.Done {
   135  					return nil
   136  				}
   137  				if err != nil {
   138  					return err
   139  				}
   140  			}
   141  		},
   142  	},
   143  	"storage.buckets.lockRetentionPolicy": {
   144  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   145  			// buckets.lockRetentionPolicy is always idempotent, but is a special case because IfMetagenerationMatch is always required
   146  			return c.Bucket(fs.bucket.Name).If(BucketConditions{MetagenerationMatch: fs.bucket.MetaGeneration}).LockRetentionPolicy(ctx)
   147  		},
   148  	},
   149  	"storage.buckets.testIamPermissions": {
   150  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   151  			_, err := c.Bucket(fs.bucket.Name).IAM().TestPermissions(ctx, nil)
   152  			return err
   153  		},
   154  	},
   155  	"storage.default_object_acl.list": {
   156  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   157  			_, err := c.Bucket(fs.bucket.Name).DefaultObjectACL().List(ctx)
   158  			return err
   159  		},
   160  	},
   161  	"storage.hmacKey.delete": {
   162  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   163  			// key must be inactive to delete:
   164  			c.HMACKeyHandle(projectID, fs.hmacKey.AccessID).Update(ctx, HMACKeyAttrsToUpdate{State: "INACTIVE"})
   165  			return c.HMACKeyHandle(projectID, fs.hmacKey.AccessID).Delete(ctx)
   166  		},
   167  	},
   168  	"storage.hmacKey.get": {
   169  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   170  			_, err := c.HMACKeyHandle(projectID, fs.hmacKey.AccessID).Get(ctx)
   171  			return err
   172  		},
   173  	},
   174  	"storage.hmacKey.list": {
   175  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   176  			it := c.ListHMACKeys(ctx, projectID)
   177  			for {
   178  				_, err := it.Next()
   179  				if err == iterator.Done {
   180  					return nil
   181  				}
   182  				if err != nil {
   183  					return err
   184  				}
   185  			}
   186  		},
   187  	},
   188  	"storage.notifications.delete": {
   189  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   190  			return c.Bucket(fs.bucket.Name).DeleteNotification(ctx, fs.notification.ID)
   191  		},
   192  	},
   193  	"storage.notifications.list": {
   194  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   195  			_, err := c.Bucket(fs.bucket.Name).Notifications(ctx)
   196  			return err
   197  		},
   198  	},
   199  	"storage.object_acl.list": {
   200  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   201  			_, err := c.Bucket(fs.bucket.Name).Object(fs.object.Name).ACL().List(ctx)
   202  			return err
   203  		},
   204  	},
   205  	"storage.objects.get": {
   206  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   207  			_, err := c.Bucket(fs.bucket.Name).Object(fs.object.Name).Attrs(ctx)
   208  			return err
   209  		},
   210  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   211  			r, err := c.Bucket(fs.bucket.Name).Object(fs.object.Name).NewReader(ctx)
   212  			if err != nil {
   213  				return err
   214  			}
   215  			wr, err := r.WriteTo(io.Discard)
   216  			if got, want := wr, len(randomBytesToWrite); got != int64(want) {
   217  				return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   218  			}
   219  			return err
   220  		},
   221  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   222  			// This tests downloads by calling Reader.Read rather than Reader.WriteTo.
   223  			r, err := c.Bucket(fs.bucket.Name).Object(fs.object.Name).NewReader(ctx)
   224  			if err != nil {
   225  				return err
   226  			}
   227  			// Use ReadAll because it calls Read implicitly, not WriteTo.
   228  			b, err := io.ReadAll(r)
   229  			if got, want := len(b), len(randomBytesToWrite); got != want {
   230  				return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   231  			}
   232  			return err
   233  		},
   234  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   235  			// Test JSON reads.
   236  			client, ok := c.tc.(*httpStorageClient)
   237  			if ok {
   238  				client.config.readAPIWasSet = true
   239  				client.config.useJSONforReads = true
   240  				defer func() {
   241  					client.config.readAPIWasSet = false
   242  					client.config.useJSONforReads = false
   243  				}()
   244  			}
   245  
   246  			r, err := c.Bucket(fs.bucket.Name).Object(fs.object.Name).NewReader(ctx)
   247  			if err != nil {
   248  				return err
   249  			}
   250  			wr, err := io.Copy(io.Discard, r)
   251  			if got, want := wr, len(randomBytesToWrite); got != int64(want) {
   252  				return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   253  			}
   254  			return err
   255  		},
   256  	},
   257  	"storage.objects.download": {
   258  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   259  			// Before running the test method, populate a large test object.
   260  			objName := objectIDs.New()
   261  			if err := uploadTestObject(fs.bucket.Name, objName, randomBytes3MiB); err != nil {
   262  				return fmt.Errorf("failed to create large object pre test, err: %v", err)
   263  			}
   264  			// Download the large test object for the S8 download method group.
   265  			r, err := c.Bucket(fs.bucket.Name).Object(objName).NewReader(ctx)
   266  			if err != nil {
   267  				return err
   268  			}
   269  			defer r.Close()
   270  			data, err := io.ReadAll(r)
   271  			if err != nil {
   272  				return fmt.Errorf("failed to ReadAll, err: %v", err)
   273  			}
   274  			if got, want := len(data), 3*MiB; got != want {
   275  				return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   276  			}
   277  			if got, want := data, randomBytes3MiB; !bytes.Equal(got, want) {
   278  				return fmt.Errorf("body mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   279  			}
   280  			return nil
   281  		},
   282  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   283  			// Test download via Reader.WriteTo.
   284  			// Before running the test method, populate a large test object of 9 MiB.
   285  			objName := objectIDs.New()
   286  			if err := uploadTestObject(fs.bucket.Name, objName, randomBytes3MiB); err != nil {
   287  				return fmt.Errorf("failed to create 9 MiB large object pre test, err: %v", err)
   288  			}
   289  			// Download the large test object for the S8 download method group.
   290  			r, err := c.Bucket(fs.bucket.Name).Object(objName).NewReader(ctx)
   291  			if err != nil {
   292  				return err
   293  			}
   294  			defer r.Close()
   295  			var data bytes.Buffer
   296  			_, err = r.WriteTo(&data)
   297  			if err != nil {
   298  				return fmt.Errorf("failed to ReadAll, err: %v", err)
   299  			}
   300  			if got, want := data.Len(), 3*MiB; got != want {
   301  				return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   302  			}
   303  			if got, want := data.Bytes(), randomBytes3MiB; !bytes.Equal(got, want) {
   304  				return fmt.Errorf("body mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   305  			}
   306  			return nil
   307  		},
   308  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   309  			// Test JSON reads.
   310  			// Before running the test method, populate a large test object.
   311  			objName := objectIDs.New()
   312  			if err := uploadTestObject(fs.bucket.Name, objName, randomBytes3MiB); err != nil {
   313  				return fmt.Errorf("failed to create large object pre test, err: %v", err)
   314  			}
   315  
   316  			client, ok := c.tc.(*httpStorageClient)
   317  			if ok {
   318  				client.config.readAPIWasSet = true
   319  				client.config.useJSONforReads = true
   320  				defer func() {
   321  					client.config.readAPIWasSet = false
   322  					client.config.useJSONforReads = false
   323  				}()
   324  			}
   325  
   326  			// Download the large test object for the S8 download method group.
   327  			r, err := c.Bucket(fs.bucket.Name).Object(objName).NewReader(ctx)
   328  			if err != nil {
   329  				return err
   330  			}
   331  			defer r.Close()
   332  			data, err := io.ReadAll(r)
   333  			if err != nil {
   334  				return fmt.Errorf("failed to ReadAll, err: %v", err)
   335  			}
   336  			if got, want := len(data), 3*MiB; got != want {
   337  				return fmt.Errorf("body length mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   338  			}
   339  			if got, want := data, randomBytes3MiB; !bytes.Equal(got, want) {
   340  				return fmt.Errorf("body mismatch\ngot:\n%v\n\nwant:\n%v", got, want)
   341  			}
   342  			return nil
   343  		},
   344  	},
   345  	"storage.objects.list": {
   346  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   347  			it := c.Bucket(fs.bucket.Name).Objects(ctx, nil)
   348  			for {
   349  				_, err := it.Next()
   350  				if err == iterator.Done {
   351  					return nil
   352  				}
   353  				if err != nil {
   354  					return err
   355  				}
   356  			}
   357  		},
   358  	},
   359  	"storage.serviceaccount.get": {
   360  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   361  			_, err := c.ServiceAccount(ctx, projectID)
   362  			return err
   363  		},
   364  	},
   365  	// Conditionally idempotent operations
   366  	"storage.buckets.patch": {
   367  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   368  			uattrs := BucketAttrsToUpdate{StorageClass: "ARCHIVE"}
   369  			bkt := c.Bucket(fs.bucket.Name)
   370  			if preconditions {
   371  				bkt = c.Bucket(fs.bucket.Name).If(BucketConditions{MetagenerationMatch: fs.bucket.MetaGeneration})
   372  			}
   373  			_, err := bkt.Update(ctx, uattrs)
   374  			return err
   375  		},
   376  	},
   377  	"storage.buckets.setIamPolicy": {
   378  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   379  			bkt := c.Bucket(fs.bucket.Name)
   380  			policy, err := bkt.IAM().Policy(ctx)
   381  			if err != nil {
   382  				return err
   383  			}
   384  
   385  			if !preconditions {
   386  				policy.InternalProto.Etag = nil
   387  			}
   388  
   389  			return bkt.IAM().SetPolicy(ctx, policy)
   390  		},
   391  	},
   392  	"storage.hmacKey.update": {
   393  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   394  			key := c.HMACKeyHandle(projectID, fs.hmacKey.AccessID)
   395  			uattrs := HMACKeyAttrsToUpdate{State: "INACTIVE"}
   396  
   397  			if preconditions {
   398  				uattrs.Etag = fs.hmacKey.Etag
   399  			}
   400  
   401  			_, err := key.Update(ctx, uattrs)
   402  			return err
   403  		},
   404  	},
   405  	"storage.objects.compose": {
   406  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   407  			dstName := "new-object"
   408  			src := c.Bucket(fs.bucket.Name).Object(fs.object.Name)
   409  			dst := c.Bucket(fs.bucket.Name).Object(dstName)
   410  
   411  			if preconditions {
   412  				dst = c.Bucket(fs.bucket.Name).Object(dstName).If(Conditions{DoesNotExist: true})
   413  			}
   414  
   415  			_, err := dst.ComposerFrom(src).Run(ctx)
   416  			return err
   417  		},
   418  	},
   419  	"storage.objects.delete": {
   420  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   421  			obj := c.Bucket(fs.bucket.Name).Object(fs.object.Name)
   422  
   423  			if preconditions {
   424  				obj = c.Bucket(fs.bucket.Name).Object(fs.object.Name).If(Conditions{GenerationMatch: fs.object.Generation})
   425  			}
   426  			return obj.Delete(ctx)
   427  		},
   428  	},
   429  	"storage.objects.insert": {
   430  		// Single-shot upload that is sent in a single message.
   431  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   432  			obj := c.Bucket(fs.bucket.Name).Object("new-object.txt")
   433  
   434  			if preconditions {
   435  				obj = obj.If(Conditions{DoesNotExist: true})
   436  			}
   437  
   438  			objW := obj.NewWriter(ctx)
   439  			if _, err := io.Copy(objW, strings.NewReader("object body")); err != nil {
   440  				return fmt.Errorf("io.Copy: %v", err)
   441  			}
   442  			if err := objW.Close(); err != nil {
   443  				return fmt.Errorf("Writer.Close: %v", err)
   444  			}
   445  			return nil
   446  		},
   447  		// Single-shot upload that spans several GRPC message sends (we send
   448  		// storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES=2MiB per msg).
   449  		// This is needed to cover all error code paths.
   450  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   451  			obj := c.Bucket(fs.bucket.Name).Object("new-object.txt")
   452  
   453  			if preconditions {
   454  				obj = obj.If(Conditions{DoesNotExist: true})
   455  			}
   456  
   457  			objW := obj.NewWriter(ctx)
   458  			if _, err := objW.Write(randomBytes3MiB); err != nil {
   459  				return fmt.Errorf("io.Copy: %v", err)
   460  			}
   461  			if err := objW.Close(); err != nil {
   462  				return fmt.Errorf("Writer.Close: %v", err)
   463  			}
   464  			return nil
   465  		},
   466  		// Resumable upload.
   467  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   468  			obj := c.Bucket(fs.bucket.Name).Object("new-object.txt")
   469  
   470  			if preconditions {
   471  				obj = obj.If(Conditions{DoesNotExist: true})
   472  			}
   473  
   474  			objW := obj.NewWriter(ctx)
   475  			objW.ChunkSize = 2 * MiB
   476  
   477  			if _, err := objW.Write(randomBytes3MiB); err != nil {
   478  				return fmt.Errorf("io.Copy: %v", err)
   479  			}
   480  			if err := objW.Close(); err != nil {
   481  				return fmt.Errorf("Writer.Close: %v", err)
   482  			}
   483  			return nil
   484  		},
   485  	},
   486  	"storage.resumable.upload": {
   487  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   488  			obj := c.Bucket(fs.bucket.Name).Object(objectIDs.New())
   489  			if preconditions {
   490  				obj = obj.If(Conditions{DoesNotExist: true})
   491  			}
   492  			w := obj.NewWriter(ctx)
   493  			// Set Writer.ChunkSize to 2 MiB to perform resumable uploads.
   494  			w.ChunkSize = 2097152
   495  
   496  			if _, err := w.Write(randomBytes9MiB); err != nil {
   497  				return fmt.Errorf("writing object: %v", err)
   498  			}
   499  			if err := w.Close(); err != nil {
   500  				return fmt.Errorf("closing object: %v", err)
   501  			}
   502  			return nil
   503  		},
   504  	},
   505  	"storage.objects.patch": {
   506  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   507  			uattrs := ObjectAttrsToUpdate{Metadata: map[string]string{"foo": "bar"}}
   508  			obj := c.Bucket(fs.bucket.Name).Object(fs.object.Name)
   509  			if preconditions {
   510  				obj = obj.If(Conditions{MetagenerationMatch: fs.object.Metageneration})
   511  			}
   512  			_, err := obj.Update(ctx, uattrs)
   513  			return err
   514  		},
   515  	},
   516  	"storage.objects.rewrite": {
   517  		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
   518  			dstName := "new-object"
   519  			src := c.Bucket(fs.bucket.Name).Object(fs.object.Name)
   520  			dst := c.Bucket(fs.bucket.Name).Object(dstName)
   521  
   522  			if preconditions {
   523  				dst = c.Bucket(fs.bucket.Name).Object(dstName).If(Conditions{DoesNotExist: true})
   524  			}
   525  
   526  			_, err := dst.CopierFrom(src).Run(ctx)
   527  			return err
   528  		},
   529  	},
   530  	// Non-idempotent operations
   531  	"storage.bucket_acl.delete": {
   532  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   533  			return c.Bucket(fs.bucket.Name).ACL().Delete(ctx, AllUsers)
   534  		},
   535  	},
   536  	"storage.bucket_acl.update": {
   537  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   538  			return c.Bucket(fs.bucket.Name).ACL().Set(ctx, AllAuthenticatedUsers, RoleOwner)
   539  		},
   540  	},
   541  	"storage.default_object_acl.delete": {
   542  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   543  			return c.Bucket(fs.bucket.Name).DefaultObjectACL().Delete(ctx, AllAuthenticatedUsers)
   544  		},
   545  	},
   546  	"storage.default_object_acl.update": {
   547  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   548  			return c.Bucket(fs.bucket.Name).DefaultObjectACL().Set(ctx, AllAuthenticatedUsers, RoleOwner)
   549  		},
   550  	},
   551  	"storage.hmacKey.create": {
   552  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   553  			_, err := c.CreateHMACKey(ctx, projectID, serviceAccountEmail)
   554  			return err
   555  		},
   556  	},
   557  	"storage.notifications.insert": {
   558  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   559  			notification := Notification{
   560  				TopicID:        "my-topic",
   561  				TopicProjectID: projectID,
   562  				PayloadFormat:  "json",
   563  			}
   564  			_, err := c.Bucket(fs.bucket.Name).AddNotification(ctx, &notification)
   565  			return err
   566  		},
   567  	},
   568  	"storage.object_acl.delete": {
   569  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   570  			return c.Bucket(fs.bucket.Name).Object(fs.object.Name).ACL().Delete(ctx, AllAuthenticatedUsers)
   571  		},
   572  	},
   573  	"storage.object_acl.update": {
   574  		func(ctx context.Context, c *Client, fs *resources, _ bool) error {
   575  			return c.Bucket(fs.bucket.Name).Object(fs.object.Name).ACL().Set(ctx, AllAuthenticatedUsers, RoleOwner)
   576  		},
   577  	},
   578  }
   579  
   580  func TestRetryConformance(t *testing.T) {
   581  	// This endpoint is used only to call the testbench retry test API, which is HTTP
   582  	// based. The endpoint called by the client library is determined inside of the
   583  	// client constructor and will differ depending on the transport.
   584  	host := os.Getenv("STORAGE_EMULATOR_HOST")
   585  	if host == "" {
   586  		t.Skip("This test must use the testbench emulator; set STORAGE_EMULATOR_HOST to run.")
   587  	}
   588  	endpoint, err := url.Parse(host)
   589  	if err != nil {
   590  		t.Fatalf("error parsing emulator host (make sure it includes the scheme such as http://host): %v", err)
   591  	}
   592  
   593  	ctx := context.Background()
   594  
   595  	// Create non-wrapped client to use for setup steps.
   596  	client, err := NewClient(ctx)
   597  	if err != nil {
   598  		t.Fatalf("storage.NewClient: %v", err)
   599  	}
   600  
   601  	_, _, testFiles := parseFiles(t)
   602  
   603  	for _, testFile := range testFiles {
   604  		for _, retryTest := range testFile.RetryTests {
   605  			for _, instructions := range retryTest.Cases {
   606  				for _, method := range retryTest.Methods {
   607  					methodName := method.Name
   608  					if method.Group != "" {
   609  						methodName = method.Group
   610  					}
   611  					if len(methods[methodName]) == 0 {
   612  						t.Logf("No tests for operation %v", methodName)
   613  					}
   614  					for i, fn := range methods[methodName] {
   615  						transports := []string{"http", "grpc"}
   616  						for _, transport := range transports {
   617  							testName := fmt.Sprintf("%v-%v-%v-%v-%v", transport, retryTest.Id, instructions.Instructions, methodName, i)
   618  							t.Run(testName, func(t *testing.T) {
   619  								// Create the retry subtest
   620  								subtest := &emulatorTest{T: t, name: testName, host: endpoint}
   621  								subtest.create(map[string][]string{
   622  									method.Name: instructions.Instructions,
   623  								}, transport)
   624  
   625  								// Create necessary test resources in the emulator
   626  								subtest.populateResources(ctx, client, method.Resources)
   627  
   628  								// Test
   629  								// Set retry test id through headers per test call
   630  								ctx := context.Background()
   631  								ctx = callctx.SetHeaders(ctx, "x-retry-test-id", subtest.id)
   632  								err = fn(ctx, subtest.transportClient, &subtest.resources, retryTest.PreconditionProvided)
   633  								if retryTest.ExpectSuccess && err != nil {
   634  									t.Errorf("want success, got %v", err)
   635  								}
   636  								if !retryTest.ExpectSuccess && err == nil {
   637  									t.Errorf("want failure, got success")
   638  								}
   639  
   640  								// Verify that all instructions were used up during the test
   641  								// (indicates that the client sent the correct requests).
   642  								subtest.check()
   643  
   644  								// Close out test in emulator.
   645  								subtest.delete()
   646  							})
   647  						}
   648  					}
   649  				}
   650  			}
   651  		}
   652  	}
   653  }
   654  
   655  type emulatorTest struct {
   656  	*testing.T
   657  	name            string
   658  	id              string // ID to pass as a header in the test execution
   659  	resources       resources
   660  	host            *url.URL // set the path when using; path is not guaranteed between calls
   661  	transportClient *Client
   662  }
   663  
   664  // Holds the resources for a particular test case. Only the necessary fields will
   665  // be populated; others will be nil.
   666  type resources struct {
   667  	bucket       *BucketAttrs
   668  	object       *ObjectAttrs
   669  	notification *Notification
   670  	hmacKey      *HMACKey
   671  }
   672  
   673  // Creates given test resources with the provided client
   674  func (et *emulatorTest) populateResources(ctx context.Context, c *Client, resources []storage_v1_tests.Resource) {
   675  	for _, resource := range resources {
   676  		switch resource {
   677  		case storage_v1_tests.Resource_BUCKET:
   678  			bkt := c.Bucket(bucketIDs.New())
   679  			if err := bkt.Create(ctx, projectID, &BucketAttrs{}); err != nil {
   680  				et.Fatalf("creating bucket: %v", err)
   681  			}
   682  			attrs, err := bkt.Attrs(ctx)
   683  			if err != nil {
   684  				et.Fatalf("getting bucket attrs: %v", err)
   685  			}
   686  			et.resources.bucket = attrs
   687  		case storage_v1_tests.Resource_OBJECT:
   688  			// Assumes bucket has been populated first.
   689  			obj := c.Bucket(et.resources.bucket.Name).Object(objectIDs.New())
   690  			w := obj.NewWriter(ctx)
   691  			if _, err := w.Write(randomBytesToWrite); err != nil {
   692  				et.Fatalf("writing object: %v", err)
   693  			}
   694  			if err := w.Close(); err != nil {
   695  				et.Fatalf("closing object: %v", err)
   696  			}
   697  			attrs, err := obj.Attrs(ctx)
   698  			if err != nil {
   699  				et.Fatalf("getting object attrs: %v", err)
   700  			}
   701  			et.resources.object = attrs
   702  		case storage_v1_tests.Resource_NOTIFICATION:
   703  			// Assumes bucket has been populated first.
   704  			n, err := c.Bucket(et.resources.bucket.Name).AddNotification(ctx, &Notification{
   705  				TopicProjectID: projectID,
   706  				TopicID:        notificationIDs.New(),
   707  				PayloadFormat:  JSONPayload,
   708  			})
   709  			if err != nil {
   710  				et.Fatalf("adding notification: %v", err)
   711  			}
   712  			et.resources.notification = n
   713  		case storage_v1_tests.Resource_HMAC_KEY:
   714  			key, err := c.CreateHMACKey(ctx, projectID, serviceAccountEmail)
   715  			if err != nil {
   716  				et.Fatalf("creating HMAC key: %v", err)
   717  			}
   718  			et.resources.hmacKey = key
   719  		}
   720  	}
   721  }
   722  
   723  // Generates size random bytes.
   724  func generateRandomBytes(n int) []byte {
   725  	b := make([]byte, n)
   726  	_, _ = rand.Read(b)
   727  	return b
   728  }
   729  
   730  // Upload test object with given bytes.
   731  func uploadTestObject(bucketName, objName string, n []byte) error {
   732  	// Create non-wrapped client to create test object.
   733  	ctx := context.Background()
   734  	c, err := NewClient(ctx)
   735  	if err != nil {
   736  		return fmt.Errorf("storage.NewClient: %v", err)
   737  	}
   738  	obj := c.Bucket(bucketName).Object(objName)
   739  	w := obj.NewWriter(ctx)
   740  	if _, err := w.Write(n); err != nil {
   741  		return fmt.Errorf("writing test object: %v", err)
   742  	}
   743  	if err := w.Close(); err != nil {
   744  		return fmt.Errorf("closing object: %v", err)
   745  	}
   746  	return nil
   747  }
   748  
   749  // Creates a retry test resource in the emulator
   750  func (et *emulatorTest) create(instructions map[string][]string, transport string) {
   751  	c := http.DefaultClient
   752  	data := struct {
   753  		Instructions map[string][]string `json:"instructions"`
   754  		Transport    string              `json:"transport"`
   755  	}{
   756  		Instructions: instructions,
   757  		Transport:    transport,
   758  	}
   759  
   760  	buf := new(bytes.Buffer)
   761  	if err := json.NewEncoder(buf).Encode(data); err != nil {
   762  		et.Fatalf("encoding request: %v", err)
   763  	}
   764  
   765  	et.host.Path = "retry_test"
   766  	resp, err := c.Post(et.host.String(), "application/json", buf)
   767  	if resp != nil && resp.StatusCode == 501 {
   768  		et.T.Skip("This retry test case is not yet supported in the testbench.")
   769  	}
   770  	if err != nil || resp.StatusCode != 200 {
   771  		et.Fatalf("creating retry test: err: %v, resp: %+v", err, resp)
   772  	}
   773  	defer func() {
   774  		closeErr := resp.Body.Close()
   775  		if err == nil {
   776  			err = closeErr
   777  		}
   778  	}()
   779  	testRes := struct {
   780  		TestID string `json:"id"`
   781  	}{}
   782  	if err := json.NewDecoder(resp.Body).Decode(&testRes); err != nil {
   783  		et.Fatalf("decoding test ID: %v", err)
   784  	}
   785  
   786  	et.id = testRes.TestID
   787  	et.host.Path = ""
   788  
   789  	// Create transportClient for http or grpc
   790  	ctx := context.Background()
   791  	transportClient, err := NewClient(ctx)
   792  	if err != nil {
   793  		et.Fatalf("HTTP transportClient: %v", err)
   794  	}
   795  	if transport == "grpc" {
   796  		transportClient, err = NewGRPCClient(ctx)
   797  		if err != nil {
   798  			et.Fatalf("GRPC transportClient: %v", err)
   799  		}
   800  	}
   801  	// Reduce backoff to get faster test execution.
   802  	transportClient.SetRetry(WithBackoff(gax.Backoff{Initial: 10 * time.Millisecond}))
   803  	et.transportClient = transportClient
   804  }
   805  
   806  // Verifies that all instructions for a given retry testID have been used up
   807  func (et *emulatorTest) check() {
   808  	et.host.Path = strings.Join([]string{"retry_test", et.id}, "/")
   809  	c := http.DefaultClient
   810  	resp, err := c.Get(et.host.String())
   811  	if err != nil || resp.StatusCode != 200 {
   812  		et.Errorf("getting retry test: err: %v, resp: %+v", err, resp)
   813  	}
   814  	defer func() {
   815  		closeErr := resp.Body.Close()
   816  		if err == nil {
   817  			err = closeErr
   818  		}
   819  	}()
   820  	testRes := struct {
   821  		Instructions map[string][]string
   822  		Completed    bool
   823  	}{}
   824  	if err := json.NewDecoder(resp.Body).Decode(&testRes); err != nil {
   825  		et.Errorf("decoding response: %v", err)
   826  	}
   827  	if !testRes.Completed {
   828  		et.Errorf("test not completed; unused instructions: %+v", testRes.Instructions)
   829  	}
   830  }
   831  
   832  // Deletes a retry test resource
   833  func (et *emulatorTest) delete() {
   834  	et.host.Path = strings.Join([]string{"retry_test", et.id}, "/")
   835  	c := http.DefaultClient
   836  	req, err := http.NewRequest("DELETE", et.host.String(), nil)
   837  	if err != nil {
   838  		et.Errorf("creating request: %v", err)
   839  	}
   840  	resp, err := c.Do(req)
   841  	if err != nil || resp.StatusCode != 200 {
   842  		et.Errorf("deleting test: err: %v, resp: %+v", err, resp)
   843  	}
   844  }
   845  

View as plain text