// Copyright 2017 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub import ( "bytes" "context" "errors" "fmt" "reflect" "sync" "sync/atomic" "testing" "time" ipubsub "cloud.google.com/go/internal/pubsub" "cloud.google.com/go/internal/testutil" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/pstest" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" ) var ( projName = "P" topicName = "some-topic" subName = "some-sub" fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName) fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName) ) func TestSplitRequestIDs(t *testing.T) { t.Parallel() ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"} for _, test := range []struct { ids []string splitIndex int }{ {[]string{}, 0}, // empty slice, no split {ids, 2}, // slice of size 5, split at index 2 {ids[:2], 2}, // slice of size 3, split at index 2 {ids[:1], 1}, // slice of size 1, split at index 1 } { got1, got2 := splitRequestIDs(test.ids, 2) want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:] if !testutil.Equal(len(got1), len(want1)) { t.Errorf("%v, 1: got %v, want %v", test, got1, want1) } if !testutil.Equal(len(got2), len(want2)) { t.Errorf("%v, 2: got %v, want %v", test, got2, want2) } } } func TestCalcFieldSize(t *testing.T) { t.Parallel() // Create a mock ack request to test. req := &pb.AcknowledgeRequest{ Subscription: "sub", AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"}, } size := calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...) // Proto encoding is calculated from 1 tag byte and 1 size byte for each string. want := (1 + 1) + len(req.Subscription) + // subscription field: 1 tag byte + 1 size byte 5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)] if size != want { t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want) } req.Subscription = string(bytes.Repeat([]byte{'A'}, 300)) size = calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...) // With a longer subscription name, we use an extra size byte. want = (1 + 2) + len(req.Subscription) + // subscription field: 1 tag byte + 2 size bytes 5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)] if size != want { t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want) } // Create a mock modack request to test. modAckReq := &pb.ModifyAckDeadlineRequest{ Subscription: "sub", AckIds: []string{"aaa", "bbb", "ccc", "ddd", "eee"}, AckDeadlineSeconds: 300, } size = calcFieldSizeString(modAckReq.Subscription) + calcFieldSizeString(modAckReq.AckIds...) + calcFieldSizeInt(int(modAckReq.AckDeadlineSeconds)) want = (1 + 1) + len(modAckReq.Subscription) + // subscription field: 1 tag byte + 1 size byte 5*(1+1+3) + // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)] (1 + 2) // ackDeadline: 1 tag byte + 2 size bytes if size != want { t.Errorf("pubsub: calculated modAck req size of %d bytes, want %d", size, want) } } func TestMaxExtensionPeriod(t *testing.T) { srv := pstest.NewServer() ctx, cancel := context.WithCancel(context.Background()) defer cancel() srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) _, client, err := initConn(ctx, srv.Addr) if err != nil { t.Fatal(err) } want := 15 * time.Second iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{ maxExtensionPeriod: want, }) // Add a datapoint that's greater than maxExtensionPeriod. receiveTime := time.Now().Add(time.Duration(-20) * time.Second) iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second)) if got := iter.ackDeadline(); got != want { t.Fatalf("deadline got = %v, want %v", got, want) } } func TestAckDistribution(t *testing.T) { if testing.Short() { t.SkipNow() } t.Skip("broken") ctx, cancel := context.WithCancel(context.Background()) defer cancel() minDurationPerLeaseExtension = 1 * time.Second pstest.SetMinAckDeadline(minDurationPerLeaseExtension) srv := pstest.NewServer() defer srv.Close() defer pstest.ResetMinAckDeadline() // Create the topic via a Publish. It's convenient to do it here as opposed to client.CreateTopic because the client // has not been established yet, and also because we want to create the topic once whereas the client is established // below twice. srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) queuedMsgs := make(chan int32, 1024) go continuouslySend(ctx, srv, queuedMsgs) for _, testcase := range []struct { initialProcessSecs int32 finalProcessSecs int32 }{ {initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up {initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down } { t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs) // processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should // pretend to process for. e.g. if we test 3s -> 5s, processTimeSecs will start at 3, causing receiver // to process messages received for 3s while sender sends the first batch. Then, as sender begins to // send the next batch, sender will swap processTimeSeconds to 5s and begin sending, and receiver will // process each message for 5s. In this way we simulate a client whose time-to-ack (process time) changes. processTimeSecs := testcase.initialProcessSecs s, client, err := initConn(ctx, srv.Addr) if err != nil { t.Fatal(err) } // recvdWg increments for each message sent, and decrements for each message received. recvdWg := &sync.WaitGroup{} go startReceiving(ctx, t, s, recvdWg, &processTimeSecs) startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg) recvdWg.Wait() time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up err = client.Close() if err != nil { t.Fatal(err) } modacks := modacksByTime(srv.Messages()) u := modackDeadlines(modacks) initialDL := int32(minDurationPerLeaseExtension / time.Second) if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) { t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v", initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u)) } } } // modacksByTime buckets modacks by time. func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack { modacks := map[time.Time][]pstest.Modack{} for _, msg := range msgs { for _, m := range msg.Modacks { modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m) } } return modacks } // setsAreEqual reports whether a and b contain the same values, ignoring duplicates. func setsAreEqual(haystack, needles []int32) bool { hMap := map[int32]bool{} nMap := map[int32]bool{} for _, n := range needles { nMap[n] = true } for _, n := range haystack { hMap[n] = true } return reflect.DeepEqual(nMap, hMap) } // startReceiving pretends to be a client. It calls s.Receive and acks messages after some random delay. It also // looks out for dupes - any message that arrives twice will cause a failure. func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) { t.Log("Receiving..") var recvdMu sync.Mutex recvd := map[string]bool{} err := s.Receive(ctx, func(ctx context.Context, msg *Message) { msgData := string(msg.Data) recvdMu.Lock() _, ok := recvd[msgData] if ok { recvdMu.Unlock() t.Logf("already saw \"%s\"\n", msgData) return } recvd[msgData] = true recvdMu.Unlock() select { case <-ctx.Done(): msg.Nack() recvdWg.Done() case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second): msg.Ack() recvdWg.Done() } }) if err != nil { if status.Code(err) != codes.Canceled { t.Error(err) } } } // startSending sends four batches of messages broken up by minDeadline, initialProcessSecs, and finalProcessSecs. func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) { var msg int32 // We must send this block to force the receiver to send its initially-configured modack time. The time that // gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages // to create a distribution yet. t.Log("minAckDeadlineSecsSending an initial message") recvdWg.Add(1) msg++ queuedMsgs <- msg <-time.After(minDurationPerLeaseExtension) t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+ "when the next batch of messages go out.", initialProcessSecs) for i := 0; i < 10; i++ { recvdWg.Add(1) msg++ queuedMsgs <- msg } atomic.SwapInt32(processTimeSecs, finalProcessSecs) <-time.After(time.Duration(initialProcessSecs) * time.Second) t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+ "when the next batch of messages go out.", finalProcessSecs) for i := 0; i < 100; i++ { recvdWg.Add(1) msg++ queuedMsgs <- msg // Send many messages to drastically change distribution } <-time.After(time.Duration(finalProcessSecs) * time.Second) t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs) recvdWg.Add(1) msg++ queuedMsgs <- msg } // continuouslySend continuously sends messages that exist on the queuedMsgs chan. func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) { for { select { case <-ctx.Done(): return case m := <-queuedMsgs: srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil) } } } func toSet(arr []int32) []int32 { var s []int32 m := map[int32]bool{} for _, v := range arr { _, ok := m[v] if !ok { s = append(s, v) m[v] = true } } return s } func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) { conn, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { return nil, nil, err } e := testutil.DefaultHeadersEnforcer() opts := append(e.CallOptions(), option.WithGRPCConn(conn)) client, err := NewClient(ctx, projName, opts...) if err != nil { return nil, nil, err } topic := client.Topic(topicName) s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic}) if err != nil { return nil, nil, err } exists, err := s.Exists(ctx) if !exists { return nil, nil, errors.New("Subscription does not exist") } if err != nil { return nil, nil, err } return s, client, nil } // modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadlines, // and returns them as a slice func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 { var u []int32 for _, vv := range m { for _, v := range vv { u = append(u, v.AckDeadline) } } return u } func TestIterator_ModifyAckContextDeadline(t *testing.T) { // Test that all context deadline exceeded errors in ModAckDeadline // are not propagated to the client. opts := []pstest.ServerReactorOption{ pstest.WithErrorInjection("ModifyAckDeadline", codes.Unknown, "context deadline exceeded"), } srv := pstest.NewServer(opts...) ctx, cancel := context.WithCancel(context.Background()) defer cancel() srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) s, client, err := initConn(ctx, srv.Addr) if err != nil { t.Fatal(err) } srv.Publish(fullyQualifiedTopicName, []byte("some-message"), nil) cctx, cancel := context.WithTimeout(ctx, time.Duration(5*time.Second)) defer cancel() err = s.Receive(cctx, func(ctx context.Context, m *Message) { m.Ack() }) if err != nil { t.Fatalf("Got error in Receive: %v", err) } err = client.Close() if err != nil { t.Fatal(err) } } func TestIterator_SynchronousPullCancel(t *testing.T) { srv := pstest.NewServer() ctx, cancel := context.WithCancel(context.Background()) defer cancel() srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) _, client, err := initConn(ctx, srv.Addr) if err != nil { t.Fatal(err) } iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{}) // Cancelling the iterator and pulling should not result in any errors. iter.cancel() if _, err := iter.pullMessages(100); err != nil { t.Fatalf("Got error in pullMessages: %v", err) } } func TestIterator_BoundedDuration(t *testing.T) { // Use exported fields for time.Duration fields so they // print nicely. Otherwise, they will print as integers. // // AckDeadline is bounded by min/max ack deadline, which are // 10 seconds and 600 seconds respectively. This is // true for the real distribution data points as well. testCases := []struct { desc string AckDeadline time.Duration MinDuration time.Duration MaxDuration time.Duration exactlyOnce bool Want time.Duration }{ { desc: "AckDeadline should be updated to the min duration", AckDeadline: time.Duration(10 * time.Second), MinDuration: time.Duration(15 * time.Second), MaxDuration: time.Duration(10 * time.Minute), exactlyOnce: false, Want: time.Duration(15 * time.Second), }, { desc: "AckDeadline should be updated to 1 minute when using exactly once", AckDeadline: time.Duration(10 * time.Second), MinDuration: 0, MaxDuration: time.Duration(10 * time.Minute), exactlyOnce: true, Want: time.Duration(1 * time.Minute), }, { desc: "AckDeadline should not be updated here, even though exactly once is enabled", AckDeadline: time.Duration(10 * time.Second), MinDuration: time.Duration(15 * time.Second), MaxDuration: time.Duration(10 * time.Minute), exactlyOnce: true, Want: time.Duration(15 * time.Second), }, { desc: "AckDeadline should not be updated here", AckDeadline: time.Duration(10 * time.Minute), MinDuration: time.Duration(15 * time.Second), MaxDuration: time.Duration(10 * time.Minute), exactlyOnce: true, Want: time.Duration(10 * time.Minute), }, { desc: "AckDeadline should not be updated when neither durations are set", AckDeadline: time.Duration(5 * time.Minute), MinDuration: 0, MaxDuration: 0, exactlyOnce: false, Want: time.Duration(5 * time.Minute), }, { desc: "AckDeadline should should not be updated here since it is within both boundaries", AckDeadline: time.Duration(5 * time.Minute), MinDuration: time.Duration(1 * time.Minute), MaxDuration: time.Duration(7 * time.Minute), exactlyOnce: false, Want: time.Duration(5 * time.Minute), }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { got := boundedDuration(tc.AckDeadline, tc.MinDuration, tc.MaxDuration, tc.exactlyOnce) if got != tc.Want { t.Errorf("boundedDuration mismatch:\n%+v\ngot: %v, want: %v", tc, got, tc.Want) } }) } } func TestIterator_StreamingPullExactlyOnce(t *testing.T) { srv := pstest.NewServer() ctx, cancel := context.WithCancel(context.Background()) defer cancel() srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatal(err) } opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn)) client, err := NewClient(ctx, projName, opts...) if err != nil { t.Fatal(err) } topic := client.Topic(topicName) sc := SubscriptionConfig{ Topic: topic, EnableMessageOrdering: true, EnableExactlyOnceDelivery: true, } _, err = client.CreateSubscription(ctx, subName, sc) if err != nil { t.Fatal(err) } // Make sure to call publish before constructing the iterator. srv.Publish(fullyQualifiedTopicName, []byte("msg"), nil) iter := newMessageIterator(client.subc, fullyQualifiedSubName, &pullOptions{ synchronous: false, maxOutstandingMessages: 100, maxOutstandingBytes: 1e6, maxPrefetch: 30, maxExtension: 1 * time.Minute, maxExtensionPeriod: 10 * time.Second, }) if _, err := iter.receive(10); err != nil { t.Fatalf("Got error in recvMessages: %v", err) } if !iter.enableExactlyOnceDelivery { t.Fatalf("expected iter.enableExactlyOnce=true") } } func TestAddToDistribution(t *testing.T) { c, _ := newFake(t) iter := newMessageIterator(c.subc, "some-sub", &pullOptions{}) // Start with a datapoint that's too small that should be bounded to 10s. receiveTime := time.Now().Add(time.Duration(-1) * time.Second) iter.addToDistribution(receiveTime) deadline := iter.ackTimeDist.Percentile(.99) want := 10 if deadline != want { t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) } // The next datapoint should not be bounded. receiveTime = time.Now().Add(time.Duration(-300) * time.Second) iter.addToDistribution(receiveTime) deadline = iter.ackTimeDist.Percentile(.99) want = 300 if deadline != want { t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) } // Lastly, add a datapoint that should be bounded to 600s receiveTime = time.Now().Add(time.Duration(-1000) * time.Second) iter.addToDistribution(receiveTime) deadline = iter.ackTimeDist.Percentile(.99) want = 600 if deadline != want { t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want) } } func TestPingStreamAckDeadline(t *testing.T) { c, srv := newFake(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) topic := c.Topic(topicName) s, err := c.CreateSubscription(ctx, subName, SubscriptionConfig{Topic: topic}) if err != nil { t.Errorf("failed to create subscription: %v", err) } iter := newMessageIterator(c.subc, fullyQualifiedSubName, &pullOptions{}) defer iter.stop() iter.eoMu.RLock() if iter.enableExactlyOnceDelivery { t.Error("iter.enableExactlyOnceDelivery should be false") } iter.eoMu.RUnlock() _, err = s.Update(ctx, SubscriptionConfigToUpdate{ EnableExactlyOnceDelivery: true, }) if err != nil { t.Error(err) } srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) // Receive one message via the stream to trigger the update to enableExactlyOnceDelivery iter.receive(1) iter.eoMu.RLock() if !iter.enableExactlyOnceDelivery { t.Error("iter.enableExactlyOnceDelivery should be true") } iter.eoMu.RUnlock() } func compareCompletedRetryLengths(t *testing.T, completed, retry map[string]*AckResult, wantCompleted, wantRetry int) { if l := len(completed); l != wantCompleted { t.Errorf("completed slice length got %d, want %d", l, wantCompleted) } if l := len(retry); l != wantRetry { t.Errorf("retry slice length got %d, want %d", l, wantRetry) } } func TestExactlyOnceProcessRequests(t *testing.T) { ctx := context.Background() t.Run("NoResults", func(t *testing.T) { // If the ackResMap is nil, then the resulting slices should be empty. // nil maps here behave the same as if they were empty maps. completed, retry := processResults(nil, nil, nil) compareCompletedRetryLengths(t, completed, retry, 0, 0) }) t.Run("NoErrorsNilAckResult", func(t *testing.T) { // No errors so request should be completed even without an AckResult. ackReqMap := map[string]*AckResult{ "ackID": nil, } completed, retry := processResults(nil, ackReqMap, nil) compareCompletedRetryLengths(t, completed, retry, 1, 0) }) t.Run("NoErrors", func(t *testing.T) { // No errors so AckResult should be completed with success. r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } completed, retry := processResults(nil, ackReqMap, nil) compareCompletedRetryLengths(t, completed, retry, 1, 0) // We can obtain the AckStatus from AckResult if results are completed. s, err := r.Get(ctx) if err != nil { t.Errorf("AckResult err: got %v, want nil", err) } if s != AcknowledgeStatusSuccess { t.Errorf("got %v, want AcknowledgeStatusSuccess", s) } }) t.Run("PermanentErrorInvalidAckID", func(t *testing.T) { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } errorsMap := map[string]string{ "ackID1": permanentInvalidAckErrString, } completed, retry := processResults(nil, ackReqMap, errorsMap) compareCompletedRetryLengths(t, completed, retry, 1, 0) s, err := r.Get(ctx) if err == nil { t.Error("AckResult err: got nil, want err") } if s != AcknowledgeStatusInvalidAckID { t.Errorf("got %v, want AcknowledgeStatusSuccess", s) } }) t.Run("TransientErrorRetry", func(t *testing.T) { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } errorsMap := map[string]string{ "ackID1": transientErrStringPrefix + "_FAILURE", } completed, retry := processResults(nil, ackReqMap, errorsMap) compareCompletedRetryLengths(t, completed, retry, 0, 1) }) t.Run("UnknownError", func(t *testing.T) { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } errorsMap := map[string]string{ "ackID1": "unknown_error", } completed, retry := processResults(nil, ackReqMap, errorsMap) compareCompletedRetryLengths(t, completed, retry, 1, 0) s, err := r.Get(ctx) if s != AcknowledgeStatusOther { t.Errorf("got %v, want AcknowledgeStatusOther", s) } if err == nil || err.Error() != "unknown_error" { t.Errorf("AckResult err: got %s, want unknown_error", err.Error()) } }) t.Run("PermissionDenied", func(t *testing.T) { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } st := status.New(codes.PermissionDenied, "permission denied") completed, retry := processResults(st, ackReqMap, nil) compareCompletedRetryLengths(t, completed, retry, 1, 0) s, err := r.Get(ctx) if err == nil { t.Error("AckResult err: got nil, want err") } if s != AcknowledgeStatusPermissionDenied { t.Errorf("got %v, want AcknowledgeStatusPermissionDenied", s) } }) t.Run("FailedPrecondition", func(t *testing.T) { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } st := status.New(codes.FailedPrecondition, "failed_precondition") completed, retry := processResults(st, ackReqMap, nil) compareCompletedRetryLengths(t, completed, retry, 1, 0) s, err := r.Get(ctx) if err == nil { t.Error("AckResult err: got nil, want err") } if s != AcknowledgeStatusFailedPrecondition { t.Errorf("got %v, want AcknowledgeStatusFailedPrecondition", s) } }) t.Run("OtherErrorStatus", func(t *testing.T) { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } st := status.New(codes.OutOfRange, "out of range") completed, retry := processResults(st, ackReqMap, nil) compareCompletedRetryLengths(t, completed, retry, 1, 0) s, err := r.Get(ctx) if err == nil { t.Error("AckResult err: got nil, want err") } if s != AcknowledgeStatusOther { t.Errorf("got %v, want AcknowledgeStatusOther", s) } }) t.Run("MixedSuccessFailureAcks", func(t *testing.T) { r1 := ipubsub.NewAckResult() r2 := ipubsub.NewAckResult() r3 := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r1, "ackID2": r2, "ackID3": r3, } errorsMap := map[string]string{ "ackID1": permanentInvalidAckErrString, "ackID2": transientErrStringPrefix + "_FAILURE", } completed, retry := processResults(nil, ackReqMap, errorsMap) compareCompletedRetryLengths(t, completed, retry, 2, 1) // message with ackID "ackID1" fails s, err := r1.Get(ctx) if err == nil { t.Error("r1: AckResult err: got nil, want err") } if s != AcknowledgeStatusInvalidAckID { t.Errorf("r1: got %v, want AcknowledgeInvalidAckID", s) } // message with ackID "ackID2" is to be retried ctx2, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() _, err = r2.Get(ctx2) if !errors.Is(err, context.DeadlineExceeded) { t.Errorf("r2: AckResult.Get should timeout, got: %v", err) } // message with ackID "ackID3" succeeds s, err = r3.Get(ctx) if err != nil { t.Errorf("r3: AckResult err: got %v, want nil\n", err) } if s != AcknowledgeStatusSuccess { t.Errorf("r3: got %v, want AcknowledgeStatusSuccess", s) } }) t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) { for c := range exactlyOnceDeliveryTemporaryRetryErrors { r := ipubsub.NewAckResult() ackReqMap := map[string]*AckResult{ "ackID1": r, } st := status.New(c, "") completed, retry := processResults(st, ackReqMap, nil) compareCompletedRetryLengths(t, completed, retry, 0, 1) } }) }