1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "bytes"
19 "context"
20 "errors"
21 "fmt"
22 "reflect"
23 "sync"
24 "sync/atomic"
25 "testing"
26 "time"
27
28 ipubsub "cloud.google.com/go/internal/pubsub"
29 "cloud.google.com/go/internal/testutil"
30 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
31 "cloud.google.com/go/pubsub/pstest"
32 "google.golang.org/api/option"
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials/insecure"
36 "google.golang.org/grpc/status"
37 )
38
39 var (
40 projName = "P"
41 topicName = "some-topic"
42 subName = "some-sub"
43 fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName)
44 fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName)
45 )
46
47 func TestSplitRequestIDs(t *testing.T) {
48 t.Parallel()
49 ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
50 for _, test := range []struct {
51 ids []string
52 splitIndex int
53 }{
54 {[]string{}, 0},
55 {ids, 2},
56 {ids[:2], 2},
57 {ids[:1], 1},
58 } {
59 got1, got2 := splitRequestIDs(test.ids, 2)
60 want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
61 if !testutil.Equal(len(got1), len(want1)) {
62 t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
63 }
64 if !testutil.Equal(len(got2), len(want2)) {
65 t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
66 }
67 }
68 }
69
70 func TestCalcFieldSize(t *testing.T) {
71 t.Parallel()
72
73 req := &pb.AcknowledgeRequest{
74 Subscription: "sub",
75 AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"},
76 }
77 size := calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...)
78
79
80 want := (1 + 1) + len(req.Subscription) +
81 5*(1+1+3)
82 if size != want {
83 t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want)
84 }
85
86 req.Subscription = string(bytes.Repeat([]byte{'A'}, 300))
87 size = calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...)
88
89
90 want = (1 + 2) + len(req.Subscription) +
91 5*(1+1+3)
92 if size != want {
93 t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want)
94 }
95
96
97 modAckReq := &pb.ModifyAckDeadlineRequest{
98 Subscription: "sub",
99 AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"},
100 AckDeadlineSeconds: 300,
101 }
102
103 size = calcFieldSizeString(modAckReq.Subscription) +
104 calcFieldSizeString(modAckReq.AckIds...) +
105 calcFieldSizeInt(int(modAckReq.AckDeadlineSeconds))
106
107 want = (1 + 1) + len(modAckReq.Subscription) +
108 5*(1+1+3) +
109 (1 + 2)
110 if size != want {
111 t.Errorf("pubsub: calculated modAck req size of %d bytes, want %d", size, want)
112 }
113 }
114
115 func TestMaxExtensionPeriod(t *testing.T) {
116 srv := pstest.NewServer()
117 ctx, cancel := context.WithCancel(context.Background())
118 defer cancel()
119
120 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
121
122 _, client, err := initConn(ctx, srv.Addr)
123 if err != nil {
124 t.Fatal(err)
125 }
126 want := 15 * time.Second
127 iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{
128 maxExtensionPeriod: want,
129 })
130
131
132 receiveTime := time.Now().Add(time.Duration(-20) * time.Second)
133 iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
134
135 if got := iter.ackDeadline(); got != want {
136 t.Fatalf("deadline got = %v, want %v", got, want)
137 }
138 }
139
140 func TestAckDistribution(t *testing.T) {
141 if testing.Short() {
142 t.SkipNow()
143 }
144 t.Skip("broken")
145
146 ctx, cancel := context.WithCancel(context.Background())
147 defer cancel()
148
149 minDurationPerLeaseExtension = 1 * time.Second
150 pstest.SetMinAckDeadline(minDurationPerLeaseExtension)
151 srv := pstest.NewServer()
152 defer srv.Close()
153 defer pstest.ResetMinAckDeadline()
154
155
156
157
158 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
159
160 queuedMsgs := make(chan int32, 1024)
161 go continuouslySend(ctx, srv, queuedMsgs)
162
163 for _, testcase := range []struct {
164 initialProcessSecs int32
165 finalProcessSecs int32
166 }{
167 {initialProcessSecs: 3, finalProcessSecs: 5},
168 {initialProcessSecs: 5, finalProcessSecs: 3},
169 } {
170 t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs)
171
172
173
174
175
176
177 processTimeSecs := testcase.initialProcessSecs
178
179 s, client, err := initConn(ctx, srv.Addr)
180 if err != nil {
181 t.Fatal(err)
182 }
183
184
185 recvdWg := &sync.WaitGroup{}
186
187 go startReceiving(ctx, t, s, recvdWg, &processTimeSecs)
188 startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg)
189
190 recvdWg.Wait()
191 time.Sleep(100 * time.Millisecond)
192 err = client.Close()
193 if err != nil {
194 t.Fatal(err)
195 }
196
197 modacks := modacksByTime(srv.Messages())
198 u := modackDeadlines(modacks)
199 initialDL := int32(minDurationPerLeaseExtension / time.Second)
200 if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
201 t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
202 initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
203 }
204 }
205 }
206
207
208 func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack {
209 modacks := map[time.Time][]pstest.Modack{}
210
211 for _, msg := range msgs {
212 for _, m := range msg.Modacks {
213 modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m)
214 }
215 }
216 return modacks
217 }
218
219
220 func setsAreEqual(haystack, needles []int32) bool {
221 hMap := map[int32]bool{}
222 nMap := map[int32]bool{}
223
224 for _, n := range needles {
225 nMap[n] = true
226 }
227
228 for _, n := range haystack {
229 hMap[n] = true
230 }
231
232 return reflect.DeepEqual(nMap, hMap)
233 }
234
235
236
237 func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) {
238 t.Log("Receiving..")
239
240 var recvdMu sync.Mutex
241 recvd := map[string]bool{}
242
243 err := s.Receive(ctx, func(ctx context.Context, msg *Message) {
244 msgData := string(msg.Data)
245 recvdMu.Lock()
246 _, ok := recvd[msgData]
247 if ok {
248 recvdMu.Unlock()
249 t.Logf("already saw \"%s\"\n", msgData)
250 return
251 }
252 recvd[msgData] = true
253 recvdMu.Unlock()
254
255 select {
256 case <-ctx.Done():
257 msg.Nack()
258 recvdWg.Done()
259 case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second):
260 msg.Ack()
261 recvdWg.Done()
262 }
263 })
264 if err != nil {
265 if status.Code(err) != codes.Canceled {
266 t.Error(err)
267 }
268 }
269 }
270
271
272 func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) {
273 var msg int32
274
275
276
277
278 t.Log("minAckDeadlineSecsSending an initial message")
279 recvdWg.Add(1)
280 msg++
281 queuedMsgs <- msg
282 <-time.After(minDurationPerLeaseExtension)
283
284 t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
285 "when the next batch of messages go out.", initialProcessSecs)
286 for i := 0; i < 10; i++ {
287 recvdWg.Add(1)
288 msg++
289 queuedMsgs <- msg
290 }
291 atomic.SwapInt32(processTimeSecs, finalProcessSecs)
292 <-time.After(time.Duration(initialProcessSecs) * time.Second)
293
294 t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+
295 "when the next batch of messages go out.", finalProcessSecs)
296 for i := 0; i < 100; i++ {
297 recvdWg.Add(1)
298 msg++
299 queuedMsgs <- msg
300 }
301 <-time.After(time.Duration(finalProcessSecs) * time.Second)
302
303 t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs)
304 recvdWg.Add(1)
305 msg++
306 queuedMsgs <- msg
307 }
308
309
310 func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) {
311 for {
312 select {
313 case <-ctx.Done():
314 return
315 case m := <-queuedMsgs:
316 srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil)
317 }
318 }
319 }
320
321 func toSet(arr []int32) []int32 {
322 var s []int32
323 m := map[int32]bool{}
324
325 for _, v := range arr {
326 _, ok := m[v]
327 if !ok {
328 s = append(s, v)
329 m[v] = true
330 }
331 }
332
333 return s
334
335 }
336
337 func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) {
338 conn, err := grpc.Dial(addr, grpc.WithInsecure())
339 if err != nil {
340 return nil, nil, err
341 }
342 e := testutil.DefaultHeadersEnforcer()
343 opts := append(e.CallOptions(), option.WithGRPCConn(conn))
344 client, err := NewClient(ctx, projName, opts...)
345 if err != nil {
346 return nil, nil, err
347 }
348
349 topic := client.Topic(topicName)
350 s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic})
351 if err != nil {
352 return nil, nil, err
353 }
354
355 exists, err := s.Exists(ctx)
356 if !exists {
357 return nil, nil, errors.New("Subscription does not exist")
358 }
359 if err != nil {
360 return nil, nil, err
361 }
362
363 return s, client, nil
364 }
365
366
367
368 func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 {
369 var u []int32
370 for _, vv := range m {
371 for _, v := range vv {
372 u = append(u, v.AckDeadline)
373 }
374 }
375 return u
376 }
377
378 func TestIterator_ModifyAckContextDeadline(t *testing.T) {
379
380
381 opts := []pstest.ServerReactorOption{
382 pstest.WithErrorInjection("ModifyAckDeadline", codes.Unknown, "context deadline exceeded"),
383 }
384 srv := pstest.NewServer(opts...)
385 ctx, cancel := context.WithCancel(context.Background())
386 defer cancel()
387
388 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
389 s, client, err := initConn(ctx, srv.Addr)
390 if err != nil {
391 t.Fatal(err)
392 }
393
394 srv.Publish(fullyQualifiedTopicName, []byte("some-message"), nil)
395 cctx, cancel := context.WithTimeout(ctx, time.Duration(5*time.Second))
396 defer cancel()
397 err = s.Receive(cctx, func(ctx context.Context, m *Message) {
398 m.Ack()
399 })
400 if err != nil {
401 t.Fatalf("Got error in Receive: %v", err)
402 }
403
404 err = client.Close()
405 if err != nil {
406 t.Fatal(err)
407 }
408 }
409
410 func TestIterator_SynchronousPullCancel(t *testing.T) {
411 srv := pstest.NewServer()
412 ctx, cancel := context.WithCancel(context.Background())
413 defer cancel()
414
415 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
416
417 _, client, err := initConn(ctx, srv.Addr)
418 if err != nil {
419 t.Fatal(err)
420 }
421 iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{})
422
423
424 iter.cancel()
425
426 if _, err := iter.pullMessages(100); err != nil {
427 t.Fatalf("Got error in pullMessages: %v", err)
428 }
429 }
430
431 func TestIterator_BoundedDuration(t *testing.T) {
432
433
434
435
436
437
438 testCases := []struct {
439 desc string
440 AckDeadline time.Duration
441 MinDuration time.Duration
442 MaxDuration time.Duration
443 exactlyOnce bool
444 Want time.Duration
445 }{
446 {
447 desc: "AckDeadline should be updated to the min duration",
448 AckDeadline: time.Duration(10 * time.Second),
449 MinDuration: time.Duration(15 * time.Second),
450 MaxDuration: time.Duration(10 * time.Minute),
451 exactlyOnce: false,
452 Want: time.Duration(15 * time.Second),
453 },
454 {
455 desc: "AckDeadline should be updated to 1 minute when using exactly once",
456 AckDeadline: time.Duration(10 * time.Second),
457 MinDuration: 0,
458 MaxDuration: time.Duration(10 * time.Minute),
459 exactlyOnce: true,
460 Want: time.Duration(1 * time.Minute),
461 },
462 {
463 desc: "AckDeadline should not be updated here, even though exactly once is enabled",
464 AckDeadline: time.Duration(10 * time.Second),
465 MinDuration: time.Duration(15 * time.Second),
466 MaxDuration: time.Duration(10 * time.Minute),
467 exactlyOnce: true,
468 Want: time.Duration(15 * time.Second),
469 },
470 {
471 desc: "AckDeadline should not be updated here",
472 AckDeadline: time.Duration(10 * time.Minute),
473 MinDuration: time.Duration(15 * time.Second),
474 MaxDuration: time.Duration(10 * time.Minute),
475 exactlyOnce: true,
476 Want: time.Duration(10 * time.Minute),
477 },
478 {
479 desc: "AckDeadline should not be updated when neither durations are set",
480 AckDeadline: time.Duration(5 * time.Minute),
481 MinDuration: 0,
482 MaxDuration: 0,
483 exactlyOnce: false,
484 Want: time.Duration(5 * time.Minute),
485 },
486 {
487 desc: "AckDeadline should should not be updated here since it is within both boundaries",
488 AckDeadline: time.Duration(5 * time.Minute),
489 MinDuration: time.Duration(1 * time.Minute),
490 MaxDuration: time.Duration(7 * time.Minute),
491 exactlyOnce: false,
492 Want: time.Duration(5 * time.Minute),
493 },
494 }
495 for _, tc := range testCases {
496 t.Run(tc.desc, func(t *testing.T) {
497 got := boundedDuration(tc.AckDeadline, tc.MinDuration, tc.MaxDuration, tc.exactlyOnce)
498 if got != tc.Want {
499 t.Errorf("boundedDuration mismatch:\n%+v\ngot: %v, want: %v", tc, got, tc.Want)
500 }
501 })
502 }
503 }
504
505 func TestIterator_StreamingPullExactlyOnce(t *testing.T) {
506 srv := pstest.NewServer()
507 ctx, cancel := context.WithCancel(context.Background())
508 defer cancel()
509
510 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
511
512 conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
513 if err != nil {
514 t.Fatal(err)
515 }
516 opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn))
517 client, err := NewClient(ctx, projName, opts...)
518 if err != nil {
519 t.Fatal(err)
520 }
521
522 topic := client.Topic(topicName)
523 sc := SubscriptionConfig{
524 Topic: topic,
525 EnableMessageOrdering: true,
526 EnableExactlyOnceDelivery: true,
527 }
528 _, err = client.CreateSubscription(ctx, subName, sc)
529 if err != nil {
530 t.Fatal(err)
531 }
532
533
534 srv.Publish(fullyQualifiedTopicName, []byte("msg"), nil)
535
536 iter := newMessageIterator(client.subc, fullyQualifiedSubName, &pullOptions{
537 synchronous: false,
538 maxOutstandingMessages: 100,
539 maxOutstandingBytes: 1e6,
540 maxPrefetch: 30,
541 maxExtension: 1 * time.Minute,
542 maxExtensionPeriod: 10 * time.Second,
543 })
544
545 if _, err := iter.receive(10); err != nil {
546 t.Fatalf("Got error in recvMessages: %v", err)
547 }
548
549 if !iter.enableExactlyOnceDelivery {
550 t.Fatalf("expected iter.enableExactlyOnce=true")
551 }
552 }
553
554 func TestAddToDistribution(t *testing.T) {
555 c, _ := newFake(t)
556
557 iter := newMessageIterator(c.subc, "some-sub", &pullOptions{})
558
559
560 receiveTime := time.Now().Add(time.Duration(-1) * time.Second)
561 iter.addToDistribution(receiveTime)
562 deadline := iter.ackTimeDist.Percentile(.99)
563 want := 10
564 if deadline != want {
565 t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
566 }
567
568
569 receiveTime = time.Now().Add(time.Duration(-300) * time.Second)
570 iter.addToDistribution(receiveTime)
571 deadline = iter.ackTimeDist.Percentile(.99)
572 want = 300
573 if deadline != want {
574 t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
575 }
576
577
578 receiveTime = time.Now().Add(time.Duration(-1000) * time.Second)
579 iter.addToDistribution(receiveTime)
580 deadline = iter.ackTimeDist.Percentile(.99)
581 want = 600
582 if deadline != want {
583 t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
584 }
585 }
586
587 func TestPingStreamAckDeadline(t *testing.T) {
588 c, srv := newFake(t)
589 ctx, cancel := context.WithCancel(context.Background())
590 defer cancel()
591
592 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
593 topic := c.Topic(topicName)
594 s, err := c.CreateSubscription(ctx, subName, SubscriptionConfig{Topic: topic})
595 if err != nil {
596 t.Errorf("failed to create subscription: %v", err)
597 }
598
599 iter := newMessageIterator(c.subc, fullyQualifiedSubName, &pullOptions{})
600 defer iter.stop()
601
602 iter.eoMu.RLock()
603 if iter.enableExactlyOnceDelivery {
604 t.Error("iter.enableExactlyOnceDelivery should be false")
605 }
606 iter.eoMu.RUnlock()
607
608 _, err = s.Update(ctx, SubscriptionConfigToUpdate{
609 EnableExactlyOnceDelivery: true,
610 })
611 if err != nil {
612 t.Error(err)
613 }
614 srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
615
616 iter.receive(1)
617 iter.eoMu.RLock()
618 if !iter.enableExactlyOnceDelivery {
619 t.Error("iter.enableExactlyOnceDelivery should be true")
620 }
621 iter.eoMu.RUnlock()
622 }
623
624 func compareCompletedRetryLengths(t *testing.T, completed, retry map[string]*AckResult, wantCompleted, wantRetry int) {
625 if l := len(completed); l != wantCompleted {
626 t.Errorf("completed slice length got %d, want %d", l, wantCompleted)
627 }
628 if l := len(retry); l != wantRetry {
629 t.Errorf("retry slice length got %d, want %d", l, wantRetry)
630 }
631 }
632
633 func TestExactlyOnceProcessRequests(t *testing.T) {
634 ctx := context.Background()
635
636 t.Run("NoResults", func(t *testing.T) {
637
638
639 completed, retry := processResults(nil, nil, nil)
640 compareCompletedRetryLengths(t, completed, retry, 0, 0)
641 })
642
643 t.Run("NoErrorsNilAckResult", func(t *testing.T) {
644
645 ackReqMap := map[string]*AckResult{
646 "ackID": nil,
647 }
648 completed, retry := processResults(nil, ackReqMap, nil)
649 compareCompletedRetryLengths(t, completed, retry, 1, 0)
650 })
651
652 t.Run("NoErrors", func(t *testing.T) {
653
654 r := ipubsub.NewAckResult()
655 ackReqMap := map[string]*AckResult{
656 "ackID1": r,
657 }
658 completed, retry := processResults(nil, ackReqMap, nil)
659 compareCompletedRetryLengths(t, completed, retry, 1, 0)
660
661
662 s, err := r.Get(ctx)
663 if err != nil {
664 t.Errorf("AckResult err: got %v, want nil", err)
665 }
666 if s != AcknowledgeStatusSuccess {
667 t.Errorf("got %v, want AcknowledgeStatusSuccess", s)
668 }
669 })
670
671 t.Run("PermanentErrorInvalidAckID", func(t *testing.T) {
672 r := ipubsub.NewAckResult()
673 ackReqMap := map[string]*AckResult{
674 "ackID1": r,
675 }
676 errorsMap := map[string]string{
677 "ackID1": permanentInvalidAckErrString,
678 }
679 completed, retry := processResults(nil, ackReqMap, errorsMap)
680 compareCompletedRetryLengths(t, completed, retry, 1, 0)
681 s, err := r.Get(ctx)
682 if err == nil {
683 t.Error("AckResult err: got nil, want err")
684 }
685 if s != AcknowledgeStatusInvalidAckID {
686 t.Errorf("got %v, want AcknowledgeStatusSuccess", s)
687 }
688 })
689
690 t.Run("TransientErrorRetry", func(t *testing.T) {
691 r := ipubsub.NewAckResult()
692 ackReqMap := map[string]*AckResult{
693 "ackID1": r,
694 }
695 errorsMap := map[string]string{
696 "ackID1": transientErrStringPrefix + "_FAILURE",
697 }
698 completed, retry := processResults(nil, ackReqMap, errorsMap)
699 compareCompletedRetryLengths(t, completed, retry, 0, 1)
700 })
701
702 t.Run("UnknownError", func(t *testing.T) {
703 r := ipubsub.NewAckResult()
704 ackReqMap := map[string]*AckResult{
705 "ackID1": r,
706 }
707 errorsMap := map[string]string{
708 "ackID1": "unknown_error",
709 }
710 completed, retry := processResults(nil, ackReqMap, errorsMap)
711 compareCompletedRetryLengths(t, completed, retry, 1, 0)
712
713 s, err := r.Get(ctx)
714 if s != AcknowledgeStatusOther {
715 t.Errorf("got %v, want AcknowledgeStatusOther", s)
716 }
717 if err == nil || err.Error() != "unknown_error" {
718 t.Errorf("AckResult err: got %s, want unknown_error", err.Error())
719 }
720 })
721
722 t.Run("PermissionDenied", func(t *testing.T) {
723 r := ipubsub.NewAckResult()
724 ackReqMap := map[string]*AckResult{
725 "ackID1": r,
726 }
727 st := status.New(codes.PermissionDenied, "permission denied")
728 completed, retry := processResults(st, ackReqMap, nil)
729 compareCompletedRetryLengths(t, completed, retry, 1, 0)
730 s, err := r.Get(ctx)
731 if err == nil {
732 t.Error("AckResult err: got nil, want err")
733 }
734 if s != AcknowledgeStatusPermissionDenied {
735 t.Errorf("got %v, want AcknowledgeStatusPermissionDenied", s)
736 }
737 })
738
739 t.Run("FailedPrecondition", func(t *testing.T) {
740 r := ipubsub.NewAckResult()
741 ackReqMap := map[string]*AckResult{
742 "ackID1": r,
743 }
744 st := status.New(codes.FailedPrecondition, "failed_precondition")
745 completed, retry := processResults(st, ackReqMap, nil)
746 compareCompletedRetryLengths(t, completed, retry, 1, 0)
747 s, err := r.Get(ctx)
748 if err == nil {
749 t.Error("AckResult err: got nil, want err")
750 }
751 if s != AcknowledgeStatusFailedPrecondition {
752 t.Errorf("got %v, want AcknowledgeStatusFailedPrecondition", s)
753 }
754 })
755
756 t.Run("OtherErrorStatus", func(t *testing.T) {
757 r := ipubsub.NewAckResult()
758 ackReqMap := map[string]*AckResult{
759 "ackID1": r,
760 }
761 st := status.New(codes.OutOfRange, "out of range")
762 completed, retry := processResults(st, ackReqMap, nil)
763 compareCompletedRetryLengths(t, completed, retry, 1, 0)
764 s, err := r.Get(ctx)
765 if err == nil {
766 t.Error("AckResult err: got nil, want err")
767 }
768 if s != AcknowledgeStatusOther {
769 t.Errorf("got %v, want AcknowledgeStatusOther", s)
770 }
771 })
772
773 t.Run("MixedSuccessFailureAcks", func(t *testing.T) {
774 r1 := ipubsub.NewAckResult()
775 r2 := ipubsub.NewAckResult()
776 r3 := ipubsub.NewAckResult()
777 ackReqMap := map[string]*AckResult{
778 "ackID1": r1,
779 "ackID2": r2,
780 "ackID3": r3,
781 }
782 errorsMap := map[string]string{
783 "ackID1": permanentInvalidAckErrString,
784 "ackID2": transientErrStringPrefix + "_FAILURE",
785 }
786 completed, retry := processResults(nil, ackReqMap, errorsMap)
787 compareCompletedRetryLengths(t, completed, retry, 2, 1)
788
789 s, err := r1.Get(ctx)
790 if err == nil {
791 t.Error("r1: AckResult err: got nil, want err")
792 }
793 if s != AcknowledgeStatusInvalidAckID {
794 t.Errorf("r1: got %v, want AcknowledgeInvalidAckID", s)
795 }
796
797
798 ctx2, cancel := context.WithTimeout(ctx, 2*time.Second)
799 defer cancel()
800 _, err = r2.Get(ctx2)
801 if !errors.Is(err, context.DeadlineExceeded) {
802 t.Errorf("r2: AckResult.Get should timeout, got: %v", err)
803 }
804
805
806 s, err = r3.Get(ctx)
807 if err != nil {
808 t.Errorf("r3: AckResult err: got %v, want nil\n", err)
809 }
810 if s != AcknowledgeStatusSuccess {
811 t.Errorf("r3: got %v, want AcknowledgeStatusSuccess", s)
812 }
813 })
814
815 t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) {
816 for c := range exactlyOnceDeliveryTemporaryRetryErrors {
817 r := ipubsub.NewAckResult()
818 ackReqMap := map[string]*AckResult{
819 "ackID1": r,
820 }
821 st := status.New(c, "")
822 completed, retry := processResults(st, ackReqMap, nil)
823 compareCompletedRetryLengths(t, completed, retry, 0, 1)
824 }
825 })
826 }
827
View as plain text