...

Source file src/cloud.google.com/go/pubsub/subscription_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"log"
    22  	"testing"
    23  	"time"
    24  
    25  	"cloud.google.com/go/internal/testutil"
    26  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    27  	"cloud.google.com/go/pubsub/pstest"
    28  	"github.com/google/go-cmp/cmp/cmpopts"
    29  	"google.golang.org/api/iterator"
    30  	"google.golang.org/api/option"
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/status"
    35  	"google.golang.org/protobuf/types/known/durationpb"
    36  	"google.golang.org/protobuf/types/known/timestamppb"
    37  )
    38  
    39  // All returns the remaining subscriptions from this iterator.
    40  func slurpSubs(it *SubscriptionIterator) ([]*Subscription, error) {
    41  	var subs []*Subscription
    42  	for {
    43  		switch sub, err := it.Next(); err {
    44  		case nil:
    45  			subs = append(subs, sub)
    46  		case iterator.Done:
    47  			return subs, nil
    48  		default:
    49  			return nil, err
    50  		}
    51  	}
    52  }
    53  
    54  func TestSubscriptionID(t *testing.T) {
    55  	const id = "id"
    56  	c := &Client{projectID: "projid"}
    57  	s := c.Subscription(id)
    58  	if got, want := s.ID(), id; got != want {
    59  		t.Errorf("Subscription.ID() = %q; want %q", got, want)
    60  	}
    61  }
    62  
    63  func TestListProjectSubscriptions(t *testing.T) {
    64  	ctx := context.Background()
    65  	c, srv := newFake(t)
    66  	defer c.Close()
    67  	defer srv.Close()
    68  
    69  	topic := mustCreateTopic(t, c, "t")
    70  	var want []string
    71  	for i := 1; i <= 2; i++ {
    72  		id := fmt.Sprintf("s%d", i)
    73  		want = append(want, id)
    74  		_, err := c.CreateSubscription(ctx, id, SubscriptionConfig{Topic: topic})
    75  		if err != nil {
    76  			t.Fatal(err)
    77  		}
    78  	}
    79  	subs, err := slurpSubs(c.Subscriptions(ctx))
    80  	if err != nil {
    81  		t.Fatal(err)
    82  	}
    83  
    84  	got := getSubIDs(subs)
    85  	if !testutil.Equal(got, want) {
    86  		t.Errorf("got %v, want %v", got, want)
    87  	}
    88  
    89  	// Call list again, but check the config this time.
    90  	it := c.Subscriptions(ctx)
    91  	i := 1
    92  	for {
    93  		sub, err := it.NextConfig()
    94  		if err == iterator.Done {
    95  			break
    96  		}
    97  		if err != nil {
    98  			t.Errorf("SubscriptionIterator.NextConfig() got err: %v", err)
    99  		}
   100  		if got := sub.Topic.ID(); got != topic.ID() {
   101  			t.Errorf("subConfig.Topic mismatch, got: %v, want: %v", got, topic.ID())
   102  		}
   103  
   104  		want := fmt.Sprintf("s%d", i)
   105  		if got := sub.ID(); got != want {
   106  			t.Errorf("sub.ID() mismatch: got %s, want: %s", got, want)
   107  		}
   108  		want = fmt.Sprintf("projects/P/subscriptions/s%d", i)
   109  		if got := sub.String(); got != want {
   110  			t.Errorf("sub.String() mismatch: got %s, want: %s", got, want)
   111  		}
   112  		i++
   113  	}
   114  }
   115  
   116  func getSubIDs(subs []*Subscription) []string {
   117  	var names []string
   118  	for _, sub := range subs {
   119  		names = append(names, sub.ID())
   120  	}
   121  	return names
   122  }
   123  
   124  func TestListTopicSubscriptions(t *testing.T) {
   125  	ctx := context.Background()
   126  	c, srv := newFake(t)
   127  	defer c.Close()
   128  	defer srv.Close()
   129  
   130  	topics := []*Topic{
   131  		mustCreateTopic(t, c, "t0"),
   132  		mustCreateTopic(t, c, "t1"),
   133  	}
   134  	wants := make([][]string, 2)
   135  	for i := 0; i < 5; i++ {
   136  		id := fmt.Sprintf("s%d", i)
   137  		sub, err := c.CreateSubscription(ctx, id, SubscriptionConfig{Topic: topics[i%2]})
   138  		if err != nil {
   139  			t.Fatal(err)
   140  		}
   141  		wants[i%2] = append(wants[i%2], sub.ID())
   142  	}
   143  
   144  	for i, topic := range topics {
   145  		subs, err := slurpSubs(topic.Subscriptions(ctx))
   146  		if err != nil {
   147  			t.Fatal(err)
   148  		}
   149  		got := getSubIDs(subs)
   150  		if !testutil.Equal(got, wants[i]) {
   151  			t.Errorf("#%d: got %v, want %v", i, got, wants[i])
   152  		}
   153  	}
   154  }
   155  
   156  const defaultRetentionDuration = 168 * time.Hour
   157  
   158  func TestSubscriptionConfig(t *testing.T) {
   159  	ctx := context.Background()
   160  	client, srv := newFake(t)
   161  	defer client.Close()
   162  	defer srv.Close()
   163  
   164  	topic := mustCreateTopic(t, client, "t")
   165  	sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{
   166  		Topic:            topic,
   167  		ExpirationPolicy: 30 * time.Hour,
   168  		PushConfig: PushConfig{
   169  			Endpoint: "https://example.com/push",
   170  			AuthenticationMethod: &OIDCToken{
   171  				ServiceAccountEmail: "foo@example.com",
   172  				Audience:            "client-12345",
   173  			},
   174  		},
   175  	})
   176  	if err != nil {
   177  		t.Fatal(err)
   178  	}
   179  	cfg, err := sub.Config(ctx)
   180  	if err != nil {
   181  		t.Fatal(err)
   182  	}
   183  	want := SubscriptionConfig{
   184  		Topic:               topic,
   185  		AckDeadline:         10 * time.Second,
   186  		RetainAckedMessages: false,
   187  		RetentionDuration:   defaultRetentionDuration,
   188  		ExpirationPolicy:    30 * time.Hour,
   189  		PushConfig: PushConfig{
   190  			Endpoint: "https://example.com/push",
   191  			AuthenticationMethod: &OIDCToken{
   192  				ServiceAccountEmail: "foo@example.com",
   193  				Audience:            "client-12345",
   194  			},
   195  			Wrapper: &PubsubWrapper{},
   196  		},
   197  		EnableExactlyOnceDelivery: false,
   198  		State:                     SubscriptionStateActive,
   199  	}
   200  	opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
   201  	if diff := testutil.Diff(cfg, want, opt); diff != "" {
   202  		t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
   203  	}
   204  
   205  	got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
   206  		AckDeadline:         20 * time.Second,
   207  		RetainAckedMessages: true,
   208  		Labels:              map[string]string{"label": "value"},
   209  		ExpirationPolicy:    72 * time.Hour,
   210  		PushConfig: &PushConfig{
   211  			Endpoint: "https://example2.com/push",
   212  			AuthenticationMethod: &OIDCToken{
   213  				ServiceAccountEmail: "bar@example.com",
   214  				Audience:            "client-98765",
   215  			},
   216  			Wrapper: &NoWrapper{
   217  				WriteMetadata: true,
   218  			},
   219  		},
   220  		EnableExactlyOnceDelivery: true,
   221  	})
   222  	if err != nil {
   223  		t.Fatal(err)
   224  	}
   225  	want = SubscriptionConfig{
   226  		Topic:               topic,
   227  		AckDeadline:         20 * time.Second,
   228  		RetainAckedMessages: true,
   229  		RetentionDuration:   defaultRetentionDuration,
   230  		Labels:              map[string]string{"label": "value"},
   231  		ExpirationPolicy:    72 * time.Hour,
   232  		PushConfig: PushConfig{
   233  			Endpoint: "https://example2.com/push",
   234  			AuthenticationMethod: &OIDCToken{
   235  				ServiceAccountEmail: "bar@example.com",
   236  				Audience:            "client-98765",
   237  			},
   238  			Wrapper: &NoWrapper{
   239  				WriteMetadata: true,
   240  			},
   241  		},
   242  		EnableExactlyOnceDelivery: true,
   243  		State:                     SubscriptionStateActive,
   244  	}
   245  	if diff := testutil.Diff(got, want, opt); diff != "" {
   246  		t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
   247  	}
   248  
   249  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   250  		RetentionDuration: 2 * time.Hour,
   251  		Labels:            map[string]string{},
   252  	})
   253  	if err != nil {
   254  		t.Fatal(err)
   255  	}
   256  	want.RetentionDuration = 2 * time.Hour
   257  	want.Labels = nil
   258  	if diff := testutil.Diff(got, want, opt); diff != "" {
   259  		t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
   260  	}
   261  
   262  	_, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
   263  	if err == nil {
   264  		t.Fatal("got nil, want error")
   265  	}
   266  
   267  	// Check ExpirationPolicy when set to never expire.
   268  	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
   269  		ExpirationPolicy: time.Duration(0),
   270  	})
   271  	if err != nil {
   272  		t.Fatal(err)
   273  	}
   274  	want.ExpirationPolicy = time.Duration(0)
   275  	if diff := testutil.Diff(got, want, opt); diff != "" {
   276  		t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
   277  	}
   278  }
   279  
   280  func TestReceive(t *testing.T) {
   281  	testReceive(t, true, false)
   282  	testReceive(t, false, false)
   283  	testReceive(t, false, true)
   284  }
   285  
   286  func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) {
   287  	t.Run(fmt.Sprintf("synchronous:%t,exactlyOnceDelivery:%t", synchronous, exactlyOnceDelivery), func(t *testing.T) {
   288  		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
   289  		defer cancel()
   290  		client, srv := newFake(t)
   291  		defer client.Close()
   292  		defer srv.Close()
   293  
   294  		topic := mustCreateTopic(t, client, "t")
   295  		sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{
   296  			Topic:                     topic,
   297  			EnableExactlyOnceDelivery: exactlyOnceDelivery,
   298  		})
   299  		if err != nil {
   300  			t.Fatal(err)
   301  		}
   302  		for i := 0; i < 256; i++ {
   303  			srv.Publish(topic.name, []byte{byte(i)}, nil)
   304  		}
   305  		sub.ReceiveSettings.Synchronous = synchronous
   306  		msgs, err := pullN(ctx, sub, 256, 0, func(_ context.Context, m *Message) {
   307  			if exactlyOnceDelivery {
   308  				ar := m.AckWithResult()
   309  				// Don't use the above ctx here since that will get cancelled.
   310  				ackStatus, err := ar.Get(context.Background())
   311  				if err != nil {
   312  					t.Fatalf("pullN err for message(%s): %v", m.ID, err)
   313  				}
   314  				if ackStatus != AcknowledgeStatusSuccess {
   315  					t.Fatalf("pullN got non-success AckStatus: %v", ackStatus)
   316  				}
   317  			} else {
   318  				m.Ack()
   319  			}
   320  		})
   321  		if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
   322  			t.Fatalf("Pull: %v", err)
   323  		}
   324  		var seen [256]bool
   325  		for _, m := range msgs {
   326  			seen[m.Data[0]] = true
   327  		}
   328  		for i, saw := range seen {
   329  			if !saw {
   330  				t.Errorf("sync=%t, eod=%t: did not see message #%d", synchronous, exactlyOnceDelivery, i)
   331  			}
   332  		}
   333  	})
   334  }
   335  
   336  func (t1 *Topic) Equal(t2 *Topic) bool {
   337  	if t1 == nil && t2 == nil {
   338  		return true
   339  	}
   340  	if t1 == nil || t2 == nil {
   341  		return false
   342  	}
   343  	return t1.c == t2.c && t1.name == t2.name
   344  }
   345  
   346  // Note: be sure to close client and server!
   347  func newFake(t *testing.T) (*Client, *pstest.Server) {
   348  	ctx := context.Background()
   349  	srv := pstest.NewServer()
   350  	client, err := NewClient(ctx, projName,
   351  		option.WithEndpoint(srv.Addr),
   352  		option.WithoutAuthentication(),
   353  		option.WithGRPCDialOption(grpc.WithInsecure()))
   354  	if err != nil {
   355  		t.Fatal(err)
   356  	}
   357  	return client, srv
   358  }
   359  
   360  func TestPushConfigAuthenticationMethod_toProto(t *testing.T) {
   361  	in := &PushConfig{
   362  		Endpoint: "https://example.com/push",
   363  		AuthenticationMethod: &OIDCToken{
   364  			ServiceAccountEmail: "foo@example.com",
   365  			Audience:            "client-12345",
   366  		},
   367  	}
   368  	got := in.toProto()
   369  	want := &pb.PushConfig{
   370  		PushEndpoint: "https://example.com/push",
   371  		AuthenticationMethod: &pb.PushConfig_OidcToken_{
   372  			OidcToken: &pb.PushConfig_OidcToken{
   373  				ServiceAccountEmail: "foo@example.com",
   374  				Audience:            "client-12345",
   375  			},
   376  		},
   377  	}
   378  	if diff := testutil.Diff(got, want); diff != "" {
   379  		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
   380  	}
   381  }
   382  
   383  func TestDeadLettering_toProto(t *testing.T) {
   384  	in := &DeadLetterPolicy{
   385  		MaxDeliveryAttempts: 10,
   386  		DeadLetterTopic:     "projects/p/topics/t",
   387  	}
   388  	got := in.toProto()
   389  	want := &pb.DeadLetterPolicy{
   390  		DeadLetterTopic:     "projects/p/topics/t",
   391  		MaxDeliveryAttempts: 10,
   392  	}
   393  	if diff := testutil.Diff(got, want); diff != "" {
   394  		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
   395  	}
   396  }
   397  
   398  // Check if incoming ReceivedMessages are properly converted to Message structs
   399  // that expose the DeliveryAttempt field when dead lettering is enabled/disabled.
   400  func TestDeadLettering_toMessage(t *testing.T) {
   401  	// If dead lettering is disabled, DeliveryAttempt should default to 0.
   402  	receivedMsg := &pb.ReceivedMessage{
   403  		AckId: "1234",
   404  		Message: &pb.PubsubMessage{
   405  			Data:        []byte("some message"),
   406  			MessageId:   "id-1234",
   407  			PublishTime: timestamppb.Now(),
   408  		},
   409  	}
   410  	got, err := toMessage(receivedMsg, time.Time{}, nil)
   411  	if err != nil {
   412  		t.Errorf("toMessage failed: %v", err)
   413  	}
   414  	if got.DeliveryAttempt != nil {
   415  		t.Errorf("toMessage with dead-lettering disabled failed\ngot: %d, want nil", *got.DeliveryAttempt)
   416  	}
   417  
   418  	// If dead lettering is enabled, toMessage should properly pass through the DeliveryAttempt field.
   419  	receivedMsg.DeliveryAttempt = 10
   420  	got, err = toMessage(receivedMsg, time.Time{}, nil)
   421  	if err != nil {
   422  		t.Errorf("toMessage failed: %v", err)
   423  	}
   424  	if *got.DeliveryAttempt != int(receivedMsg.DeliveryAttempt) {
   425  		t.Errorf("toMessage with dead-lettered enabled failed\ngot: %d, want %d", *got.DeliveryAttempt, receivedMsg.DeliveryAttempt)
   426  	}
   427  }
   428  
   429  func TestRetryPolicy_toProto(t *testing.T) {
   430  	in := &RetryPolicy{
   431  		MinimumBackoff: 20 * time.Second,
   432  		MaximumBackoff: 300 * time.Second,
   433  	}
   434  	got := in.toProto()
   435  	want := &pb.RetryPolicy{
   436  		MinimumBackoff: durationpb.New(20 * time.Second),
   437  		MaximumBackoff: durationpb.New(300 * time.Second),
   438  	}
   439  	if diff := testutil.Diff(got, want); diff != "" {
   440  		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
   441  	}
   442  }
   443  
   444  func TestOrdering_CreateSubscription(t *testing.T) {
   445  	ctx := context.Background()
   446  	client, srv := newFake(t)
   447  	defer client.Close()
   448  	defer srv.Close()
   449  
   450  	topic := mustCreateTopic(t, client, "t")
   451  	subConfig := SubscriptionConfig{
   452  		Topic:                 topic,
   453  		EnableMessageOrdering: true,
   454  	}
   455  	orderSub, err := client.CreateSubscription(ctx, "s", subConfig)
   456  	if err != nil {
   457  		t.Fatal(err)
   458  	}
   459  	cfg, err := orderSub.Config(ctx)
   460  	if err != nil {
   461  		t.Fatal(err)
   462  	}
   463  	if !cfg.EnableMessageOrdering {
   464  		t.Fatalf("Expected EnableMessageOrdering to be true in %s", orderSub.String())
   465  	}
   466  
   467  	// Test cancellation works as intended with ordering enabled.
   468  	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
   469  	defer cancel()
   470  	orderSub.Receive(ctx, func(ctx context.Context, msg *Message) {
   471  		msg.Ack()
   472  	})
   473  }
   474  
   475  func TestBigQuerySubscription(t *testing.T) {
   476  	ctx, cancel := context.WithCancel(context.Background())
   477  	defer cancel()
   478  	client, srv := newFake(t)
   479  	defer client.Close()
   480  	defer srv.Close()
   481  
   482  	topic := mustCreateTopic(t, client, "t")
   483  	bqTable := "some-project:some-dataset.some-table"
   484  	bqConfig := BigQueryConfig{
   485  		Table: bqTable,
   486  	}
   487  
   488  	subConfig := SubscriptionConfig{
   489  		Topic:          topic,
   490  		BigQueryConfig: bqConfig,
   491  	}
   492  	bqSub, err := client.CreateSubscription(ctx, "s", subConfig)
   493  	if err != nil {
   494  		t.Fatal(err)
   495  	}
   496  	cfg, err := bqSub.Config(ctx)
   497  	if err != nil {
   498  		t.Fatal(err)
   499  	}
   500  
   501  	want := bqConfig
   502  	want.State = BigQueryConfigActive
   503  	if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" {
   504  		t.Fatalf("CreateBQSubscription mismatch: \n%s", diff)
   505  	}
   506  }
   507  
   508  func TestCloudStorageSubscription(t *testing.T) {
   509  	ctx, cancel := context.WithCancel(context.Background())
   510  	defer cancel()
   511  	client, srv := newFake(t)
   512  	defer client.Close()
   513  	defer srv.Close()
   514  
   515  	topic := mustCreateTopic(t, client, "t")
   516  	bucket := "fake-bucket"
   517  	csCfg := CloudStorageConfig{
   518  		Bucket:         bucket,
   519  		FilenamePrefix: "some-prefix",
   520  		FilenameSuffix: "some-suffix",
   521  		OutputFormat: &CloudStorageOutputFormatAvroConfig{
   522  			WriteMetadata: true,
   523  		},
   524  		MaxDuration: 10 * time.Minute,
   525  		MaxBytes:    10e5,
   526  	}
   527  
   528  	subConfig := SubscriptionConfig{
   529  		Topic:              topic,
   530  		CloudStorageConfig: csCfg,
   531  	}
   532  	csSub, err := client.CreateSubscription(ctx, "s", subConfig)
   533  	if err != nil {
   534  		t.Fatal(err)
   535  	}
   536  	cfg, err := csSub.Config(ctx)
   537  	if err != nil {
   538  		t.Fatal(err)
   539  	}
   540  
   541  	want := csCfg
   542  	want.State = CloudStorageConfigActive
   543  	if diff := testutil.Diff(cfg.CloudStorageConfig, want); diff != "" {
   544  		t.Fatalf("create cloud storage subscription mismatch: \n%s", diff)
   545  	}
   546  
   547  	csCfg.OutputFormat = &CloudStorageOutputFormatTextConfig{}
   548  	cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
   549  		CloudStorageConfig: &csCfg,
   550  	})
   551  	if err != nil {
   552  		t.Fatal(err)
   553  	}
   554  	got := cfg.CloudStorageConfig
   555  	want = csCfg
   556  	want.State = CloudStorageConfigActive
   557  	if diff := testutil.Diff(got, want); diff != "" {
   558  		t.Fatalf("update cloud storage subscription mismatch: \n%s", diff)
   559  	}
   560  
   561  	// Test resetting to a pull based subscription.
   562  	cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
   563  		CloudStorageConfig: &CloudStorageConfig{},
   564  	})
   565  	if err != nil {
   566  		t.Fatal(err)
   567  	}
   568  	got = cfg.CloudStorageConfig
   569  	want = CloudStorageConfig{}
   570  	if diff := testutil.Diff(got, want); diff != "" {
   571  		t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff)
   572  	}
   573  }
   574  
   575  func TestExactlyOnceDelivery_AckSuccess(t *testing.T) {
   576  	t.Parallel()
   577  	ctx, cancel := context.WithCancel(context.Background())
   578  	client, srv := newFake(t)
   579  	defer client.Close()
   580  	defer srv.Close()
   581  
   582  	topic := mustCreateTopic(t, client, "t")
   583  	subConfig := SubscriptionConfig{
   584  		Topic:                     topic,
   585  		EnableExactlyOnceDelivery: true,
   586  	}
   587  	s, err := client.CreateSubscription(ctx, "s", subConfig)
   588  	if err != nil {
   589  		t.Fatalf("create sub err: %v", err)
   590  	}
   591  	s.ReceiveSettings.NumGoroutines = 1
   592  	r := topic.Publish(ctx, &Message{
   593  		Data: []byte("exactly-once-message"),
   594  	})
   595  	if _, err := r.Get(ctx); err != nil {
   596  		t.Fatalf("failed to publish message: %v", err)
   597  	}
   598  
   599  	err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
   600  		ar := msg.AckWithResult()
   601  		s, err := ar.Get(ctx)
   602  		if s != AcknowledgeStatusSuccess {
   603  			t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusSuccess)
   604  		}
   605  		if err != nil {
   606  			t.Errorf("AckResult error got %v", err)
   607  		}
   608  		cancel()
   609  	})
   610  	if err != nil {
   611  		t.Fatalf("s.Receive err: %v", err)
   612  	}
   613  }
   614  
   615  func TestExactlyOnceDelivery_AckFailureErrorPermissionDenied(t *testing.T) {
   616  	t.Parallel()
   617  	ctx, cancel := context.WithCancel(context.Background())
   618  	srv := pstest.NewServer(pstest.WithErrorInjection("Acknowledge", codes.PermissionDenied, "insufficient permission"))
   619  	client, err := NewClient(ctx, projName,
   620  		option.WithEndpoint(srv.Addr),
   621  		option.WithoutAuthentication(),
   622  		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
   623  	if err != nil {
   624  		t.Fatal(err)
   625  	}
   626  	defer client.Close()
   627  	defer srv.Close()
   628  
   629  	topic := mustCreateTopic(t, client, "t")
   630  	subConfig := SubscriptionConfig{
   631  		Topic:                     topic,
   632  		EnableExactlyOnceDelivery: true,
   633  	}
   634  	s, err := client.CreateSubscription(ctx, "s", subConfig)
   635  	if err != nil {
   636  		t.Fatalf("create sub err: %v", err)
   637  	}
   638  	s.ReceiveSettings.NumGoroutines = 1
   639  	r := topic.Publish(ctx, &Message{
   640  		Data: []byte("exactly-once-message"),
   641  	})
   642  	if _, err := r.Get(ctx); err != nil {
   643  		t.Fatalf("failed to publish message: %v", err)
   644  	}
   645  	err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
   646  		ar := msg.AckWithResult()
   647  		s, err := ar.Get(ctx)
   648  		if s != AcknowledgeStatusPermissionDenied {
   649  			t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusPermissionDenied)
   650  		}
   651  		wantErr := status.Errorf(codes.PermissionDenied, "insufficient permission")
   652  		if !errors.Is(err, wantErr) {
   653  			t.Errorf("AckResult error\ngot  %v\nwant %s", err, wantErr)
   654  		}
   655  		cancel()
   656  	})
   657  	if err != nil {
   658  		t.Fatalf("s.Receive err: %v", err)
   659  	}
   660  }
   661  
   662  func TestExactlyOnceDelivery_AckRetryDeadlineExceeded(t *testing.T) {
   663  	ctx, cancel := context.WithCancel(context.Background())
   664  	srv := pstest.NewServer(pstest.WithErrorInjection("Acknowledge", codes.Internal, "internal error"))
   665  	client, err := NewClient(ctx, projName,
   666  		option.WithEndpoint(srv.Addr),
   667  		option.WithoutAuthentication(),
   668  		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
   669  	if err != nil {
   670  		t.Fatal(err)
   671  	}
   672  	defer client.Close()
   673  	defer srv.Close()
   674  
   675  	topic := mustCreateTopic(t, client, "t")
   676  	subConfig := SubscriptionConfig{
   677  		Topic:                     topic,
   678  		EnableExactlyOnceDelivery: true,
   679  	}
   680  	s, err := client.CreateSubscription(ctx, "s", subConfig)
   681  	if err != nil {
   682  		t.Fatalf("create sub err: %v", err)
   683  	}
   684  	r := topic.Publish(ctx, &Message{
   685  		Data: []byte("exactly-once-message"),
   686  	})
   687  	if _, err := r.Get(ctx); err != nil {
   688  		t.Fatalf("failed to publish message: %v", err)
   689  	}
   690  
   691  	s.ReceiveSettings = ReceiveSettings{
   692  		NumGoroutines: 1,
   693  	}
   694  	// Override the default timeout here so this test doesn't take 10 minutes.
   695  	exactlyOnceDeliveryRetryDeadline = 10 * time.Second
   696  	err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
   697  		log.Printf("received message: %v\n", msg)
   698  		ar := msg.AckWithResult()
   699  		s, err := ar.Get(ctx)
   700  		if s != AcknowledgeStatusOther {
   701  			t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther)
   702  		}
   703  		wantErr := context.DeadlineExceeded
   704  		if !errors.Is(err, wantErr) {
   705  			t.Errorf("AckResult error\ngot  %v\nwant %s", err, wantErr)
   706  		}
   707  		cancel()
   708  	})
   709  	if err != nil {
   710  		t.Fatalf("s.Receive err: %v", err)
   711  	}
   712  }
   713  
   714  func TestExactlyOnceDelivery_NackSuccess(t *testing.T) {
   715  	t.Parallel()
   716  	ctx, cancel := context.WithCancel(context.Background())
   717  	client, srv := newFake(t)
   718  	defer client.Close()
   719  	defer srv.Close()
   720  
   721  	topic := mustCreateTopic(t, client, "t")
   722  	subConfig := SubscriptionConfig{
   723  		Topic:                     topic,
   724  		EnableExactlyOnceDelivery: true,
   725  	}
   726  	s, err := client.CreateSubscription(ctx, "s", subConfig)
   727  	if err != nil {
   728  		t.Fatalf("create sub err: %v", err)
   729  	}
   730  	r := topic.Publish(ctx, &Message{
   731  		Data: []byte("exactly-once-message"),
   732  	})
   733  	if _, err := r.Get(ctx); err != nil {
   734  		t.Fatalf("failed to publish message: %v", err)
   735  	}
   736  
   737  	s.ReceiveSettings = ReceiveSettings{
   738  		NumGoroutines: 1,
   739  	}
   740  	err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
   741  		ar := msg.NackWithResult()
   742  		s, err := ar.Get(context.Background())
   743  		if s != AcknowledgeStatusSuccess {
   744  			t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusSuccess)
   745  		}
   746  		if err != nil {
   747  			t.Errorf("AckResult error got %v", err)
   748  		}
   749  		cancel()
   750  	})
   751  	if err != nil {
   752  		t.Fatalf("s.Receive err: %v", err)
   753  	}
   754  }
   755  
   756  func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
   757  	ctx := context.Background()
   758  	srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error"))
   759  	client, err := NewClient(ctx, projName,
   760  		option.WithEndpoint(srv.Addr),
   761  		option.WithoutAuthentication(),
   762  		option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
   763  	if err != nil {
   764  		t.Fatal(err)
   765  	}
   766  	defer client.Close()
   767  	defer srv.Close()
   768  
   769  	topic := mustCreateTopic(t, client, "t")
   770  	subConfig := SubscriptionConfig{
   771  		Topic:                     topic,
   772  		EnableExactlyOnceDelivery: true,
   773  	}
   774  	s, err := client.CreateSubscription(ctx, "s", subConfig)
   775  	if err != nil {
   776  		t.Fatalf("create sub err: %v", err)
   777  	}
   778  	r := topic.Publish(ctx, &Message{
   779  		Data: []byte("exactly-once-message"),
   780  	})
   781  	if _, err := r.Get(ctx); err != nil {
   782  		t.Fatalf("failed to publish message: %v", err)
   783  	}
   784  	ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
   785  	defer cancel()
   786  	s.Receive(ctx, func(ctx context.Context, msg *Message) {
   787  		t.Fatal("expected message to not have been delivered when exactly once enabled")
   788  	})
   789  }
   790  
   791  func TestSubscribeMessageExpirationFlowControl(t *testing.T) {
   792  	t.Parallel()
   793  	ctx, cancel := context.WithCancel(context.Background())
   794  	defer cancel()
   795  	client, srv := newFake(t)
   796  	defer client.Close()
   797  	defer srv.Close()
   798  
   799  	topic := mustCreateTopic(t, client, "t")
   800  	subConfig := SubscriptionConfig{
   801  		Topic: topic,
   802  	}
   803  	s, err := client.CreateSubscription(ctx, "s", subConfig)
   804  	if err != nil {
   805  		t.Fatalf("create sub err: %v", err)
   806  	}
   807  
   808  	s.ReceiveSettings.NumGoroutines = 1
   809  	s.ReceiveSettings.MaxOutstandingMessages = 1
   810  	s.ReceiveSettings.MaxExtension = 10 * time.Second
   811  	s.ReceiveSettings.MaxExtensionPeriod = 10 * time.Second
   812  	r := topic.Publish(ctx, &Message{
   813  		Data: []byte("redelivered-message"),
   814  	})
   815  	if _, err := r.Get(ctx); err != nil {
   816  		t.Fatalf("failed to publish message: %v", err)
   817  	}
   818  
   819  	deliveryCount := 0
   820  	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
   821  	defer cancel()
   822  	err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
   823  		// Only acknowledge the message on the 2nd invocation of the callback (2nd delivery).
   824  		if deliveryCount == 1 {
   825  			msg.Ack()
   826  		}
   827  		// Otherwise, do nothing and let the message expire.
   828  		deliveryCount++
   829  		if deliveryCount == 2 {
   830  			cancel()
   831  		}
   832  	})
   833  	if deliveryCount != 2 {
   834  		t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount)
   835  	}
   836  	if err != nil {
   837  		t.Fatalf("s.Receive err: %v", err)
   838  	}
   839  }
   840  

View as plain text