1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "log"
22 "testing"
23 "time"
24
25 "cloud.google.com/go/internal/testutil"
26 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
27 "cloud.google.com/go/pubsub/pstest"
28 "github.com/google/go-cmp/cmp/cmpopts"
29 "google.golang.org/api/iterator"
30 "google.golang.org/api/option"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/status"
35 "google.golang.org/protobuf/types/known/durationpb"
36 "google.golang.org/protobuf/types/known/timestamppb"
37 )
38
39
40 func slurpSubs(it *SubscriptionIterator) ([]*Subscription, error) {
41 var subs []*Subscription
42 for {
43 switch sub, err := it.Next(); err {
44 case nil:
45 subs = append(subs, sub)
46 case iterator.Done:
47 return subs, nil
48 default:
49 return nil, err
50 }
51 }
52 }
53
54 func TestSubscriptionID(t *testing.T) {
55 const id = "id"
56 c := &Client{projectID: "projid"}
57 s := c.Subscription(id)
58 if got, want := s.ID(), id; got != want {
59 t.Errorf("Subscription.ID() = %q; want %q", got, want)
60 }
61 }
62
63 func TestListProjectSubscriptions(t *testing.T) {
64 ctx := context.Background()
65 c, srv := newFake(t)
66 defer c.Close()
67 defer srv.Close()
68
69 topic := mustCreateTopic(t, c, "t")
70 var want []string
71 for i := 1; i <= 2; i++ {
72 id := fmt.Sprintf("s%d", i)
73 want = append(want, id)
74 _, err := c.CreateSubscription(ctx, id, SubscriptionConfig{Topic: topic})
75 if err != nil {
76 t.Fatal(err)
77 }
78 }
79 subs, err := slurpSubs(c.Subscriptions(ctx))
80 if err != nil {
81 t.Fatal(err)
82 }
83
84 got := getSubIDs(subs)
85 if !testutil.Equal(got, want) {
86 t.Errorf("got %v, want %v", got, want)
87 }
88
89
90 it := c.Subscriptions(ctx)
91 i := 1
92 for {
93 sub, err := it.NextConfig()
94 if err == iterator.Done {
95 break
96 }
97 if err != nil {
98 t.Errorf("SubscriptionIterator.NextConfig() got err: %v", err)
99 }
100 if got := sub.Topic.ID(); got != topic.ID() {
101 t.Errorf("subConfig.Topic mismatch, got: %v, want: %v", got, topic.ID())
102 }
103
104 want := fmt.Sprintf("s%d", i)
105 if got := sub.ID(); got != want {
106 t.Errorf("sub.ID() mismatch: got %s, want: %s", got, want)
107 }
108 want = fmt.Sprintf("projects/P/subscriptions/s%d", i)
109 if got := sub.String(); got != want {
110 t.Errorf("sub.String() mismatch: got %s, want: %s", got, want)
111 }
112 i++
113 }
114 }
115
116 func getSubIDs(subs []*Subscription) []string {
117 var names []string
118 for _, sub := range subs {
119 names = append(names, sub.ID())
120 }
121 return names
122 }
123
124 func TestListTopicSubscriptions(t *testing.T) {
125 ctx := context.Background()
126 c, srv := newFake(t)
127 defer c.Close()
128 defer srv.Close()
129
130 topics := []*Topic{
131 mustCreateTopic(t, c, "t0"),
132 mustCreateTopic(t, c, "t1"),
133 }
134 wants := make([][]string, 2)
135 for i := 0; i < 5; i++ {
136 id := fmt.Sprintf("s%d", i)
137 sub, err := c.CreateSubscription(ctx, id, SubscriptionConfig{Topic: topics[i%2]})
138 if err != nil {
139 t.Fatal(err)
140 }
141 wants[i%2] = append(wants[i%2], sub.ID())
142 }
143
144 for i, topic := range topics {
145 subs, err := slurpSubs(topic.Subscriptions(ctx))
146 if err != nil {
147 t.Fatal(err)
148 }
149 got := getSubIDs(subs)
150 if !testutil.Equal(got, wants[i]) {
151 t.Errorf("#%d: got %v, want %v", i, got, wants[i])
152 }
153 }
154 }
155
// defaultRetentionDuration is the retention duration (seven days) the tests
// in this file expect on a subscription created without an explicit one.
const defaultRetentionDuration = 7 * 24 * time.Hour
157
158 func TestSubscriptionConfig(t *testing.T) {
159 ctx := context.Background()
160 client, srv := newFake(t)
161 defer client.Close()
162 defer srv.Close()
163
164 topic := mustCreateTopic(t, client, "t")
165 sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{
166 Topic: topic,
167 ExpirationPolicy: 30 * time.Hour,
168 PushConfig: PushConfig{
169 Endpoint: "https://example.com/push",
170 AuthenticationMethod: &OIDCToken{
171 ServiceAccountEmail: "foo@example.com",
172 Audience: "client-12345",
173 },
174 },
175 })
176 if err != nil {
177 t.Fatal(err)
178 }
179 cfg, err := sub.Config(ctx)
180 if err != nil {
181 t.Fatal(err)
182 }
183 want := SubscriptionConfig{
184 Topic: topic,
185 AckDeadline: 10 * time.Second,
186 RetainAckedMessages: false,
187 RetentionDuration: defaultRetentionDuration,
188 ExpirationPolicy: 30 * time.Hour,
189 PushConfig: PushConfig{
190 Endpoint: "https://example.com/push",
191 AuthenticationMethod: &OIDCToken{
192 ServiceAccountEmail: "foo@example.com",
193 Audience: "client-12345",
194 },
195 Wrapper: &PubsubWrapper{},
196 },
197 EnableExactlyOnceDelivery: false,
198 State: SubscriptionStateActive,
199 }
200 opt := cmpopts.IgnoreUnexported(SubscriptionConfig{})
201 if diff := testutil.Diff(cfg, want, opt); diff != "" {
202 t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
203 }
204
205 got, err := sub.Update(ctx, SubscriptionConfigToUpdate{
206 AckDeadline: 20 * time.Second,
207 RetainAckedMessages: true,
208 Labels: map[string]string{"label": "value"},
209 ExpirationPolicy: 72 * time.Hour,
210 PushConfig: &PushConfig{
211 Endpoint: "https://example2.com/push",
212 AuthenticationMethod: &OIDCToken{
213 ServiceAccountEmail: "bar@example.com",
214 Audience: "client-98765",
215 },
216 Wrapper: &NoWrapper{
217 WriteMetadata: true,
218 },
219 },
220 EnableExactlyOnceDelivery: true,
221 })
222 if err != nil {
223 t.Fatal(err)
224 }
225 want = SubscriptionConfig{
226 Topic: topic,
227 AckDeadline: 20 * time.Second,
228 RetainAckedMessages: true,
229 RetentionDuration: defaultRetentionDuration,
230 Labels: map[string]string{"label": "value"},
231 ExpirationPolicy: 72 * time.Hour,
232 PushConfig: PushConfig{
233 Endpoint: "https://example2.com/push",
234 AuthenticationMethod: &OIDCToken{
235 ServiceAccountEmail: "bar@example.com",
236 Audience: "client-98765",
237 },
238 Wrapper: &NoWrapper{
239 WriteMetadata: true,
240 },
241 },
242 EnableExactlyOnceDelivery: true,
243 State: SubscriptionStateActive,
244 }
245 if diff := testutil.Diff(got, want, opt); diff != "" {
246 t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
247 }
248
249 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
250 RetentionDuration: 2 * time.Hour,
251 Labels: map[string]string{},
252 })
253 if err != nil {
254 t.Fatal(err)
255 }
256 want.RetentionDuration = 2 * time.Hour
257 want.Labels = nil
258 if diff := testutil.Diff(got, want, opt); diff != "" {
259 t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
260 }
261
262 _, err = sub.Update(ctx, SubscriptionConfigToUpdate{})
263 if err == nil {
264 t.Fatal("got nil, want error")
265 }
266
267
268 got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
269 ExpirationPolicy: time.Duration(0),
270 })
271 if err != nil {
272 t.Fatal(err)
273 }
274 want.ExpirationPolicy = time.Duration(0)
275 if diff := testutil.Diff(got, want, opt); diff != "" {
276 t.Fatalf("compare subscription config mismatch, -got, +want\n%s", diff)
277 }
278 }
279
280 func TestReceive(t *testing.T) {
281 testReceive(t, true, false)
282 testReceive(t, false, false)
283 testReceive(t, false, true)
284 }
285
286 func testReceive(t *testing.T, synchronous, exactlyOnceDelivery bool) {
287 t.Run(fmt.Sprintf("synchronous:%t,exactlyOnceDelivery:%t", synchronous, exactlyOnceDelivery), func(t *testing.T) {
288 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
289 defer cancel()
290 client, srv := newFake(t)
291 defer client.Close()
292 defer srv.Close()
293
294 topic := mustCreateTopic(t, client, "t")
295 sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{
296 Topic: topic,
297 EnableExactlyOnceDelivery: exactlyOnceDelivery,
298 })
299 if err != nil {
300 t.Fatal(err)
301 }
302 for i := 0; i < 256; i++ {
303 srv.Publish(topic.name, []byte{byte(i)}, nil)
304 }
305 sub.ReceiveSettings.Synchronous = synchronous
306 msgs, err := pullN(ctx, sub, 256, 0, func(_ context.Context, m *Message) {
307 if exactlyOnceDelivery {
308 ar := m.AckWithResult()
309
310 ackStatus, err := ar.Get(context.Background())
311 if err != nil {
312 t.Fatalf("pullN err for message(%s): %v", m.ID, err)
313 }
314 if ackStatus != AcknowledgeStatusSuccess {
315 t.Fatalf("pullN got non-success AckStatus: %v", ackStatus)
316 }
317 } else {
318 m.Ack()
319 }
320 })
321 if c := status.Convert(err); err != nil && c.Code() != codes.Canceled {
322 t.Fatalf("Pull: %v", err)
323 }
324 var seen [256]bool
325 for _, m := range msgs {
326 seen[m.Data[0]] = true
327 }
328 for i, saw := range seen {
329 if !saw {
330 t.Errorf("sync=%t, eod=%t: did not see message #%d", synchronous, exactlyOnceDelivery, i)
331 }
332 }
333 })
334 }
335
336 func (t1 *Topic) Equal(t2 *Topic) bool {
337 if t1 == nil && t2 == nil {
338 return true
339 }
340 if t1 == nil || t2 == nil {
341 return false
342 }
343 return t1.c == t2.c && t1.name == t2.name
344 }
345
346
347 func newFake(t *testing.T) (*Client, *pstest.Server) {
348 ctx := context.Background()
349 srv := pstest.NewServer()
350 client, err := NewClient(ctx, projName,
351 option.WithEndpoint(srv.Addr),
352 option.WithoutAuthentication(),
353 option.WithGRPCDialOption(grpc.WithInsecure()))
354 if err != nil {
355 t.Fatal(err)
356 }
357 return client, srv
358 }
359
360 func TestPushConfigAuthenticationMethod_toProto(t *testing.T) {
361 in := &PushConfig{
362 Endpoint: "https://example.com/push",
363 AuthenticationMethod: &OIDCToken{
364 ServiceAccountEmail: "foo@example.com",
365 Audience: "client-12345",
366 },
367 }
368 got := in.toProto()
369 want := &pb.PushConfig{
370 PushEndpoint: "https://example.com/push",
371 AuthenticationMethod: &pb.PushConfig_OidcToken_{
372 OidcToken: &pb.PushConfig_OidcToken{
373 ServiceAccountEmail: "foo@example.com",
374 Audience: "client-12345",
375 },
376 },
377 }
378 if diff := testutil.Diff(got, want); diff != "" {
379 t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
380 }
381 }
382
383 func TestDeadLettering_toProto(t *testing.T) {
384 in := &DeadLetterPolicy{
385 MaxDeliveryAttempts: 10,
386 DeadLetterTopic: "projects/p/topics/t",
387 }
388 got := in.toProto()
389 want := &pb.DeadLetterPolicy{
390 DeadLetterTopic: "projects/p/topics/t",
391 MaxDeliveryAttempts: 10,
392 }
393 if diff := testutil.Diff(got, want); diff != "" {
394 t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
395 }
396 }
397
398
399
400 func TestDeadLettering_toMessage(t *testing.T) {
401
402 receivedMsg := &pb.ReceivedMessage{
403 AckId: "1234",
404 Message: &pb.PubsubMessage{
405 Data: []byte("some message"),
406 MessageId: "id-1234",
407 PublishTime: timestamppb.Now(),
408 },
409 }
410 got, err := toMessage(receivedMsg, time.Time{}, nil)
411 if err != nil {
412 t.Errorf("toMessage failed: %v", err)
413 }
414 if got.DeliveryAttempt != nil {
415 t.Errorf("toMessage with dead-lettering disabled failed\ngot: %d, want nil", *got.DeliveryAttempt)
416 }
417
418
419 receivedMsg.DeliveryAttempt = 10
420 got, err = toMessage(receivedMsg, time.Time{}, nil)
421 if err != nil {
422 t.Errorf("toMessage failed: %v", err)
423 }
424 if *got.DeliveryAttempt != int(receivedMsg.DeliveryAttempt) {
425 t.Errorf("toMessage with dead-lettered enabled failed\ngot: %d, want %d", *got.DeliveryAttempt, receivedMsg.DeliveryAttempt)
426 }
427 }
428
429 func TestRetryPolicy_toProto(t *testing.T) {
430 in := &RetryPolicy{
431 MinimumBackoff: 20 * time.Second,
432 MaximumBackoff: 300 * time.Second,
433 }
434 got := in.toProto()
435 want := &pb.RetryPolicy{
436 MinimumBackoff: durationpb.New(20 * time.Second),
437 MaximumBackoff: durationpb.New(300 * time.Second),
438 }
439 if diff := testutil.Diff(got, want); diff != "" {
440 t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
441 }
442 }
443
444 func TestOrdering_CreateSubscription(t *testing.T) {
445 ctx := context.Background()
446 client, srv := newFake(t)
447 defer client.Close()
448 defer srv.Close()
449
450 topic := mustCreateTopic(t, client, "t")
451 subConfig := SubscriptionConfig{
452 Topic: topic,
453 EnableMessageOrdering: true,
454 }
455 orderSub, err := client.CreateSubscription(ctx, "s", subConfig)
456 if err != nil {
457 t.Fatal(err)
458 }
459 cfg, err := orderSub.Config(ctx)
460 if err != nil {
461 t.Fatal(err)
462 }
463 if !cfg.EnableMessageOrdering {
464 t.Fatalf("Expected EnableMessageOrdering to be true in %s", orderSub.String())
465 }
466
467
468 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
469 defer cancel()
470 orderSub.Receive(ctx, func(ctx context.Context, msg *Message) {
471 msg.Ack()
472 })
473 }
474
475 func TestBigQuerySubscription(t *testing.T) {
476 ctx, cancel := context.WithCancel(context.Background())
477 defer cancel()
478 client, srv := newFake(t)
479 defer client.Close()
480 defer srv.Close()
481
482 topic := mustCreateTopic(t, client, "t")
483 bqTable := "some-project:some-dataset.some-table"
484 bqConfig := BigQueryConfig{
485 Table: bqTable,
486 }
487
488 subConfig := SubscriptionConfig{
489 Topic: topic,
490 BigQueryConfig: bqConfig,
491 }
492 bqSub, err := client.CreateSubscription(ctx, "s", subConfig)
493 if err != nil {
494 t.Fatal(err)
495 }
496 cfg, err := bqSub.Config(ctx)
497 if err != nil {
498 t.Fatal(err)
499 }
500
501 want := bqConfig
502 want.State = BigQueryConfigActive
503 if diff := testutil.Diff(cfg.BigQueryConfig, want); diff != "" {
504 t.Fatalf("CreateBQSubscription mismatch: \n%s", diff)
505 }
506 }
507
508 func TestCloudStorageSubscription(t *testing.T) {
509 ctx, cancel := context.WithCancel(context.Background())
510 defer cancel()
511 client, srv := newFake(t)
512 defer client.Close()
513 defer srv.Close()
514
515 topic := mustCreateTopic(t, client, "t")
516 bucket := "fake-bucket"
517 csCfg := CloudStorageConfig{
518 Bucket: bucket,
519 FilenamePrefix: "some-prefix",
520 FilenameSuffix: "some-suffix",
521 OutputFormat: &CloudStorageOutputFormatAvroConfig{
522 WriteMetadata: true,
523 },
524 MaxDuration: 10 * time.Minute,
525 MaxBytes: 10e5,
526 }
527
528 subConfig := SubscriptionConfig{
529 Topic: topic,
530 CloudStorageConfig: csCfg,
531 }
532 csSub, err := client.CreateSubscription(ctx, "s", subConfig)
533 if err != nil {
534 t.Fatal(err)
535 }
536 cfg, err := csSub.Config(ctx)
537 if err != nil {
538 t.Fatal(err)
539 }
540
541 want := csCfg
542 want.State = CloudStorageConfigActive
543 if diff := testutil.Diff(cfg.CloudStorageConfig, want); diff != "" {
544 t.Fatalf("create cloud storage subscription mismatch: \n%s", diff)
545 }
546
547 csCfg.OutputFormat = &CloudStorageOutputFormatTextConfig{}
548 cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
549 CloudStorageConfig: &csCfg,
550 })
551 if err != nil {
552 t.Fatal(err)
553 }
554 got := cfg.CloudStorageConfig
555 want = csCfg
556 want.State = CloudStorageConfigActive
557 if diff := testutil.Diff(got, want); diff != "" {
558 t.Fatalf("update cloud storage subscription mismatch: \n%s", diff)
559 }
560
561
562 cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
563 CloudStorageConfig: &CloudStorageConfig{},
564 })
565 if err != nil {
566 t.Fatal(err)
567 }
568 got = cfg.CloudStorageConfig
569 want = CloudStorageConfig{}
570 if diff := testutil.Diff(got, want); diff != "" {
571 t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff)
572 }
573 }
574
575 func TestExactlyOnceDelivery_AckSuccess(t *testing.T) {
576 t.Parallel()
577 ctx, cancel := context.WithCancel(context.Background())
578 client, srv := newFake(t)
579 defer client.Close()
580 defer srv.Close()
581
582 topic := mustCreateTopic(t, client, "t")
583 subConfig := SubscriptionConfig{
584 Topic: topic,
585 EnableExactlyOnceDelivery: true,
586 }
587 s, err := client.CreateSubscription(ctx, "s", subConfig)
588 if err != nil {
589 t.Fatalf("create sub err: %v", err)
590 }
591 s.ReceiveSettings.NumGoroutines = 1
592 r := topic.Publish(ctx, &Message{
593 Data: []byte("exactly-once-message"),
594 })
595 if _, err := r.Get(ctx); err != nil {
596 t.Fatalf("failed to publish message: %v", err)
597 }
598
599 err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
600 ar := msg.AckWithResult()
601 s, err := ar.Get(ctx)
602 if s != AcknowledgeStatusSuccess {
603 t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusSuccess)
604 }
605 if err != nil {
606 t.Errorf("AckResult error got %v", err)
607 }
608 cancel()
609 })
610 if err != nil {
611 t.Fatalf("s.Receive err: %v", err)
612 }
613 }
614
615 func TestExactlyOnceDelivery_AckFailureErrorPermissionDenied(t *testing.T) {
616 t.Parallel()
617 ctx, cancel := context.WithCancel(context.Background())
618 srv := pstest.NewServer(pstest.WithErrorInjection("Acknowledge", codes.PermissionDenied, "insufficient permission"))
619 client, err := NewClient(ctx, projName,
620 option.WithEndpoint(srv.Addr),
621 option.WithoutAuthentication(),
622 option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
623 if err != nil {
624 t.Fatal(err)
625 }
626 defer client.Close()
627 defer srv.Close()
628
629 topic := mustCreateTopic(t, client, "t")
630 subConfig := SubscriptionConfig{
631 Topic: topic,
632 EnableExactlyOnceDelivery: true,
633 }
634 s, err := client.CreateSubscription(ctx, "s", subConfig)
635 if err != nil {
636 t.Fatalf("create sub err: %v", err)
637 }
638 s.ReceiveSettings.NumGoroutines = 1
639 r := topic.Publish(ctx, &Message{
640 Data: []byte("exactly-once-message"),
641 })
642 if _, err := r.Get(ctx); err != nil {
643 t.Fatalf("failed to publish message: %v", err)
644 }
645 err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
646 ar := msg.AckWithResult()
647 s, err := ar.Get(ctx)
648 if s != AcknowledgeStatusPermissionDenied {
649 t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusPermissionDenied)
650 }
651 wantErr := status.Errorf(codes.PermissionDenied, "insufficient permission")
652 if !errors.Is(err, wantErr) {
653 t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr)
654 }
655 cancel()
656 })
657 if err != nil {
658 t.Fatalf("s.Receive err: %v", err)
659 }
660 }
661
662 func TestExactlyOnceDelivery_AckRetryDeadlineExceeded(t *testing.T) {
663 ctx, cancel := context.WithCancel(context.Background())
664 srv := pstest.NewServer(pstest.WithErrorInjection("Acknowledge", codes.Internal, "internal error"))
665 client, err := NewClient(ctx, projName,
666 option.WithEndpoint(srv.Addr),
667 option.WithoutAuthentication(),
668 option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
669 if err != nil {
670 t.Fatal(err)
671 }
672 defer client.Close()
673 defer srv.Close()
674
675 topic := mustCreateTopic(t, client, "t")
676 subConfig := SubscriptionConfig{
677 Topic: topic,
678 EnableExactlyOnceDelivery: true,
679 }
680 s, err := client.CreateSubscription(ctx, "s", subConfig)
681 if err != nil {
682 t.Fatalf("create sub err: %v", err)
683 }
684 r := topic.Publish(ctx, &Message{
685 Data: []byte("exactly-once-message"),
686 })
687 if _, err := r.Get(ctx); err != nil {
688 t.Fatalf("failed to publish message: %v", err)
689 }
690
691 s.ReceiveSettings = ReceiveSettings{
692 NumGoroutines: 1,
693 }
694
695 exactlyOnceDeliveryRetryDeadline = 10 * time.Second
696 err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
697 log.Printf("received message: %v\n", msg)
698 ar := msg.AckWithResult()
699 s, err := ar.Get(ctx)
700 if s != AcknowledgeStatusOther {
701 t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusOther)
702 }
703 wantErr := context.DeadlineExceeded
704 if !errors.Is(err, wantErr) {
705 t.Errorf("AckResult error\ngot %v\nwant %s", err, wantErr)
706 }
707 cancel()
708 })
709 if err != nil {
710 t.Fatalf("s.Receive err: %v", err)
711 }
712 }
713
714 func TestExactlyOnceDelivery_NackSuccess(t *testing.T) {
715 t.Parallel()
716 ctx, cancel := context.WithCancel(context.Background())
717 client, srv := newFake(t)
718 defer client.Close()
719 defer srv.Close()
720
721 topic := mustCreateTopic(t, client, "t")
722 subConfig := SubscriptionConfig{
723 Topic: topic,
724 EnableExactlyOnceDelivery: true,
725 }
726 s, err := client.CreateSubscription(ctx, "s", subConfig)
727 if err != nil {
728 t.Fatalf("create sub err: %v", err)
729 }
730 r := topic.Publish(ctx, &Message{
731 Data: []byte("exactly-once-message"),
732 })
733 if _, err := r.Get(ctx); err != nil {
734 t.Fatalf("failed to publish message: %v", err)
735 }
736
737 s.ReceiveSettings = ReceiveSettings{
738 NumGoroutines: 1,
739 }
740 err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
741 ar := msg.NackWithResult()
742 s, err := ar.Get(context.Background())
743 if s != AcknowledgeStatusSuccess {
744 t.Errorf("AckResult AckStatus got %v, want %v", s, AcknowledgeStatusSuccess)
745 }
746 if err != nil {
747 t.Errorf("AckResult error got %v", err)
748 }
749 cancel()
750 })
751 if err != nil {
752 t.Fatalf("s.Receive err: %v", err)
753 }
754 }
755
756 func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) {
757 ctx := context.Background()
758 srv := pstest.NewServer(pstest.WithErrorInjection("ModifyAckDeadline", codes.Internal, "internal error"))
759 client, err := NewClient(ctx, projName,
760 option.WithEndpoint(srv.Addr),
761 option.WithoutAuthentication(),
762 option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
763 if err != nil {
764 t.Fatal(err)
765 }
766 defer client.Close()
767 defer srv.Close()
768
769 topic := mustCreateTopic(t, client, "t")
770 subConfig := SubscriptionConfig{
771 Topic: topic,
772 EnableExactlyOnceDelivery: true,
773 }
774 s, err := client.CreateSubscription(ctx, "s", subConfig)
775 if err != nil {
776 t.Fatalf("create sub err: %v", err)
777 }
778 r := topic.Publish(ctx, &Message{
779 Data: []byte("exactly-once-message"),
780 })
781 if _, err := r.Get(ctx); err != nil {
782 t.Fatalf("failed to publish message: %v", err)
783 }
784 ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
785 defer cancel()
786 s.Receive(ctx, func(ctx context.Context, msg *Message) {
787 t.Fatal("expected message to not have been delivered when exactly once enabled")
788 })
789 }
790
791 func TestSubscribeMessageExpirationFlowControl(t *testing.T) {
792 t.Parallel()
793 ctx, cancel := context.WithCancel(context.Background())
794 defer cancel()
795 client, srv := newFake(t)
796 defer client.Close()
797 defer srv.Close()
798
799 topic := mustCreateTopic(t, client, "t")
800 subConfig := SubscriptionConfig{
801 Topic: topic,
802 }
803 s, err := client.CreateSubscription(ctx, "s", subConfig)
804 if err != nil {
805 t.Fatalf("create sub err: %v", err)
806 }
807
808 s.ReceiveSettings.NumGoroutines = 1
809 s.ReceiveSettings.MaxOutstandingMessages = 1
810 s.ReceiveSettings.MaxExtension = 10 * time.Second
811 s.ReceiveSettings.MaxExtensionPeriod = 10 * time.Second
812 r := topic.Publish(ctx, &Message{
813 Data: []byte("redelivered-message"),
814 })
815 if _, err := r.Get(ctx); err != nil {
816 t.Fatalf("failed to publish message: %v", err)
817 }
818
819 deliveryCount := 0
820 ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
821 defer cancel()
822 err = s.Receive(ctx, func(ctx context.Context, msg *Message) {
823
824 if deliveryCount == 1 {
825 msg.Ack()
826 }
827
828 deliveryCount++
829 if deliveryCount == 2 {
830 cancel()
831 }
832 })
833 if deliveryCount != 2 {
834 t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount)
835 }
836 if err != nil {
837 t.Fatalf("s.Receive err: %v", err)
838 }
839 }
840
View as plain text