...

Source file src/cloud.google.com/go/pubsub/integration_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2014 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"bufio"
    19  	"bytes"
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io/ioutil"
    24  	"os"
    25  	"strings"
    26  	"sync"
    27  	"sync/atomic"
    28  	"testing"
    29  	"time"
    30  
    31  	"cloud.google.com/go/iam"
    32  	"cloud.google.com/go/internal"
    33  	"cloud.google.com/go/internal/testutil"
    34  	"cloud.google.com/go/internal/uid"
    35  	"cloud.google.com/go/internal/version"
    36  	kms "cloud.google.com/go/kms/apiv1"
    37  	"cloud.google.com/go/kms/apiv1/kmspb"
    38  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    39  	testutil2 "cloud.google.com/go/pubsub/internal/testutil"
    40  	"github.com/google/go-cmp/cmp"
    41  	"github.com/google/go-cmp/cmp/cmpopts"
    42  	gax "github.com/googleapis/gax-go/v2"
    43  	"golang.org/x/oauth2/google"
    44  	"google.golang.org/api/iterator"
    45  	"google.golang.org/api/option"
    46  	"google.golang.org/grpc"
    47  	"google.golang.org/grpc/codes"
    48  	"google.golang.org/grpc/metadata"
    49  	"google.golang.org/grpc/status"
    50  	"google.golang.org/protobuf/encoding/protowire"
    51  	"google.golang.org/protobuf/proto"
    52  )
    53  
    54  var (
    55  	topicIDs  = uid.NewSpace("topic", nil)
    56  	subIDs    = uid.NewSpace("sub", nil)
    57  	schemaIDs = uid.NewSpace("schema", nil)
    58  )
    59  
    60  // messageData is used to hold the contents of a message so that it can be compared against the contents
    61  // of another message without regard to irrelevant fields.
    62  type messageData struct {
    63  	ID         string
    64  	Data       string
    65  	Attributes map[string]string
    66  }
    67  
    68  func extractMessageData(m *Message) messageData {
    69  	return messageData{
    70  		ID:         m.ID,
    71  		Data:       string(m.Data),
    72  		Attributes: m.Attributes,
    73  	}
    74  }
    75  
    76  func withGRPCHeadersAssertion(t *testing.T, opts ...option.ClientOption) []option.ClientOption {
    77  	grpcHeadersEnforcer := &testutil.HeadersEnforcer{
    78  		OnFailure: t.Errorf,
    79  		Checkers: []*testutil.HeaderChecker{
    80  			testutil.XGoogClientHeaderChecker,
    81  		},
    82  	}
    83  	return append(grpcHeadersEnforcer.CallOptions(), opts...)
    84  }
    85  
    86  func integrationTestClient(ctx context.Context, t *testing.T, opts ...option.ClientOption) *Client {
    87  	if testing.Short() {
    88  		t.Skip("Integration tests skipped in short mode")
    89  	}
    90  	projID := testutil.ProjID()
    91  	if projID == "" {
    92  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
    93  	}
    94  	ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
    95  	if ts == nil {
    96  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
    97  	}
    98  	opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...)
    99  	client, err := NewClient(ctx, projID, opts...)
   100  	if err != nil {
   101  		t.Fatalf("Creating client error: %v", err)
   102  	}
   103  	return client
   104  }
   105  
   106  func integrationTestSchemaClient(ctx context.Context, t *testing.T, opts ...option.ClientOption) *SchemaClient {
   107  	if testing.Short() {
   108  		t.Skip("Integration tests skipped in short mode")
   109  	}
   110  	projID := testutil.ProjID()
   111  	if projID == "" {
   112  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
   113  	}
   114  	ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
   115  	if ts == nil {
   116  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
   117  	}
   118  	opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...)
   119  	sc, err := NewSchemaClient(ctx, projID, opts...)
   120  	if err != nil {
   121  		t.Fatalf("Creating client error: %v", err)
   122  	}
   123  	return sc
   124  }
   125  
   126  func TestIntegration_Admin(t *testing.T) {
   127  	t.Parallel()
   128  	ctx := context.Background()
   129  	client := integrationTestClient(ctx, t)
   130  	defer client.Close()
   131  
   132  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   133  	if err != nil {
   134  		t.Errorf("CreateTopic error: %v", err)
   135  	}
   136  	defer topic.Stop()
   137  	exists, err := topic.Exists(ctx)
   138  	if err != nil {
   139  		t.Fatalf("TopicExists error: %v", err)
   140  	}
   141  	if !exists {
   142  		t.Errorf("topic %v should exist, but it doesn't", topic)
   143  	}
   144  
   145  	var sub *Subscription
   146  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
   147  		t.Errorf("CreateSub error: %v", err)
   148  	}
   149  	exists, err = sub.Exists(ctx)
   150  	if err != nil {
   151  		t.Fatalf("SubExists error: %v", err)
   152  	}
   153  	if !exists {
   154  		t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
   155  	}
   156  
   157  	if msg, ok := testIAM(ctx, topic.IAM(), "pubsub.topics.get"); !ok {
   158  		t.Errorf("topic IAM: %s", msg)
   159  	}
   160  	if msg, ok := testIAM(ctx, sub.IAM(), "pubsub.subscriptions.get"); !ok {
   161  		t.Errorf("sub IAM: %s", msg)
   162  	}
   163  
   164  	snap, err := sub.CreateSnapshot(ctx, "")
   165  	if err != nil {
   166  		t.Fatalf("CreateSnapshot error: %v", err)
   167  	}
   168  
   169  	labels := map[string]string{"foo": "bar"}
   170  	sc, err := snap.SetLabels(ctx, labels)
   171  	if err != nil {
   172  		t.Fatalf("Snapshot.SetLabels error: %v", err)
   173  	}
   174  	if diff := testutil.Diff(sc.Labels, labels); diff != "" {
   175  		t.Fatalf("\ngot: - want: +\n%s", diff)
   176  	}
   177  
   178  	timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
   179  	defer cancel()
   180  	err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
   181  		snapIt := client.Snapshots(timeoutCtx)
   182  		for {
   183  			s, err := snapIt.Next()
   184  			if err == nil && s.name == snap.name {
   185  				return true, nil
   186  			}
   187  			if err == iterator.Done {
   188  				return false, fmt.Errorf("cannot find snapshot: %q", snap.name)
   189  			}
   190  			if err != nil {
   191  				return false, err
   192  			}
   193  		}
   194  	})
   195  	if err != nil {
   196  		t.Error(err)
   197  	}
   198  
   199  	err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
   200  		err := sub.SeekToSnapshot(timeoutCtx, snap.Snapshot)
   201  		return err == nil, err
   202  	})
   203  	if err != nil {
   204  		t.Error(err)
   205  	}
   206  
   207  	err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
   208  		err := sub.SeekToTime(timeoutCtx, time.Now())
   209  		return err == nil, err
   210  	})
   211  	if err != nil {
   212  		t.Error(err)
   213  	}
   214  
   215  	err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
   216  		snapHandle := client.Snapshot(snap.ID())
   217  		err := snapHandle.Delete(timeoutCtx)
   218  		return err == nil, err
   219  	})
   220  	if err != nil {
   221  		t.Error(err)
   222  	}
   223  
   224  	if err := sub.Delete(ctx); err != nil {
   225  		t.Errorf("DeleteSub error: %v", err)
   226  	}
   227  
   228  	if err := topic.Delete(ctx); err != nil {
   229  		t.Errorf("DeleteTopic error: %v", err)
   230  	}
   231  }
   232  
   233  func TestIntegration_PublishReceive(t *testing.T) {
   234  	ctx := context.Background()
   235  	client := integrationTestClient(ctx, t)
   236  
   237  	for _, sync := range []bool{false, true} {
   238  		for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
   239  			testPublishAndReceive(t, client, maxMsgs, sync, false, 10, 0)
   240  		}
   241  
   242  		// Tests for large messages (larger than the 4MB gRPC limit).
   243  		testPublishAndReceive(t, client, 0, sync, false, 1, 5*1024*1024)
   244  	}
   245  }
   246  
   247  // withGoogleClientInfo sets the name and version of the application in
   248  // the `x-goog-api-client` header passed on each request and returns the
   249  // updated context.
   250  func withGoogleClientInfo(ctx context.Context) context.Context {
   251  	ctxMD, _ := metadata.FromOutgoingContext(ctx)
   252  	kv := []string{
   253  		"gl-go",
   254  		version.Go(),
   255  		"gax",
   256  		gax.Version,
   257  		"grpc",
   258  		grpc.Version,
   259  	}
   260  
   261  	allMDs := append([]metadata.MD{ctxMD}, metadata.Pairs("x-goog-api-client", gax.XGoogHeader(kv...)))
   262  	return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
   263  }
   264  
   265  func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
   266  	t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
   267  		t.Parallel()
   268  		testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) {
   269  			ctx := context.Background()
   270  			topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   271  			if err != nil {
   272  				r.Errorf("CreateTopic error: %v", err)
   273  			}
   274  			defer topic.Delete(ctx)
   275  			defer topic.Stop()
   276  			exists, err := topic.Exists(ctx)
   277  			if err != nil {
   278  				r.Errorf("TopicExists error: %v", err)
   279  			}
   280  			if !exists {
   281  				r.Errorf("topic %v should exist, but it doesn't", topic)
   282  			}
   283  
   284  			sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
   285  				Topic:                     topic,
   286  				EnableExactlyOnceDelivery: exactlyOnceDelivery,
   287  			})
   288  			if err != nil {
   289  				r.Errorf("CreateSub error: %v", err)
   290  			}
   291  			defer sub.Delete(ctx)
   292  			exists, err = sub.Exists(ctx)
   293  			if err != nil {
   294  				r.Errorf("SubExists error: %v", err)
   295  			}
   296  			if !exists {
   297  				r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
   298  			}
   299  			var msgs []*Message
   300  			for i := 0; i < numMsgs; i++ {
   301  				text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
   302  				attrs := make(map[string]string)
   303  				attrs["foo"] = "bar"
   304  				msgs = append(msgs, &Message{
   305  					Data:       []byte(text),
   306  					Attributes: attrs,
   307  				})
   308  			}
   309  
   310  			// Publish some messages.
   311  			type pubResult struct {
   312  				m *Message
   313  				r *PublishResult
   314  			}
   315  			var rs []pubResult
   316  			for _, m := range msgs {
   317  				r := topic.Publish(ctx, m)
   318  				rs = append(rs, pubResult{m, r})
   319  			}
   320  			want := make(map[string]messageData)
   321  			for _, res := range rs {
   322  				id, err := res.r.Get(ctx)
   323  				if err != nil {
   324  					r.Errorf("r.Get: %v", err)
   325  				}
   326  				md := extractMessageData(res.m)
   327  				md.ID = id
   328  				want[md.ID] = md
   329  			}
   330  
   331  			sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
   332  			sub.ReceiveSettings.Synchronous = synchronous
   333  
   334  			// Use a timeout to ensure that Pull does not block indefinitely if there are
   335  			// unexpectedly few messages available.
   336  			now := time.Now()
   337  			timeout := 3 * time.Minute
   338  			timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
   339  			defer cancel()
   340  			gotMsgs, err := pullN(timeoutCtx, sub, len(want), 0, func(ctx context.Context, m *Message) {
   341  				m.Ack()
   342  			})
   343  			if err != nil {
   344  				if c := status.Convert(err); c.Code() == codes.Canceled {
   345  					if time.Since(now) >= timeout {
   346  						r.Errorf("pullN took longer than %v", timeout)
   347  					}
   348  				} else {
   349  					r.Errorf("Pull: %v", err)
   350  				}
   351  			}
   352  			got := make(map[string]messageData)
   353  			for _, m := range gotMsgs {
   354  				md := extractMessageData(m)
   355  				got[md.ID] = md
   356  			}
   357  			if !testutil.Equal(got, want) {
   358  				r.Errorf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
   359  					maxMsgs, synchronous, got, want)
   360  			}
   361  		})
   362  	})
   363  }
   364  
   365  // IAM tests.
   366  // NOTE: for these to succeed, the test runner identity must have the Pub/Sub Admin or Owner roles.
   367  // To set, visit https://console.developers.google.com, select "IAM & Admin" from the top-left
   368  // menu, choose the account, click the Roles dropdown, and select "Pub/Sub > Pub/Sub Admin".
   369  // TODO(jba): move this to a testing package within cloud.google.com/iam, so we can re-use it.
   370  func testIAM(ctx context.Context, h *iam.Handle, permission string) (msg string, ok bool) {
   371  	// Manually adding withGoogleClientInfo here because this code only takes
   372  	// a handle with a grpc.ClientConn that has the "x-goog-api-client" header enforcer,
   373  	// but unfortunately not the underlying infrastructure that takes pre-set headers.
   374  	ctx = withGoogleClientInfo(ctx)
   375  
   376  	// Attempting to add an non-existent identity  (e.g. "alice@example.com") causes the service
   377  	// to return an internal error, so use a real identity.
   378  	const member = "domain:google.com"
   379  
   380  	var policy *iam.Policy
   381  	var err error
   382  
   383  	if policy, err = h.Policy(ctx); err != nil {
   384  		return fmt.Sprintf("Policy: %v", err), false
   385  	}
   386  	// The resource is new, so the policy should be empty.
   387  	if got := policy.Roles(); len(got) > 0 {
   388  		return fmt.Sprintf("initially: got roles %v, want none", got), false
   389  	}
   390  	// Add a member, set the policy, then check that the member is present.
   391  	policy.Add(member, iam.Viewer)
   392  	if err := h.SetPolicy(ctx, policy); err != nil {
   393  		return fmt.Sprintf("SetPolicy: %v", err), false
   394  	}
   395  	if policy, err = h.Policy(ctx); err != nil {
   396  		return fmt.Sprintf("Policy: %v", err), false
   397  	}
   398  	if got, want := policy.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
   399  		return fmt.Sprintf("after Add: got %v, want %v", got, want), false
   400  	}
   401  	// Now remove that member, set the policy, and check that it's empty again.
   402  	policy.Remove(member, iam.Viewer)
   403  	if err := h.SetPolicy(ctx, policy); err != nil {
   404  		return fmt.Sprintf("SetPolicy: %v", err), false
   405  	}
   406  	if policy, err = h.Policy(ctx); err != nil {
   407  		return fmt.Sprintf("Policy: %v", err), false
   408  	}
   409  	if got := policy.Roles(); len(got) > 0 {
   410  		return fmt.Sprintf("after Remove: got roles %v, want none", got), false
   411  	}
   412  	// Call TestPermissions.
   413  	// Because this user is an admin, it has all the permissions on the
   414  	// resource type. Note: the service fails if we ask for inapplicable
   415  	// permissions (e.g. a subscription permission on a topic, or a topic
   416  	// create permission on a topic rather than its parent).
   417  	wantPerms := []string{permission}
   418  	gotPerms, err := h.TestPermissions(ctx, wantPerms)
   419  	if err != nil {
   420  		return fmt.Sprintf("TestPermissions: %v", err), false
   421  	}
   422  	if !testutil.Equal(gotPerms, wantPerms) {
   423  		return fmt.Sprintf("TestPermissions: got %v, want %v", gotPerms, wantPerms), false
   424  	}
   425  	return "", true
   426  }
   427  
   428  func TestIntegration_LargePublishSize(t *testing.T) {
   429  	ctx := context.Background()
   430  	client := integrationTestClient(ctx, t)
   431  	defer client.Close()
   432  
   433  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   434  	if err != nil {
   435  		t.Fatalf("CreateTopic error: %v", err)
   436  	}
   437  	defer topic.Delete(ctx)
   438  	defer topic.Stop()
   439  
   440  	// Calculate the largest possible message length that is still valid.
   441  	// First, calculate the max length of the encoded message accounting for the topic name.
   442  	length := MaxPublishRequestBytes - calcFieldSizeString(topic.String())
   443  	// Next, account for the overhead from encoding an individual PubsubMessage,
   444  	// and the inner PubsubMessage.Data field.
   445  	pbMsgOverhead := 1 + protowire.SizeVarint(uint64(length))
   446  	dataOverhead := 1 + protowire.SizeVarint(uint64(length-pbMsgOverhead))
   447  	maxLengthSingleMessage := length - pbMsgOverhead - dataOverhead
   448  
   449  	publishReq := &pb.PublishRequest{
   450  		Topic: topic.String(),
   451  		Messages: []*pb.PubsubMessage{
   452  			{
   453  				Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage),
   454  			},
   455  		},
   456  	}
   457  
   458  	if got := proto.Size(publishReq); got != MaxPublishRequestBytes {
   459  		t.Fatalf("Created request size of %d bytes,\nwant %f bytes", got, MaxPublishRequestBytes)
   460  	}
   461  
   462  	// Publishing the max length message by itself should succeed.
   463  	msg := &Message{
   464  		Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage),
   465  	}
   466  	topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError
   467  	r := topic.Publish(ctx, msg)
   468  	if _, err := r.Get(ctx); err != nil {
   469  		t.Fatalf("Failed to publish max length message: %v", err)
   470  	}
   471  
   472  	// Publish a small message first and make sure the max length message
   473  	// is added to its own bundle.
   474  	smallMsg := &Message{
   475  		Data: []byte{'A'},
   476  	}
   477  	topic.Publish(ctx, smallMsg)
   478  	r = topic.Publish(ctx, msg)
   479  	if _, err := r.Get(ctx); err != nil {
   480  		t.Fatalf("Failed to publish max length message after a small message: %v", err)
   481  	}
   482  
   483  	// Increase the data byte string by 1 byte, which should cause the request to fail,
   484  	// specifically due to exceeding the bundle byte limit.
   485  	msg.Data = append(msg.Data, 'A')
   486  	r = topic.Publish(ctx, msg)
   487  	if _, err := r.Get(ctx); err != ErrOversizedMessage {
   488  		t.Fatalf("Should throw item size too large error, got %v", err)
   489  	}
   490  }
   491  
   492  func TestIntegration_CancelReceive(t *testing.T) {
   493  	t.Parallel()
   494  	ctx := context.Background()
   495  	client := integrationTestClient(ctx, t)
   496  	defer client.Close()
   497  
   498  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   499  	if err != nil {
   500  		t.Errorf("failed to create topic: %v", err)
   501  	}
   502  	defer topic.Delete(ctx)
   503  	defer topic.Stop()
   504  
   505  	var sub *Subscription
   506  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
   507  		t.Fatalf("failed to create subscription: %v", err)
   508  	}
   509  	defer sub.Delete(ctx)
   510  
   511  	ctx, cancel := context.WithCancel(context.Background())
   512  	sub.ReceiveSettings.MaxOutstandingMessages = -1
   513  	sub.ReceiveSettings.MaxOutstandingBytes = -1
   514  	sub.ReceiveSettings.NumGoroutines = 1
   515  
   516  	doneReceiving := make(chan struct{})
   517  
   518  	// Publish the messages.
   519  	go func() {
   520  		for {
   521  			select {
   522  			case <-doneReceiving:
   523  				return
   524  			default:
   525  				topic.Publish(ctx, &Message{Data: []byte("some msg")})
   526  				time.Sleep(time.Second)
   527  			}
   528  		}
   529  	}()
   530  
   531  	go func() {
   532  		err = sub.Receive(ctx, func(_ context.Context, msg *Message) {
   533  			cancel()
   534  			time.AfterFunc(5*time.Second, msg.Ack)
   535  		})
   536  		close(doneReceiving)
   537  	}()
   538  
   539  	select {
   540  	case <-time.After(60 * time.Second):
   541  		t.Fatalf("Waited 60 seconds for Receive to finish, should have finished sooner")
   542  	case <-doneReceiving:
   543  	}
   544  }
   545  
   546  func TestIntegration_CreateSubscription_NeverExpire(t *testing.T) {
   547  	t.Parallel()
   548  	ctx := context.Background()
   549  	client := integrationTestClient(ctx, t)
   550  	defer client.Close()
   551  
   552  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   553  	if err != nil {
   554  		t.Fatalf("CreateTopic error: %v", err)
   555  	}
   556  	defer topic.Delete(ctx)
   557  	defer topic.Stop()
   558  
   559  	cfg := SubscriptionConfig{
   560  		Topic:            topic,
   561  		ExpirationPolicy: time.Duration(0),
   562  	}
   563  	var sub *Subscription
   564  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
   565  		t.Fatalf("CreateSub error: %v", err)
   566  	}
   567  	defer sub.Delete(ctx)
   568  
   569  	got, err := sub.Config(ctx)
   570  	if err != nil {
   571  		t.Fatal(err)
   572  	}
   573  	want := time.Duration(0)
   574  	if got.ExpirationPolicy != want {
   575  		t.Fatalf("config.ExpirationPolicy mismatch, got: %v, want: %v\n", got.ExpirationPolicy, want)
   576  	}
   577  }
   578  
   579  // findServiceAccountEmail tries to find the service account using testutil
   580  // JWTConfig as well as the ADC credentials. It will only invoke t.Skip if
   581  // it successfully retrieves credentials but finds a blank JWTConfig JSON blob.
   582  // For all other errors, it will invoke t.Fatal.
   583  func findServiceAccountEmail(ctx context.Context, t *testing.T) string {
   584  	jwtConf, err := testutil.JWTConfig()
   585  	if err == nil && jwtConf != nil {
   586  		return jwtConf.Email
   587  	}
   588  	creds := testutil.Credentials(ctx, ScopePubSub, ScopeCloudPlatform)
   589  	if creds == nil {
   590  		t.Fatal("Failed to retrieve credentials")
   591  	}
   592  	if len(creds.JSON) == 0 {
   593  		t.Skip("No JWTConfig JSON was present so can't get serviceAccountEmail")
   594  	}
   595  	jwtConf, err = google.JWTConfigFromJSON(creds.JSON)
   596  	if err != nil {
   597  		if strings.Contains(err.Error(), "authorized_user") {
   598  			t.Skip("Found ADC user so can't get serviceAccountEmail")
   599  		}
   600  		t.Fatalf("Failed to parse Google JWTConfig from JSON: %v", err)
   601  	}
   602  	return jwtConf.Email
   603  }
   604  
   605  func TestIntegration_UpdateSubscription(t *testing.T) {
   606  	t.Parallel()
   607  	ctx := context.Background()
   608  
   609  	client := integrationTestClient(ctx, t)
   610  	defer client.Close()
   611  
   612  	serviceAccountEmail := findServiceAccountEmail(ctx, t)
   613  
   614  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   615  	if err != nil {
   616  		t.Fatalf("CreateTopic error: %v", err)
   617  	}
   618  	defer topic.Delete(ctx)
   619  	defer topic.Stop()
   620  
   621  	var sub *Subscription
   622  	projID := testutil.ProjID()
   623  	sCfg := SubscriptionConfig{
   624  		Topic: topic,
   625  		PushConfig: PushConfig{
   626  			Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
   627  			AuthenticationMethod: &OIDCToken{
   628  				Audience:            "client-12345",
   629  				ServiceAccountEmail: serviceAccountEmail,
   630  			},
   631  		},
   632  	}
   633  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), sCfg); err != nil {
   634  		t.Fatalf("CreateSub error: %v", err)
   635  	}
   636  	defer sub.Delete(ctx)
   637  
   638  	got, err := sub.Config(ctx)
   639  	if err != nil {
   640  		t.Fatal(err)
   641  	}
   642  	want := SubscriptionConfig{
   643  		Topic:               topic,
   644  		AckDeadline:         10 * time.Second,
   645  		RetainAckedMessages: false,
   646  		RetentionDuration:   defaultRetentionDuration,
   647  		ExpirationPolicy:    defaultExpirationPolicy,
   648  		PushConfig: PushConfig{
   649  			Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
   650  			AuthenticationMethod: &OIDCToken{
   651  				Audience:            "client-12345",
   652  				ServiceAccountEmail: serviceAccountEmail,
   653  			},
   654  		},
   655  		State: SubscriptionStateActive,
   656  	}
   657  	opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
   658  	if diff := testutil.Diff(got, want, opt); diff != "" {
   659  		t.Fatalf("\ngot: - want: +\n%s", diff)
   660  	}
   661  	// Add a PushConfig and change other fields.
   662  	pc := PushConfig{
   663  		Endpoint:   "https://" + projID + ".appspot.com/_ah/push-handlers/push",
   664  		Attributes: map[string]string{"x-goog-version": "v1"},
   665  		AuthenticationMethod: &OIDCToken{
   666  			Audience:            "client-updated-54321",
   667  			ServiceAccountEmail: serviceAccountEmail,
   668  		},
   669  	}
   670  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   671  		PushConfig:          &pc,
   672  		AckDeadline:         2 * time.Minute,
   673  		RetainAckedMessages: true,
   674  		RetentionDuration:   2 * time.Hour,
   675  		Labels:              map[string]string{"label": "value"},
   676  		ExpirationPolicy:    25 * time.Hour,
   677  	})
   678  	if err != nil {
   679  		t.Fatal(err)
   680  	}
   681  	want = SubscriptionConfig{
   682  		Topic:               topic,
   683  		PushConfig:          pc,
   684  		AckDeadline:         2 * time.Minute,
   685  		RetainAckedMessages: true,
   686  		RetentionDuration:   2 * time.Hour,
   687  		Labels:              map[string]string{"label": "value"},
   688  		ExpirationPolicy:    25 * time.Hour,
   689  		State:               SubscriptionStateActive,
   690  	}
   691  
   692  	if !testutil.Equal(got, want, opt) {
   693  		t.Fatalf("\ngot  %+v\nwant %+v", got, want)
   694  	}
   695  
   696  	// Update ExpirationPolicy to never expire.
   697  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   698  		ExpirationPolicy: time.Duration(0),
   699  	})
   700  	if err != nil {
   701  		t.Fatal(err)
   702  	}
   703  	want.ExpirationPolicy = time.Duration(0)
   704  
   705  	if !testutil.Equal(got, want, opt) {
   706  		t.Fatalf("\ngot  %+v\nwant %+v", got, want)
   707  	}
   708  
   709  	// Remove the PushConfig, turning the subscription back into pull mode.
   710  	// Change AckDeadline, remove labels.
   711  	pc = PushConfig{}
   712  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   713  		PushConfig:  &pc,
   714  		AckDeadline: 30 * time.Second,
   715  		Labels:      map[string]string{},
   716  	})
   717  	if err != nil {
   718  		t.Fatal(err)
   719  	}
   720  	want.PushConfig = pc
   721  	want.AckDeadline = 30 * time.Second
   722  	want.Labels = nil
   723  	// service issue: PushConfig attributes are not removed.
   724  	// TODO(jba): remove when issue resolved.
   725  	want.PushConfig.Attributes = map[string]string{"x-goog-version": "v1"}
   726  	if !testutil.Equal(got, want, opt) {
   727  		t.Fatalf("\ngot  %+v\nwant %+v", got, want)
   728  	}
   729  	// If nothing changes, our client returns an error.
   730  	_, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
   731  	if err == nil {
   732  		t.Fatal("got nil, wanted error")
   733  	}
   734  }
   735  
   736  // publishSync is a utility function for publishing a message and
   737  // blocking until the message has been confirmed.
   738  func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) {
   739  	res := topic.Publish(ctx, msg)
   740  	_, err := res.Get(ctx)
   741  	if err != nil {
   742  		t.Fatalf("publishSync err: %v", err)
   743  	}
   744  }
   745  
   746  func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
   747  	t.Parallel()
   748  	ctx := context.Background()
   749  	client := integrationTestClient(ctx, t)
   750  	defer client.Close()
   751  
   752  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   753  	if err != nil {
   754  		t.Fatalf("CreateTopic error: %v", err)
   755  	}
   756  	defer topic.Delete(ctx)
   757  	defer topic.Stop()
   758  
   759  	var sub *Subscription
   760  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
   761  		t.Fatalf("CreateSub error: %v", err)
   762  	}
   763  	defer sub.Delete(ctx)
   764  
   765  	// Set ExpirationPolicy within the valid range.
   766  	got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
   767  		RetentionDuration: 2 * time.Hour,
   768  		ExpirationPolicy:  25 * time.Hour,
   769  		AckDeadline:       2 * time.Minute,
   770  	})
   771  	if err != nil {
   772  		t.Fatal(err)
   773  	}
   774  	want := 25 * time.Hour
   775  	if got.ExpirationPolicy != want {
   776  		t.Fatalf("config.ExpirationPolicy mismatch; got: %v, want: %v", got.ExpirationPolicy, want)
   777  	}
   778  
   779  	// ExpirationPolicy to never expire.
   780  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   781  		ExpirationPolicy: time.Duration(0),
   782  	})
   783  	if err != nil {
   784  		t.Fatalf("Unexpected error: %v\n", err)
   785  	}
   786  	want = time.Duration(0)
   787  	if diff := testutil.Diff(got.ExpirationPolicy, want); diff != "" {
   788  		t.Fatalf("\ngot: - want: +\n%s", diff)
   789  	}
   790  
   791  	// ExpirationPolicy when nil is passed in, should not cause any updates.
   792  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   793  		ExpirationPolicy: nil,
   794  	})
   795  	if err == nil || err.Error() != "pubsub: UpdateSubscription call with nothing to update" {
   796  		t.Fatalf("Expected no attributes to be updated, error: %v", err)
   797  	}
   798  
   799  	// ExpirationPolicy of nil, with the previous value having been a non-zero value.
   800  	_, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   801  		ExpirationPolicy: 26 * time.Hour,
   802  	})
   803  	if err != nil {
   804  		t.Fatal(err)
   805  	}
   806  	// Now examine what setting it to nil produces.
   807  	_, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   808  		ExpirationPolicy: nil,
   809  	})
   810  	if err == nil || err.Error() != "pubsub: UpdateSubscription call with nothing to update" {
   811  		t.Fatalf("Expected no attributes to be updated, error: %v", err)
   812  	}
   813  }
   814  
   815  // NOTE: This test should be skipped by open source contributors. It requires
   816  // allowlisting, a (gsuite) organization project, and specific permissions.
   817  func TestIntegration_UpdateTopicLabels(t *testing.T) {
   818  	t.Parallel()
   819  	ctx := context.Background()
   820  	client := integrationTestClient(ctx, t)
   821  	defer client.Close()
   822  
   823  	compareConfig := func(got TopicConfig, wantLabels map[string]string) bool {
   824  		return testutil.Equal(got.Labels, wantLabels)
   825  	}
   826  
   827  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   828  	if err != nil {
   829  		t.Fatalf("CreateTopic error: %v", err)
   830  	}
   831  	defer topic.Delete(ctx)
   832  	defer topic.Stop()
   833  
   834  	got, err := topic.Config(ctx)
   835  	if err != nil {
   836  		t.Fatal(err)
   837  	}
   838  	if !compareConfig(got, nil) {
   839  		t.Fatalf("\ngot  %+v\nwant no labels", got)
   840  	}
   841  
   842  	labels := map[string]string{"label": "value"}
   843  	got, err = topic.Update(ctx, TopicConfigToUpdate{Labels: labels})
   844  	if err != nil {
   845  		t.Fatal(err)
   846  	}
   847  	if !compareConfig(got, labels) {
   848  		t.Fatalf("\ngot  %+v\nwant labels %+v", got, labels)
   849  	}
   850  	// Remove all labels.
   851  	got, err = topic.Update(ctx, TopicConfigToUpdate{Labels: map[string]string{}})
   852  	if err != nil {
   853  		t.Fatal(err)
   854  	}
   855  	if !compareConfig(got, nil) {
   856  		t.Fatalf("\ngot  %+v\nwant no labels", got)
   857  	}
   858  }
   859  
   860  func TestIntegration_PublicTopic(t *testing.T) {
   861  	t.Parallel()
   862  	ctx := context.Background()
   863  	client := integrationTestClient(ctx, t)
   864  	defer client.Close()
   865  
   866  	sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
   867  		Topic: client.TopicInProject("taxirides-realtime", "pubsub-public-data"),
   868  	})
   869  	if err != nil {
   870  		t.Fatal(err)
   871  	}
   872  	sub.Delete(ctx)
   873  }
   874  
   875  func TestIntegration_Errors(t *testing.T) {
   876  	// Test various edge conditions.
   877  	t.Parallel()
   878  	ctx := context.Background()
   879  	client := integrationTestClient(ctx, t)
   880  	defer client.Close()
   881  
   882  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   883  	if err != nil {
   884  		t.Fatalf("CreateTopic error: %v", err)
   885  	}
   886  	defer topic.Delete(ctx)
   887  	defer topic.Stop()
   888  
   889  	// Out-of-range retention duration.
   890  	sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
   891  		Topic:             topic,
   892  		RetentionDuration: 1 * time.Second,
   893  	})
   894  	if want := codes.InvalidArgument; status.Code(err) != want {
   895  		t.Errorf("got <%v>, want %s", err, want)
   896  	}
   897  	if err == nil {
   898  		sub.Delete(ctx)
   899  	}
   900  
   901  	// Ack deadline less than minimum.
   902  	sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
   903  		Topic:       topic,
   904  		AckDeadline: 5 * time.Second,
   905  	})
   906  	if want := codes.Unknown; status.Code(err) != want {
   907  		t.Errorf("got <%v>, want %s", err, want)
   908  	}
   909  	if err == nil {
   910  		sub.Delete(ctx)
   911  	}
   912  
   913  	// Updating a non-existent subscription.
   914  	sub = client.Subscription(subIDs.New())
   915  	_, err = sub.Update(ctx, SubscriptionConfigToUpdate{AckDeadline: 20 * time.Second})
   916  	if want := codes.NotFound; status.Code(err) != want {
   917  		t.Errorf("got <%v>, want %s", err, want)
   918  	}
   919  	// Deleting a non-existent subscription.
   920  	err = sub.Delete(ctx)
   921  	if want := codes.NotFound; status.Code(err) != want {
   922  		t.Errorf("got <%v>, want %s", err, want)
   923  	}
   924  
   925  	// Updating out-of-range retention duration.
   926  	sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic})
   927  	if err != nil {
   928  		t.Fatal(err)
   929  	}
   930  	defer sub.Delete(ctx)
   931  	_, err = sub.Update(ctx, SubscriptionConfigToUpdate{RetentionDuration: 1000 * time.Hour})
   932  	if want := codes.InvalidArgument; status.Code(err) != want {
   933  		t.Errorf("got <%v>, want %s", err, want)
   934  	}
   935  }
   936  
   937  func TestIntegration_MessageStoragePolicy_TopicLevel(t *testing.T) {
   938  	t.Parallel()
   939  	ctx := context.Background()
   940  	client := integrationTestClient(ctx, t)
   941  	defer client.Close()
   942  
   943  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
   944  	if err != nil {
   945  		t.Fatalf("CreateTopic error: %v", err)
   946  	}
   947  	defer topic.Delete(ctx)
   948  	defer topic.Stop()
   949  
   950  	// Specify some regions to set.
   951  	regions := []string{"asia-east1", "us-east1"}
   952  	cfg, err := topic.Update(ctx, TopicConfigToUpdate{
   953  		MessageStoragePolicy: &MessageStoragePolicy{
   954  			AllowedPersistenceRegions: regions,
   955  		},
   956  	})
   957  	if err != nil {
   958  		t.Fatal(err)
   959  	}
   960  	got := cfg.MessageStoragePolicy.AllowedPersistenceRegions
   961  	want := regions
   962  	if !testutil.Equal(got, want) {
   963  		t.Fatalf("\ngot  %+v\nwant regions%+v", got, want)
   964  	}
   965  
   966  	// Removing all regions should fail
   967  	updateCfg := TopicConfigToUpdate{
   968  		MessageStoragePolicy: &MessageStoragePolicy{
   969  			AllowedPersistenceRegions: []string{},
   970  		},
   971  	}
   972  	if _, err = topic.Update(ctx, updateCfg); err == nil {
   973  		t.Fatalf("Unexpected succeeded in removing all regions\n%+v\n", got)
   974  	}
   975  }
   976  
   977  // NOTE: This test should be skipped by open source contributors. It requires
   978  // a (gsuite) organization project, and specific permissions. The test for MessageStoragePolicy
   979  // on a topic level can be run on any topic and is covered by the previous test.
   980  //
   981  // Googlers, see internal bug 77920644. Furthermore, be sure to add your
   982  // service account as an owner of ps-geofencing-test.
   983  func TestIntegration_MessageStoragePolicy_ProjectLevel(t *testing.T) {
   984  	// Verify that the message storage policy is populated.
   985  	if testing.Short() {
   986  		t.Skip("Integration tests skipped in short mode")
   987  	}
   988  	t.Parallel()
   989  	ctx := context.Background()
   990  	// If a message storage policy is not set on a topic, the policy depends on the Resource Location
   991  	// Restriction which is specified on an organization level. The usual testing project is in the
   992  	// google.com org, which has no resource location restrictions. Use a project in another org that
   993  	// does have a restriction set ("us-east1").
   994  	projID := "ps-geofencing-test"
   995  	// We can use the same creds as always because the service account of the default testing project
   996  	// has permission to use the above project. This test will fail if a different service account
   997  	// is used for testing.
   998  	ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
   999  	if ts == nil {
  1000  		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
  1001  	}
  1002  	opts := withGRPCHeadersAssertion(t, option.WithTokenSource(ts))
  1003  	client, err := NewClient(ctx, projID, opts...)
  1004  	if err != nil {
  1005  		t.Fatalf("Creating client error: %v", err)
  1006  	}
  1007  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1008  	if err != nil {
  1009  		t.Fatalf("CreateTopic error: %v", err)
  1010  	}
  1011  	defer topic.Delete(ctx)
  1012  	defer topic.Stop()
  1013  
  1014  	config, err := topic.Config(ctx)
  1015  	if err != nil {
  1016  		t.Fatal(err)
  1017  	}
  1018  	got := config.MessageStoragePolicy.AllowedPersistenceRegions
  1019  	want := []string{"us-east1"}
  1020  	if !testutil.Equal(got, want) {
  1021  		t.Errorf("got %v, want %v", got, want)
  1022  	}
  1023  }
  1024  
  1025  func TestIntegration_CreateTopic_KMS(t *testing.T) {
  1026  	t.Parallel()
  1027  	ctx := context.Background()
  1028  	client := integrationTestClient(ctx, t)
  1029  	defer client.Close()
  1030  
  1031  	kmsClient, err := kms.NewKeyManagementClient(ctx)
  1032  	if err != nil {
  1033  		t.Fatal(err)
  1034  	}
  1035  
  1036  	keyRingID := "test-key-ring"
  1037  	want := "test-key2"
  1038  
  1039  	// Get the test KMS key ring, optionally creating it if it doesn't exist.
  1040  	keyRing, err := kmsClient.GetKeyRing(ctx, &kmspb.GetKeyRingRequest{
  1041  		Name: fmt.Sprintf("projects/%s/locations/global/keyRings/%s", testutil.ProjID(), keyRingID),
  1042  	})
  1043  	if err != nil {
  1044  		if status.Code(err) != codes.NotFound {
  1045  			t.Fatal(err)
  1046  		}
  1047  		createKeyRingReq := &kmspb.CreateKeyRingRequest{
  1048  			Parent:    fmt.Sprintf("projects/%s/locations/global", testutil.ProjID()),
  1049  			KeyRingId: keyRingID,
  1050  		}
  1051  		keyRing, err = kmsClient.CreateKeyRing(ctx, createKeyRingReq)
  1052  		if err != nil {
  1053  			t.Fatal(err)
  1054  		}
  1055  	}
  1056  
  1057  	// Get the test KMS crypto key, optionally creating it if it doesn't exist.
  1058  	key, err := kmsClient.GetCryptoKey(ctx, &kmspb.GetCryptoKeyRequest{
  1059  		Name: fmt.Sprintf("%s/cryptoKeys/%s", keyRing.GetName(), want),
  1060  	})
  1061  	if err != nil {
  1062  		if status.Code(err) != codes.NotFound {
  1063  			t.Fatal(err)
  1064  		}
  1065  		createKeyReq := &kmspb.CreateCryptoKeyRequest{
  1066  			Parent:      keyRing.GetName(),
  1067  			CryptoKeyId: want,
  1068  			CryptoKey: &kmspb.CryptoKey{
  1069  				Purpose: 1, // ENCRYPT_DECRYPT purpose
  1070  			},
  1071  		}
  1072  		key, err = kmsClient.CreateCryptoKey(ctx, createKeyReq)
  1073  		if err != nil {
  1074  			t.Fatal(err)
  1075  		}
  1076  	}
  1077  
  1078  	tc := TopicConfig{
  1079  		KMSKeyName: key.GetName(),
  1080  	}
  1081  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), &tc)
  1082  	if err != nil {
  1083  		t.Fatalf("CreateTopicWithConfig error: %v", err)
  1084  	}
  1085  	defer topic.Delete(ctx)
  1086  	defer topic.Stop()
  1087  
  1088  	cfg, err := topic.Config(ctx)
  1089  	if err != nil {
  1090  		t.Fatal(err)
  1091  	}
  1092  	got := cfg.KMSKeyName
  1093  
  1094  	if got != key.GetName() {
  1095  		t.Errorf("got %v, want %v", got, key.GetName())
  1096  	}
  1097  }
  1098  
  1099  func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) {
  1100  	t.Parallel()
  1101  	ctx := context.Background()
  1102  	client := integrationTestClient(ctx, t)
  1103  	defer client.Close()
  1104  
  1105  	tc := TopicConfig{
  1106  		MessageStoragePolicy: MessageStoragePolicy{
  1107  			AllowedPersistenceRegions: []string{"us-east1"},
  1108  		},
  1109  	}
  1110  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), &tc)
  1111  	if err != nil {
  1112  		t.Fatalf("CreateTopicWithConfig error: %v", err)
  1113  	}
  1114  	defer topic.Delete(ctx)
  1115  	defer topic.Stop()
  1116  
  1117  	got, err := topic.Config(ctx)
  1118  	if err != nil {
  1119  		t.Fatal(err)
  1120  	}
  1121  	want := tc
  1122  	if diff := testutil.Diff(got.MessageStoragePolicy, want.MessageStoragePolicy); diff != "" {
  1123  		t.Fatalf("\ngot: - want: +\n%s", diff)
  1124  	}
  1125  }
  1126  
  1127  func TestIntegration_OrderedKeys_Basic(t *testing.T) {
  1128  	ctx := context.Background()
  1129  	client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
  1130  	defer client.Close()
  1131  
  1132  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1133  	if err != nil {
  1134  		t.Fatal(err)
  1135  	}
  1136  	defer topic.Delete(ctx)
  1137  	defer topic.Stop()
  1138  	exists, err := topic.Exists(ctx)
  1139  	if err != nil {
  1140  		t.Fatal(err)
  1141  	}
  1142  	if !exists {
  1143  		t.Fatalf("topic %v should exist, but it doesn't", topic)
  1144  	}
  1145  	var sub *Subscription
  1146  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
  1147  		Topic:                 topic,
  1148  		EnableMessageOrdering: true,
  1149  	}); err != nil {
  1150  		t.Fatal(err)
  1151  	}
  1152  	defer sub.Delete(ctx)
  1153  	exists, err = sub.Exists(ctx)
  1154  	if err != nil {
  1155  		t.Fatal(err)
  1156  	}
  1157  	if !exists {
  1158  		t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
  1159  	}
  1160  
  1161  	topic.PublishSettings.DelayThreshold = time.Second
  1162  	topic.EnableMessageOrdering = true
  1163  
  1164  	orderingKey := "some-ordering-key"
  1165  	numItems := 1000
  1166  	for i := 0; i < numItems; i++ {
  1167  		r := topic.Publish(ctx, &Message{
  1168  			ID:          fmt.Sprintf("id-%d", i),
  1169  			Data:        []byte(fmt.Sprintf("item-%d", i)),
  1170  			OrderingKey: orderingKey,
  1171  		})
  1172  		go func() {
  1173  			if _, err := r.Get(ctx); err != nil {
  1174  				t.Error(err)
  1175  			}
  1176  		}()
  1177  	}
  1178  
  1179  	received := make(chan string, numItems)
  1180  	ctx2, cancel := context.WithCancel(ctx)
  1181  	go func() {
  1182  		for i := 0; i < numItems; i++ {
  1183  			select {
  1184  			case r := <-received:
  1185  				if got, want := r, fmt.Sprintf("item-%d", i); got != want {
  1186  					t.Errorf("%d: got %s, want %s", i, got, want)
  1187  				}
  1188  			case <-time.After(30 * time.Second):
  1189  				t.Errorf("timed out after 30s waiting for item %d", i)
  1190  				cancel()
  1191  			}
  1192  		}
  1193  		cancel()
  1194  	}()
  1195  
  1196  	if err := sub.Receive(ctx2, func(ctx context.Context, msg *Message) {
  1197  		defer msg.Ack()
  1198  		if msg.OrderingKey != orderingKey {
  1199  			t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
  1200  		}
  1201  
  1202  		received <- string(msg.Data)
  1203  	}); err != nil {
  1204  		if c := status.Code(err); c != codes.Canceled {
  1205  			t.Error(err)
  1206  		}
  1207  	}
  1208  }
  1209  
  1210  func TestIntegration_OrderedKeys_JSON(t *testing.T) {
  1211  	ctx := context.Background()
  1212  	client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
  1213  	defer client.Close()
  1214  
  1215  	testutil.Retry(t, 2, 1*time.Second, func(r *testutil.R) {
  1216  		topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1217  		if err != nil {
  1218  			r.Errorf("createTopicWithRetry err: %v", err)
  1219  		}
  1220  		defer topic.Delete(ctx)
  1221  		defer topic.Stop()
  1222  		exists, err := topic.Exists(ctx)
  1223  		if err != nil {
  1224  			r.Errorf("topic.Exists err: %v", err)
  1225  		}
  1226  		if !exists {
  1227  			r.Errorf("topic %v should exist, but it doesn't", topic)
  1228  		}
  1229  		var sub *Subscription
  1230  		if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
  1231  			Topic:                 topic,
  1232  			EnableMessageOrdering: true,
  1233  		}); err != nil {
  1234  			r.Errorf("creteSubWithRetry err: %v", err)
  1235  		}
  1236  		defer sub.Delete(ctx)
  1237  		exists, err = sub.Exists(ctx)
  1238  		if err != nil {
  1239  			r.Errorf("sub.Exists err: %v", err)
  1240  		}
  1241  		if !exists {
  1242  			r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
  1243  		}
  1244  
  1245  		topic.PublishSettings.DelayThreshold = time.Second
  1246  		topic.EnableMessageOrdering = true
  1247  
  1248  		inFile, err := os.Open("testdata/publish.csv")
  1249  		if err != nil {
  1250  			r.Errorf("os.Open err: %v", err)
  1251  		}
  1252  		defer inFile.Close()
  1253  
  1254  		mu := sync.Mutex{}
  1255  		var publishData []testutil2.OrderedKeyMsg
  1256  		var receiveData []testutil2.OrderedKeyMsg
  1257  		// Keep track of duplicate messages to avoid negative waitgroup counter.
  1258  		receiveSet := make(map[string]struct{})
  1259  
  1260  		wg := sync.WaitGroup{}
  1261  		scanner := bufio.NewScanner(inFile)
  1262  		for scanner.Scan() {
  1263  			line := scanner.Text()
  1264  			// TODO: use strings.ReplaceAll once we only support 1.11+.
  1265  			line = strings.Replace(line, "\"", "", -1)
  1266  			parts := strings.Split(line, ",")
  1267  			key := parts[0]
  1268  			msg := parts[1]
  1269  			publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
  1270  			res := topic.Publish(ctx, &Message{
  1271  				Data:        []byte(msg),
  1272  				OrderingKey: key,
  1273  			})
  1274  			go func() {
  1275  				_, err := res.Get(ctx)
  1276  				if err != nil {
  1277  					// Can't fail inside goroutine, so just log the error.
  1278  					r.Logf("publish error for message(%s): %v", msg, err)
  1279  				}
  1280  			}()
  1281  			wg.Add(1)
  1282  		}
  1283  		if err := scanner.Err(); err != nil {
  1284  			r.Errorf("scanner.Err(): %v", err)
  1285  		}
  1286  
  1287  		go func() {
  1288  			sub.Receive(ctx, func(ctx context.Context, msg *Message) {
  1289  				mu.Lock()
  1290  				defer mu.Unlock()
  1291  				// Messages are deduped using the data field, since in this case all
  1292  				// messages are unique.
  1293  				if _, ok := receiveSet[string(msg.Data)]; ok {
  1294  					r.Logf("received duplicate message: %s", msg.Data)
  1295  					return
  1296  				}
  1297  				receiveSet[string(msg.Data)] = struct{}{}
  1298  				receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)})
  1299  				wg.Done()
  1300  				msg.Ack()
  1301  			})
  1302  		}()
  1303  
  1304  		done := make(chan struct{})
  1305  		go func() {
  1306  			wg.Wait()
  1307  			close(done)
  1308  		}()
  1309  
  1310  		select {
  1311  		case <-done:
  1312  		case <-time.After(2 * time.Minute):
  1313  			r.Errorf("timed out after 2m waiting for all messages to be received")
  1314  		}
  1315  
  1316  		mu.Lock()
  1317  		defer mu.Unlock()
  1318  		if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
  1319  			r.Errorf("VerifyKeyOrdering error: %v", err)
  1320  		}
  1321  	})
  1322  }
  1323  
  1324  func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
  1325  	ctx := context.Background()
  1326  	client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
  1327  	defer client.Close()
  1328  
  1329  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1330  	if err != nil {
  1331  		t.Fatal(err)
  1332  	}
  1333  	defer topic.Delete(ctx)
  1334  	defer topic.Stop()
  1335  	exists, err := topic.Exists(ctx)
  1336  	if err != nil {
  1337  		t.Fatal(err)
  1338  	}
  1339  	if !exists {
  1340  		t.Fatalf("topic %v should exist, but it doesn't", topic)
  1341  	}
  1342  
  1343  	topic.PublishSettings.BufferedByteLimit = 100
  1344  	topic.EnableMessageOrdering = true
  1345  
  1346  	orderingKey := "some-ordering-key2"
  1347  	// Publish a message that is too large so we'll get an error that
  1348  	// pauses publishing for this ordering key.
  1349  	r := topic.Publish(ctx, &Message{
  1350  		Data:        bytes.Repeat([]byte("A"), 1000),
  1351  		OrderingKey: orderingKey,
  1352  	})
  1353  	if _, err := r.Get(ctx); err == nil {
  1354  		t.Fatalf("expected bundle byte limit error, got nil")
  1355  	}
  1356  	// Publish a normal sized message now, which should fail
  1357  	// since publishing on this ordering key is paused.
  1358  	r = topic.Publish(ctx, &Message{
  1359  		Data:        []byte("should fail"),
  1360  		OrderingKey: orderingKey,
  1361  	})
  1362  	if _, err := r.Get(ctx); err == nil || !errors.As(err, &ErrPublishingPaused{}) {
  1363  		t.Fatalf("expected ordering keys publish error, got %v", err)
  1364  	}
  1365  
  1366  	// Lastly, call ResumePublish and make sure subsequent publishes succeed.
  1367  	topic.ResumePublish(orderingKey)
  1368  	r = topic.Publish(ctx, &Message{
  1369  		Data:        []byte("should succeed"),
  1370  		OrderingKey: orderingKey,
  1371  	})
  1372  	if _, err := r.Get(ctx); err != nil {
  1373  		t.Fatalf("got error while publishing message: %v", err)
  1374  	}
  1375  }
  1376  
  1377  // TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages
  1378  // with ordering keys are not processed as such if the subscription
  1379  // does not have message ordering enabled.
  1380  func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
  1381  	ctx := context.Background()
  1382  	client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
  1383  	defer client.Close()
  1384  
  1385  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1386  	if err != nil {
  1387  		t.Fatal(err)
  1388  	}
  1389  	defer topic.Delete(ctx)
  1390  	defer topic.Stop()
  1391  	exists, err := topic.Exists(ctx)
  1392  	if err != nil {
  1393  		t.Fatal(err)
  1394  	}
  1395  	if !exists {
  1396  		t.Fatalf("topic %v should exist, but it doesn't", topic)
  1397  	}
  1398  	topic.EnableMessageOrdering = true
  1399  
  1400  	// Explicitly disable message ordering on the subscription.
  1401  	enableMessageOrdering := false
  1402  	subCfg := SubscriptionConfig{
  1403  		Topic:                 topic,
  1404  		EnableMessageOrdering: enableMessageOrdering,
  1405  	}
  1406  	sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), subCfg)
  1407  	if err != nil {
  1408  		t.Fatal(err)
  1409  	}
  1410  	defer sub.Delete(ctx)
  1411  
  1412  	publishSync(ctx, t, topic, &Message{
  1413  		Data:        []byte("message-1"),
  1414  		OrderingKey: "ordering-key-1",
  1415  	})
  1416  
  1417  	publishSync(ctx, t, topic, &Message{
  1418  		Data:        []byte("message-2"),
  1419  		OrderingKey: "ordering-key-1",
  1420  	})
  1421  
  1422  	sub.ReceiveSettings.Synchronous = true
  1423  	ctx2, cancel := context.WithTimeout(ctx, 12*time.Second)
  1424  	defer cancel()
  1425  
  1426  	var numAcked int32
  1427  	sub.Receive(ctx2, func(_ context.Context, msg *Message) {
  1428  		// Create artificial constraints on message processing time.
  1429  		if string(msg.Data) == "message-1" {
  1430  			time.Sleep(10 * time.Second)
  1431  		} else {
  1432  			time.Sleep(5 * time.Second)
  1433  		}
  1434  		msg.Ack()
  1435  		atomic.AddInt32(&numAcked, 1)
  1436  	})
  1437  	// If the messages were received on a subscription with the EnableMessageOrdering=true,
  1438  	// total processing would exceed the timeout and only one message would be processed.
  1439  	if numAcked < 2 {
  1440  		t.Fatalf("did not process all messages in time, numAcked: %d", numAcked)
  1441  	}
  1442  }
  1443  
  1444  func TestIntegration_OrderingWithExactlyOnce(t *testing.T) {
  1445  	ctx := context.Background()
  1446  	client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
  1447  	defer client.Close()
  1448  
  1449  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1450  	if err != nil {
  1451  		t.Fatal(err)
  1452  	}
  1453  	defer topic.Delete(ctx)
  1454  	defer topic.Stop()
  1455  	exists, err := topic.Exists(ctx)
  1456  	if err != nil {
  1457  		t.Fatal(err)
  1458  	}
  1459  	if !exists {
  1460  		t.Fatalf("topic %v should exist, but it doesn't", topic)
  1461  	}
  1462  	var sub *Subscription
  1463  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
  1464  		Topic:                     topic,
  1465  		EnableMessageOrdering:     true,
  1466  		EnableExactlyOnceDelivery: true,
  1467  	}); err != nil {
  1468  		t.Fatal(err)
  1469  	}
  1470  	defer sub.Delete(ctx)
  1471  	exists, err = sub.Exists(ctx)
  1472  	if err != nil {
  1473  		t.Fatal(err)
  1474  	}
  1475  	if !exists {
  1476  		t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
  1477  	}
  1478  
  1479  	topic.PublishSettings.DelayThreshold = time.Second
  1480  	topic.EnableMessageOrdering = true
  1481  
  1482  	orderingKey := "some-ordering-key"
  1483  	numItems := 10
  1484  	for i := 0; i < numItems; i++ {
  1485  		r := topic.Publish(ctx, &Message{
  1486  			ID:          fmt.Sprintf("id-%d", i),
  1487  			Data:        []byte(fmt.Sprintf("item-%d", i)),
  1488  			OrderingKey: orderingKey,
  1489  		})
  1490  		go func() {
  1491  			if _, err := r.Get(ctx); err != nil {
  1492  				t.Error(err)
  1493  			}
  1494  		}()
  1495  	}
  1496  
  1497  	received := make(chan string, numItems)
  1498  	ctx2, cancel := context.WithCancel(ctx)
  1499  	go func() {
  1500  		for i := 0; i < numItems; i++ {
  1501  			select {
  1502  			case r := <-received:
  1503  				if got, want := r, fmt.Sprintf("item-%d", i); got != want {
  1504  					t.Errorf("%d: got %s, want %s", i, got, want)
  1505  				}
  1506  			case <-time.After(30 * time.Second):
  1507  				t.Errorf("timed out after 30s waiting for item %d", i)
  1508  				cancel()
  1509  			}
  1510  		}
  1511  		cancel()
  1512  	}()
  1513  
  1514  	if err := sub.Receive(ctx2, func(ctx context.Context, msg *Message) {
  1515  		defer msg.Ack()
  1516  		if msg.OrderingKey != orderingKey {
  1517  			t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
  1518  		}
  1519  
  1520  		received <- string(msg.Data)
  1521  	}); err != nil {
  1522  		if c := status.Code(err); c != codes.Canceled {
  1523  			t.Error(err)
  1524  		}
  1525  	}
  1526  
  1527  }
  1528  
  1529  func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
  1530  	t.Parallel()
  1531  	ctx := context.Background()
  1532  	client := integrationTestClient(ctx, t)
  1533  	defer client.Close()
  1534  
  1535  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1536  	if err != nil {
  1537  		t.Fatalf("CreateTopic error: %v", err)
  1538  	}
  1539  	defer topic.Delete(ctx)
  1540  	defer topic.Stop()
  1541  
  1542  	deadLetterTopic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1543  	if err != nil {
  1544  		t.Fatalf("CreateTopic error: %v", err)
  1545  	}
  1546  	defer deadLetterTopic.Delete(ctx)
  1547  	defer deadLetterTopic.Stop()
  1548  
  1549  	// We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
  1550  	// that MaxDeliveryAttempts defaults properly to 5 if not set.
  1551  	cfg := SubscriptionConfig{
  1552  		Topic: topic,
  1553  		DeadLetterPolicy: &DeadLetterPolicy{
  1554  			DeadLetterTopic: deadLetterTopic.String(),
  1555  		},
  1556  	}
  1557  	var sub *Subscription
  1558  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
  1559  		t.Fatalf("CreateSub error: %v", err)
  1560  	}
  1561  	defer sub.Delete(ctx)
  1562  
  1563  	got, err := sub.Config(ctx)
  1564  	if err != nil {
  1565  		t.Fatal(err)
  1566  	}
  1567  	want := &DeadLetterPolicy{
  1568  		DeadLetterTopic:     deadLetterTopic.String(),
  1569  		MaxDeliveryAttempts: 5,
  1570  	}
  1571  	if diff := testutil.Diff(got.DeadLetterPolicy, want); diff != "" {
  1572  		t.Fatalf("\ngot: - want: +\n%s", diff)
  1573  	}
  1574  
  1575  	res := topic.Publish(ctx, &Message{
  1576  		Data: []byte("failed message"),
  1577  	})
  1578  	if _, err := res.Get(ctx); err != nil {
  1579  		t.Fatalf("Publish message error: %v", err)
  1580  	}
  1581  
  1582  	ctx2, cancel := context.WithCancel(ctx)
  1583  	numAttempts := 1
  1584  	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
  1585  		if numAttempts >= 5 {
  1586  			cancel()
  1587  			m.Ack()
  1588  			return
  1589  		}
  1590  		if *m.DeliveryAttempt != numAttempts {
  1591  			t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
  1592  		}
  1593  		numAttempts++
  1594  		m.Nack()
  1595  	})
  1596  	if err != nil {
  1597  		t.Fatalf("Streaming pull error: %v\n", err)
  1598  	}
  1599  }
  1600  
  1601  // Test that the DeliveryAttempt field is nil when dead lettering is not enabled.
  1602  func TestIntegration_DeadLetterPolicy_DeliveryAttempt(t *testing.T) {
  1603  	t.Parallel()
  1604  	ctx := context.Background()
  1605  	client := integrationTestClient(ctx, t)
  1606  	defer client.Close()
  1607  
  1608  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1609  	if err != nil {
  1610  		t.Fatalf("CreateTopic error: %v", err)
  1611  	}
  1612  	defer topic.Delete(ctx)
  1613  	defer topic.Stop()
  1614  
  1615  	cfg := SubscriptionConfig{
  1616  		Topic: topic,
  1617  	}
  1618  	var sub *Subscription
  1619  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
  1620  		t.Fatalf("CreateSub error: %v", err)
  1621  	}
  1622  	defer sub.Delete(ctx)
  1623  
  1624  	res := topic.Publish(ctx, &Message{
  1625  		Data: []byte("failed message"),
  1626  	})
  1627  	if _, err := res.Get(ctx); err != nil {
  1628  		t.Fatalf("Publish message error: %v", err)
  1629  	}
  1630  
  1631  	ctx2, cancel := context.WithCancel(ctx)
  1632  	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
  1633  		defer m.Ack()
  1634  		defer cancel()
  1635  		if m.DeliveryAttempt != nil {
  1636  			t.Fatalf("DeliveryAttempt should be nil when dead lettering is disabled")
  1637  		}
  1638  	})
  1639  	if err != nil {
  1640  		t.Fatalf("Streaming pull error: %v\n", err)
  1641  	}
  1642  }
  1643  
  1644  func TestIntegration_DeadLetterPolicy_ClearDeadLetter(t *testing.T) {
  1645  	t.Parallel()
  1646  	ctx := context.Background()
  1647  	client := integrationTestClient(ctx, t)
  1648  	defer client.Close()
  1649  
  1650  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1651  	if err != nil {
  1652  		t.Fatalf("CreateTopic error: %v", err)
  1653  	}
  1654  	defer topic.Delete(ctx)
  1655  	defer topic.Stop()
  1656  
  1657  	deadLetterTopic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1658  	if err != nil {
  1659  		t.Fatalf("CreateTopic error: %v", err)
  1660  	}
  1661  	defer deadLetterTopic.Delete(ctx)
  1662  	defer deadLetterTopic.Stop()
  1663  
  1664  	cfg := SubscriptionConfig{
  1665  		Topic: topic,
  1666  		DeadLetterPolicy: &DeadLetterPolicy{
  1667  			DeadLetterTopic: deadLetterTopic.String(),
  1668  		},
  1669  	}
  1670  	var sub *Subscription
  1671  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
  1672  		t.Fatalf("CreateSub error: %v", err)
  1673  	}
  1674  	defer sub.Delete(ctx)
  1675  
  1676  	sub.Update(ctx, SubscriptionConfigToUpdate{
  1677  		DeadLetterPolicy: &DeadLetterPolicy{},
  1678  	})
  1679  
  1680  	got, err := sub.Config(ctx)
  1681  	if err != nil {
  1682  		t.Fatal(err)
  1683  	}
  1684  	if got.DeadLetterPolicy != nil {
  1685  		t.Fatalf("config.DeadLetterPolicy; got: %v want: nil", got.DeadLetterPolicy)
  1686  	}
  1687  }
  1688  
  1689  // TestIntegration_BadEndpoint tests that specifying a bad
  1690  // endpoint will cause an error in RPCs.
  1691  func TestIntegration_BadEndpoint(t *testing.T) {
  1692  	t.Parallel()
  1693  	ctx := context.Background()
  1694  	opts := withGRPCHeadersAssertion(t,
  1695  		option.WithEndpoint("example.googleapis.com:443"),
  1696  	)
  1697  	client := integrationTestClient(ctx, t, opts...)
  1698  	defer client.Close()
  1699  	if _, err := client.CreateTopic(ctx, topicIDs.New()); err == nil {
  1700  		t.Fatalf("CreateTopic should fail with fake endpoint, got nil err")
  1701  	}
  1702  }
  1703  
  1704  func TestIntegration_Filter_CreateSubscription(t *testing.T) {
  1705  	t.Parallel()
  1706  	ctx := context.Background()
  1707  	client := integrationTestClient(ctx, t)
  1708  	defer client.Close()
  1709  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1710  	if err != nil {
  1711  		t.Fatalf("CreateTopic error: %v", err)
  1712  	}
  1713  	defer topic.Delete(ctx)
  1714  	defer topic.Stop()
  1715  	cfg := SubscriptionConfig{
  1716  		Topic:  topic,
  1717  		Filter: "attributes.event_type = \"1\"",
  1718  	}
  1719  	var sub *Subscription
  1720  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
  1721  		t.Fatalf("CreateSub error: %v", err)
  1722  	}
  1723  	defer sub.Delete(ctx)
  1724  	got, err := sub.Config(ctx)
  1725  	if err != nil {
  1726  		t.Fatal(err)
  1727  	}
  1728  	want := cfg.Filter
  1729  	if got.Filter != want {
  1730  		t.Fatalf("subcfg.Filter mismatch; got: %s, want: %s", got.Filter, want)
  1731  	}
  1732  	attrs := make(map[string]string)
  1733  	attrs["event_type"] = "1"
  1734  	res := topic.Publish(ctx, &Message{
  1735  		Data:       []byte("hello world"),
  1736  		Attributes: attrs,
  1737  	})
  1738  	if _, err := res.Get(ctx); err != nil {
  1739  		t.Fatalf("Publish message error: %v", err)
  1740  	}
  1741  	// Publish the same message with a different event_type
  1742  	// and check it is filtered out.
  1743  	attrs["event_type"] = "2"
  1744  	res = topic.Publish(ctx, &Message{
  1745  		Data:       []byte("hello world"),
  1746  		Attributes: attrs,
  1747  	})
  1748  	if _, err := res.Get(ctx); err != nil {
  1749  		t.Fatalf("Publish message error: %v", err)
  1750  	}
  1751  	ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
  1752  	defer cancel()
  1753  	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
  1754  		defer m.Ack()
  1755  		if m.Attributes["event_type"] != "1" {
  1756  			t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
  1757  		}
  1758  	})
  1759  	if err != nil {
  1760  		t.Fatalf("Streaming pull error: %v\n", err)
  1761  	}
  1762  }
  1763  
  1764  func TestIntegration_RetryPolicy(t *testing.T) {
  1765  	t.Parallel()
  1766  	ctx := context.Background()
  1767  	client := integrationTestClient(ctx, t)
  1768  	defer client.Close()
  1769  
  1770  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1771  	if err != nil {
  1772  		t.Fatalf("CreateTopic error: %v", err)
  1773  	}
  1774  	defer topic.Delete(ctx)
  1775  	defer topic.Stop()
  1776  
  1777  	cfg := SubscriptionConfig{
  1778  		Topic: topic,
  1779  		RetryPolicy: &RetryPolicy{
  1780  			MinimumBackoff: 20 * time.Second,
  1781  			MaximumBackoff: 500 * time.Second,
  1782  		},
  1783  	}
  1784  	var sub *Subscription
  1785  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
  1786  		t.Fatalf("CreateSub error: %v", err)
  1787  	}
  1788  	defer sub.Delete(ctx)
  1789  
  1790  	got, err := sub.Config(ctx)
  1791  	if err != nil {
  1792  		t.Fatal(err)
  1793  	}
  1794  	want := SubscriptionConfig{
  1795  		Topic:               topic,
  1796  		AckDeadline:         10 * time.Second,
  1797  		RetainAckedMessages: false,
  1798  		RetentionDuration:   defaultRetentionDuration,
  1799  		ExpirationPolicy:    defaultExpirationPolicy,
  1800  		RetryPolicy: &RetryPolicy{
  1801  			MinimumBackoff: 20 * time.Second,
  1802  			MaximumBackoff: 500 * time.Second,
  1803  		},
  1804  	}
  1805  	if diff := testutil.Diff(got.RetryPolicy, want.RetryPolicy); diff != "" {
  1806  		t.Fatalf("\ngot: - want: +\n%s", diff)
  1807  	}
  1808  
  1809  	// Test clearing the RetryPolicy
  1810  	cfgToUpdate := SubscriptionConfigToUpdate{
  1811  		RetryPolicy: &RetryPolicy{},
  1812  	}
  1813  	_, err = sub.Update(ctx, cfgToUpdate)
  1814  	if err != nil {
  1815  		t.Fatalf("got error while updating sub: %v", err)
  1816  	}
  1817  
  1818  	got, err = sub.Config(ctx)
  1819  	if err != nil {
  1820  		t.Fatal(err)
  1821  	}
  1822  	want.RetryPolicy = nil
  1823  	if diff := testutil.Diff(got.RetryPolicy, want.RetryPolicy); diff != "" {
  1824  		t.Fatalf("\ngot: - want: +\n%s", diff)
  1825  	}
  1826  }
  1827  
  1828  func TestIntegration_DetachSubscription(t *testing.T) {
  1829  	t.Parallel()
  1830  	ctx := context.Background()
  1831  	client := integrationTestClient(ctx, t)
  1832  	defer client.Close()
  1833  
  1834  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  1835  	if err != nil {
  1836  		t.Fatalf("CreateTopic error: %v", err)
  1837  	}
  1838  	defer topic.Delete(ctx)
  1839  	defer topic.Stop()
  1840  
  1841  	cfg := SubscriptionConfig{
  1842  		Topic: topic,
  1843  	}
  1844  	var sub *Subscription
  1845  	if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil {
  1846  		t.Fatalf("CreateSub error: %v", err)
  1847  	}
  1848  	defer sub.Delete(ctx)
  1849  
  1850  	if _, err := client.DetachSubscription(ctx, sub.String()); err != nil {
  1851  		t.Fatalf("DetachSubscription error: %v", err)
  1852  	}
  1853  
  1854  	newSub := client.Subscription(sub.ID())
  1855  	got, err := newSub.Config(ctx)
  1856  	if err != nil {
  1857  		t.Fatalf("GetSubscription error: %v", err)
  1858  	}
  1859  	if !got.Detached {
  1860  		t.Fatal("SubscriptionConfig not detached after calling detach")
  1861  	}
  1862  }
  1863  
  1864  func TestIntegration_SchemaAdmin(t *testing.T) {
  1865  	t.Parallel()
  1866  	ctx := context.Background()
  1867  	c := integrationTestSchemaClient(ctx, t)
  1868  	defer c.Close()
  1869  
  1870  	for _, tc := range []struct {
  1871  		desc       string
  1872  		schemaType SchemaType
  1873  		path       string
  1874  	}{
  1875  		{
  1876  			desc:       "avro schema",
  1877  			schemaType: SchemaAvro,
  1878  			path:       "testdata/schema/us-states.avsc",
  1879  		},
  1880  		{
  1881  			desc:       "protocol buffer schema",
  1882  			schemaType: SchemaProtocolBuffer,
  1883  			path:       "testdata/schema/us-states.proto",
  1884  		},
  1885  	} {
  1886  		t.Run(tc.desc, func(t *testing.T) {
  1887  			content, err := ioutil.ReadFile(tc.path)
  1888  			if err != nil {
  1889  				t.Fatal(err)
  1890  			}
  1891  			schema := string(content)
  1892  			schemaID := schemaIDs.New()
  1893  			schemaPath := fmt.Sprintf("projects/%s/schemas/%s", testutil.ProjID(), schemaID)
  1894  			sc := SchemaConfig{
  1895  				Type:       tc.schemaType,
  1896  				Definition: schema,
  1897  			}
  1898  			got, err := c.CreateSchema(ctx, schemaID, sc)
  1899  			if err != nil {
  1900  				t.Fatalf("SchemaClient.CreateSchema error: %v", err)
  1901  			}
  1902  
  1903  			want := &SchemaConfig{
  1904  				Name:       schemaPath,
  1905  				Type:       tc.schemaType,
  1906  				Definition: schema,
  1907  			}
  1908  			if diff := testutil.Diff(got, want, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
  1909  				t.Fatalf("\ngot: - want: +\n%s", diff)
  1910  			}
  1911  
  1912  			got, err = c.Schema(ctx, schemaID, SchemaViewFull)
  1913  			if err != nil {
  1914  				t.Fatalf("SchemaClient.Schema error: %v", err)
  1915  			}
  1916  			if diff := testutil.Diff(got, want, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
  1917  				t.Fatalf("\ngot: - want: +\n%s", diff)
  1918  			}
  1919  
  1920  			err = c.DeleteSchema(ctx, schemaID)
  1921  			if err != nil {
  1922  				t.Fatalf("SchemaClient.DeleteSchema error: %v", err)
  1923  			}
  1924  		})
  1925  	}
  1926  }
  1927  
  1928  func TestIntegration_ValidateSchema(t *testing.T) {
  1929  	t.Parallel()
  1930  	ctx := context.Background()
  1931  	c := integrationTestSchemaClient(ctx, t)
  1932  	defer c.Close()
  1933  
  1934  	for _, tc := range []struct {
  1935  		desc       string
  1936  		schemaType SchemaType
  1937  		path       string
  1938  		wantErr    error
  1939  	}{
  1940  		{
  1941  			desc:       "avro schema",
  1942  			schemaType: SchemaAvro,
  1943  			path:       "testdata/schema/us-states.avsc",
  1944  			wantErr:    nil,
  1945  		},
  1946  		{
  1947  			desc:       "protocol buffer schema",
  1948  			schemaType: SchemaProtocolBuffer,
  1949  			path:       "testdata/schema/us-states.proto",
  1950  			wantErr:    nil,
  1951  		},
  1952  		{
  1953  			desc:       "protocol buffer schema",
  1954  			schemaType: SchemaProtocolBuffer,
  1955  			path:       "testdata/schema/invalid.avsc",
  1956  			wantErr:    status.Errorf(codes.InvalidArgument, "Request contains an invalid argument."),
  1957  		},
  1958  	} {
  1959  		t.Run(tc.desc, func(t *testing.T) {
  1960  			content, err := ioutil.ReadFile(tc.path)
  1961  			if err != nil {
  1962  				t.Fatal(err)
  1963  			}
  1964  			def := string(content)
  1965  			cfg := SchemaConfig{
  1966  				Type:       tc.schemaType,
  1967  				Definition: def,
  1968  			}
  1969  			_, gotErr := c.ValidateSchema(ctx, cfg)
  1970  			if status.Code(gotErr) != status.Code(tc.wantErr) {
  1971  				t.Fatalf("got err: %v\nwant err: %v", gotErr, tc.wantErr)
  1972  			}
  1973  		})
  1974  	}
  1975  }
  1976  
  1977  func TestIntegration_ValidateMessage(t *testing.T) {
  1978  	t.Parallel()
  1979  	ctx := context.Background()
  1980  	c := integrationTestSchemaClient(ctx, t)
  1981  	defer c.Close()
  1982  
  1983  	for _, tc := range []struct {
  1984  		desc        string
  1985  		schemaType  SchemaType
  1986  		schemaPath  string
  1987  		encoding    SchemaEncoding
  1988  		messagePath string
  1989  		wantErr     error
  1990  	}{
  1991  		{
  1992  			desc:        "avro json encoding",
  1993  			schemaType:  SchemaAvro,
  1994  			schemaPath:  "testdata/schema/us-states.avsc",
  1995  			encoding:    EncodingJSON,
  1996  			messagePath: "testdata/schema/alaska.json",
  1997  			wantErr:     nil,
  1998  		},
  1999  		{
  2000  			desc:        "avro binary encoding",
  2001  			schemaType:  SchemaAvro,
  2002  			schemaPath:  "testdata/schema/us-states.avsc",
  2003  			encoding:    EncodingBinary,
  2004  			messagePath: "testdata/schema/alaska.avro",
  2005  			wantErr:     nil,
  2006  		},
  2007  		{
  2008  			desc:        "proto json encoding",
  2009  			schemaType:  SchemaProtocolBuffer,
  2010  			schemaPath:  "testdata/schema/us-states.proto",
  2011  			encoding:    EncodingJSON,
  2012  			messagePath: "testdata/schema/alaska.json",
  2013  			wantErr:     nil,
  2014  		},
  2015  		{
  2016  			desc:        "protocol buffer schema",
  2017  			schemaType:  SchemaProtocolBuffer,
  2018  			schemaPath:  "testdata/schema/invalid.avsc",
  2019  			encoding:    EncodingBinary,
  2020  			messagePath: "testdata/schema/invalid.avsc",
  2021  			wantErr:     status.Errorf(codes.InvalidArgument, "Request contains an invalid argument."),
  2022  		},
  2023  	} {
  2024  		t.Run(tc.desc, func(t *testing.T) {
  2025  			content, err := ioutil.ReadFile(tc.schemaPath)
  2026  			if err != nil {
  2027  				t.Fatal(err)
  2028  			}
  2029  			def := string(content)
  2030  			cfg := SchemaConfig{
  2031  				Type:       tc.schemaType,
  2032  				Definition: def,
  2033  			}
  2034  
  2035  			msg, err := ioutil.ReadFile(tc.messagePath)
  2036  			if err != nil {
  2037  				t.Fatal(err)
  2038  			}
  2039  			_, gotErr := c.ValidateMessageWithConfig(ctx, msg, tc.encoding, cfg)
  2040  			if status.Code(gotErr) != status.Code(tc.wantErr) {
  2041  				t.Fatalf("got err: %v\nwant err: %v", gotErr, tc.wantErr)
  2042  			}
  2043  		})
  2044  	}
  2045  }
  2046  
  2047  func TestIntegration_TopicRetention(t *testing.T) {
  2048  	ctx := context.Background()
  2049  	c := integrationTestClient(ctx, t)
  2050  	defer c.Close()
  2051  
  2052  	tc := TopicConfig{
  2053  		RetentionDuration: 31 * 24 * time.Hour, // max retention duration
  2054  	}
  2055  
  2056  	topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), &tc)
  2057  	if err != nil {
  2058  		t.Fatalf("failed to create topic: %v", err)
  2059  	}
  2060  	defer topic.Delete(ctx)
  2061  	defer topic.Stop()
  2062  
  2063  	newDur := 11 * time.Minute
  2064  	cfg, err := topic.Update(ctx, TopicConfigToUpdate{
  2065  		RetentionDuration: newDur,
  2066  	})
  2067  	if err != nil {
  2068  		t.Fatalf("failed to update topic: %v", err)
  2069  	}
  2070  	if got := cfg.RetentionDuration; got != newDur {
  2071  		t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
  2072  	}
  2073  
  2074  	// Create a subscription on the topic and read TopicMessageRetentionDuration.
  2075  	s, err := createSubWithRetry(ctx, t, c, subIDs.New(), SubscriptionConfig{
  2076  		Topic: topic,
  2077  	})
  2078  	if err != nil {
  2079  		t.Fatalf("failed to create subscription: %v", err)
  2080  	}
  2081  	defer s.Delete(ctx)
  2082  	sCfg, err := s.Config(ctx)
  2083  	if err != nil {
  2084  		t.Fatalf("failed to get sub config: %v", err)
  2085  	}
  2086  	if got := sCfg.TopicMessageRetentionDuration; got != newDur {
  2087  		t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
  2088  	}
  2089  
  2090  	// Clear retention duration by setting to a negative value.
  2091  	cfg, err = topic.Update(ctx, TopicConfigToUpdate{
  2092  		RetentionDuration: -1 * time.Minute,
  2093  	})
  2094  	if err != nil {
  2095  		t.Fatal(err)
  2096  	}
  2097  	if got := cfg.RetentionDuration; got != nil {
  2098  		t.Fatalf("expected cleared retention duration, got: %v", got)
  2099  	}
  2100  }
  2101  
  2102  func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
  2103  	ctx := context.Background()
  2104  	client := integrationTestClient(ctx, t)
  2105  
  2106  	for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
  2107  		testPublishAndReceive(t, client, maxMsgs, false, true, 10, 0)
  2108  	}
  2109  }
  2110  
  2111  func TestIntegration_TopicUpdateSchema(t *testing.T) {
  2112  	ctx := context.Background()
  2113  	c := integrationTestClient(ctx, t)
  2114  	defer c.Close()
  2115  
  2116  	sc := integrationTestSchemaClient(ctx, t)
  2117  	defer sc.Close()
  2118  
  2119  	schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
  2120  	if err != nil {
  2121  		t.Fatal(err)
  2122  	}
  2123  
  2124  	schemaID := schemaIDs.New()
  2125  	schemaCfg, err := sc.CreateSchema(ctx, schemaID, SchemaConfig{
  2126  		Type:       SchemaAvro,
  2127  		Definition: string(schemaContent),
  2128  	})
  2129  	if err != nil {
  2130  		t.Fatal(err)
  2131  	}
  2132  	defer sc.DeleteSchema(ctx, schemaID)
  2133  
  2134  	topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), nil)
  2135  	if err != nil {
  2136  		t.Fatal(err)
  2137  	}
  2138  	defer topic.Delete(ctx)
  2139  	defer topic.Stop()
  2140  
  2141  	schema := &SchemaSettings{
  2142  		Schema:   schemaCfg.Name,
  2143  		Encoding: EncodingJSON,
  2144  	}
  2145  	cfg, err := topic.Update(ctx, TopicConfigToUpdate{
  2146  		SchemaSettings: schema,
  2147  	})
  2148  	if err != nil {
  2149  		t.Fatal(err)
  2150  	}
  2151  	if diff := cmp.Diff(cfg.SchemaSettings, schema); diff != "" {
  2152  		t.Fatalf("schema settings for update -want, +got: %v", diff)
  2153  	}
  2154  }
  2155  
  2156  func TestIntegration_DetectProjectID(t *testing.T) {
  2157  	if testing.Short() {
  2158  		t.Skip("Integration tests skipped in short mode")
  2159  	}
  2160  	ctx := context.Background()
  2161  	testCreds := testutil.Credentials(ctx)
  2162  	if testCreds == nil {
  2163  		t.Skip("test credentials not present, skipping")
  2164  	}
  2165  
  2166  	goodClient, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds))
  2167  	if err != nil {
  2168  		t.Errorf("test pubsub.NewClient: %v", err)
  2169  	}
  2170  	if goodClient.Project() != testutil.ProjID() {
  2171  		t.Errorf("client.Project() got %q, want %q", goodClient.Project(), testutil.ProjID())
  2172  	}
  2173  
  2174  	badTS := testutil.ErroringTokenSource{}
  2175  	if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
  2176  		t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
  2177  	}
  2178  }
  2179  
  2180  func TestIntegration_PublishCompression(t *testing.T) {
  2181  	ctx := context.Background()
  2182  	client := integrationTestClient(ctx, t)
  2183  	defer client.Close()
  2184  
  2185  	topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
  2186  	if err != nil {
  2187  		t.Fatal(err)
  2188  	}
  2189  	defer topic.Delete(ctx)
  2190  	defer topic.Stop()
  2191  
  2192  	topic.PublishSettings.EnableCompression = true
  2193  	topic.PublishSettings.CompressionBytesThreshold = 50
  2194  
  2195  	const messageSizeBytes = 1000
  2196  
  2197  	msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))}
  2198  	res := topic.Publish(ctx, msg)
  2199  
  2200  	_, err = res.Get(ctx)
  2201  	if err != nil {
  2202  		t.Errorf("publish result got err: %v", err)
  2203  	}
  2204  }
  2205  
  2206  // createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error.
  2207  func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) {
  2208  	var topic *Topic
  2209  	var err error
  2210  	testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) {
  2211  		if cfg != nil {
  2212  			topic, err = c.CreateTopicWithConfig(ctx, topicID, cfg)
  2213  			if err != nil {
  2214  				r.Errorf("CreateTopic error: %v", err)
  2215  			}
  2216  		} else {
  2217  			topic, err = c.CreateTopic(ctx, topicID)
  2218  			if err != nil {
  2219  				r.Errorf("CreateTopic error: %v", err)
  2220  			}
  2221  		}
  2222  	})
  2223  	return topic, err
  2224  }
  2225  
  2226  // createSubWithRetry creates a subscription, wrapped with testutil.Retry and returns the created subscription or an error.
  2227  func createSubWithRetry(ctx context.Context, t *testing.T, c *Client, subID string, cfg SubscriptionConfig) (*Subscription, error) {
  2228  	var sub *Subscription
  2229  	var err error
  2230  	testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) {
  2231  		sub, err = c.CreateSubscription(ctx, subID, cfg)
  2232  		if err != nil {
  2233  			r.Errorf("CreateSub error: %v", err)
  2234  		}
  2235  	})
  2236  	return sub, err
  2237  }
  2238  

View as plain text