...

Source file src/cloud.google.com/go/pubsub/iterator_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2017 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"errors"
    21  	"fmt"
    22  	"reflect"
    23  	"sync"
    24  	"sync/atomic"
    25  	"testing"
    26  	"time"
    27  
    28  	ipubsub "cloud.google.com/go/internal/pubsub"
    29  	"cloud.google.com/go/internal/testutil"
    30  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    31  	"cloud.google.com/go/pubsub/pstest"
    32  	"google.golang.org/api/option"
    33  	"google.golang.org/grpc"
    34  	"google.golang.org/grpc/codes"
    35  	"google.golang.org/grpc/credentials/insecure"
    36  	"google.golang.org/grpc/status"
    37  )
    38  
    39  var (
    40  	projName                = "P"
    41  	topicName               = "some-topic"
    42  	subName                 = "some-sub"
    43  	fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName)
    44  	fullyQualifiedSubName   = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName)
    45  )
    46  
    47  func TestSplitRequestIDs(t *testing.T) {
    48  	t.Parallel()
    49  	ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
    50  	for _, test := range []struct {
    51  		ids        []string
    52  		splitIndex int
    53  	}{
    54  		{[]string{}, 0}, // empty slice, no split
    55  		{ids, 2},        // slice of size 5, split at index 2
    56  		{ids[:2], 2},    // slice of size 3, split at index 2
    57  		{ids[:1], 1},    // slice of size 1, split at index 1
    58  	} {
    59  		got1, got2 := splitRequestIDs(test.ids, 2)
    60  		want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
    61  		if !testutil.Equal(len(got1), len(want1)) {
    62  			t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
    63  		}
    64  		if !testutil.Equal(len(got2), len(want2)) {
    65  			t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
    66  		}
    67  	}
    68  }
    69  
    70  func TestCalcFieldSize(t *testing.T) {
    71  	t.Parallel()
    72  	// Create a mock ack request to test.
    73  	req := &pb.AcknowledgeRequest{
    74  		Subscription: "sub",
    75  		AckIds:       []string{"aaa", "bbb", "ccc", "ddd", "eee"},
    76  	}
    77  	size := calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...)
    78  
    79  	// Proto encoding is calculated from 1 tag byte and 1 size byte for each string.
    80  	want := (1 + 1) + len(req.Subscription) + // subscription field: 1 tag byte + 1 size byte
    81  		5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)]
    82  	if size != want {
    83  		t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want)
    84  	}
    85  
    86  	req.Subscription = string(bytes.Repeat([]byte{'A'}, 300))
    87  	size = calcFieldSizeString(req.Subscription) + calcFieldSizeString(req.AckIds...)
    88  
    89  	// With a longer subscription name, we use an extra size byte.
    90  	want = (1 + 2) + len(req.Subscription) + // subscription field: 1 tag byte + 2 size bytes
    91  		5*(1+1+3) // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)]
    92  	if size != want {
    93  		t.Errorf("pubsub: calculated ack req size of %d bytes, want %d", size, want)
    94  	}
    95  
    96  	// Create a mock modack request to test.
    97  	modAckReq := &pb.ModifyAckDeadlineRequest{
    98  		Subscription:       "sub",
    99  		AckIds:             []string{"aaa", "bbb", "ccc", "ddd", "eee"},
   100  		AckDeadlineSeconds: 300,
   101  	}
   102  
   103  	size = calcFieldSizeString(modAckReq.Subscription) +
   104  		calcFieldSizeString(modAckReq.AckIds...) +
   105  		calcFieldSizeInt(int(modAckReq.AckDeadlineSeconds))
   106  
   107  	want = (1 + 1) + len(modAckReq.Subscription) + // subscription field: 1 tag byte + 1 size byte
   108  		5*(1+1+3) + // ackID size: 5 * [1 (tag byte) + 1 (size byte) + 3 (length of ackID)]
   109  		(1 + 2) // ackDeadline: 1 tag byte + 2 size bytes
   110  	if size != want {
   111  		t.Errorf("pubsub: calculated modAck req size of %d bytes, want %d", size, want)
   112  	}
   113  }
   114  
   115  func TestMaxExtensionPeriod(t *testing.T) {
   116  	srv := pstest.NewServer()
   117  	ctx, cancel := context.WithCancel(context.Background())
   118  	defer cancel()
   119  
   120  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   121  
   122  	_, client, err := initConn(ctx, srv.Addr)
   123  	if err != nil {
   124  		t.Fatal(err)
   125  	}
   126  	want := 15 * time.Second
   127  	iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{
   128  		maxExtensionPeriod: want,
   129  	})
   130  
   131  	// Add a datapoint that's greater than maxExtensionPeriod.
   132  	receiveTime := time.Now().Add(time.Duration(-20) * time.Second)
   133  	iter.ackTimeDist.Record(int(time.Since(receiveTime) / time.Second))
   134  
   135  	if got := iter.ackDeadline(); got != want {
   136  		t.Fatalf("deadline got = %v, want %v", got, want)
   137  	}
   138  }
   139  
   140  func TestAckDistribution(t *testing.T) {
   141  	if testing.Short() {
   142  		t.SkipNow()
   143  	}
   144  	t.Skip("broken")
   145  
   146  	ctx, cancel := context.WithCancel(context.Background())
   147  	defer cancel()
   148  
   149  	minDurationPerLeaseExtension = 1 * time.Second
   150  	pstest.SetMinAckDeadline(minDurationPerLeaseExtension)
   151  	srv := pstest.NewServer()
   152  	defer srv.Close()
   153  	defer pstest.ResetMinAckDeadline()
   154  
   155  	// Create the topic via a Publish. It's convenient to do it here as opposed to client.CreateTopic because the client
   156  	// has not been established yet, and also because we want to create the topic once whereas the client is established
   157  	// below twice.
   158  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   159  
   160  	queuedMsgs := make(chan int32, 1024)
   161  	go continuouslySend(ctx, srv, queuedMsgs)
   162  
   163  	for _, testcase := range []struct {
   164  		initialProcessSecs int32
   165  		finalProcessSecs   int32
   166  	}{
   167  		{initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up
   168  		{initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down
   169  	} {
   170  		t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs)
   171  
   172  		// processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should
   173  		// pretend to process for. e.g. if we test 3s -> 5s, processTimeSecs will start at 3, causing receiver
   174  		// to process messages received for 3s while sender sends the first batch. Then, as sender begins to
   175  		// send the next batch, sender will swap processTimeSeconds to 5s and begin sending, and receiver will
   176  		// process each message for 5s. In this way we simulate a client whose time-to-ack (process time) changes.
   177  		processTimeSecs := testcase.initialProcessSecs
   178  
   179  		s, client, err := initConn(ctx, srv.Addr)
   180  		if err != nil {
   181  			t.Fatal(err)
   182  		}
   183  
   184  		// recvdWg increments for each message sent, and decrements for each message received.
   185  		recvdWg := &sync.WaitGroup{}
   186  
   187  		go startReceiving(ctx, t, s, recvdWg, &processTimeSecs)
   188  		startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg)
   189  
   190  		recvdWg.Wait()
   191  		time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up
   192  		err = client.Close()
   193  		if err != nil {
   194  			t.Fatal(err)
   195  		}
   196  
   197  		modacks := modacksByTime(srv.Messages())
   198  		u := modackDeadlines(modacks)
   199  		initialDL := int32(minDurationPerLeaseExtension / time.Second)
   200  		if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
   201  			t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
   202  				initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
   203  		}
   204  	}
   205  }
   206  
   207  // modacksByTime buckets modacks by time.
   208  func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack {
   209  	modacks := map[time.Time][]pstest.Modack{}
   210  
   211  	for _, msg := range msgs {
   212  		for _, m := range msg.Modacks {
   213  			modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m)
   214  		}
   215  	}
   216  	return modacks
   217  }
   218  
   219  // setsAreEqual reports whether a and b contain the same values, ignoring duplicates.
   220  func setsAreEqual(haystack, needles []int32) bool {
   221  	hMap := map[int32]bool{}
   222  	nMap := map[int32]bool{}
   223  
   224  	for _, n := range needles {
   225  		nMap[n] = true
   226  	}
   227  
   228  	for _, n := range haystack {
   229  		hMap[n] = true
   230  	}
   231  
   232  	return reflect.DeepEqual(nMap, hMap)
   233  }
   234  
   235  // startReceiving pretends to be a client. It calls s.Receive and acks messages after some random delay. It also
   236  // looks out for dupes - any message that arrives twice will cause a failure.
   237  func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) {
   238  	t.Log("Receiving..")
   239  
   240  	var recvdMu sync.Mutex
   241  	recvd := map[string]bool{}
   242  
   243  	err := s.Receive(ctx, func(ctx context.Context, msg *Message) {
   244  		msgData := string(msg.Data)
   245  		recvdMu.Lock()
   246  		_, ok := recvd[msgData]
   247  		if ok {
   248  			recvdMu.Unlock()
   249  			t.Logf("already saw \"%s\"\n", msgData)
   250  			return
   251  		}
   252  		recvd[msgData] = true
   253  		recvdMu.Unlock()
   254  
   255  		select {
   256  		case <-ctx.Done():
   257  			msg.Nack()
   258  			recvdWg.Done()
   259  		case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second):
   260  			msg.Ack()
   261  			recvdWg.Done()
   262  		}
   263  	})
   264  	if err != nil {
   265  		if status.Code(err) != codes.Canceled {
   266  			t.Error(err)
   267  		}
   268  	}
   269  }
   270  
   271  // startSending sends four batches of messages broken up by minDeadline, initialProcessSecs, and finalProcessSecs.
   272  func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) {
   273  	var msg int32
   274  
   275  	// We must send this block to force the receiver to send its initially-configured modack time. The time that
   276  	// gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages
   277  	// to create a distribution yet.
   278  	t.Log("minAckDeadlineSecsSending an initial message")
   279  	recvdWg.Add(1)
   280  	msg++
   281  	queuedMsgs <- msg
   282  	<-time.After(minDurationPerLeaseExtension)
   283  
   284  	t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
   285  		"when the next batch of messages go out.", initialProcessSecs)
   286  	for i := 0; i < 10; i++ {
   287  		recvdWg.Add(1)
   288  		msg++
   289  		queuedMsgs <- msg
   290  	}
   291  	atomic.SwapInt32(processTimeSecs, finalProcessSecs)
   292  	<-time.After(time.Duration(initialProcessSecs) * time.Second)
   293  
   294  	t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+
   295  		"when the next batch of messages go out.", finalProcessSecs)
   296  	for i := 0; i < 100; i++ {
   297  		recvdWg.Add(1)
   298  		msg++
   299  		queuedMsgs <- msg // Send many messages to drastically change distribution
   300  	}
   301  	<-time.After(time.Duration(finalProcessSecs) * time.Second)
   302  
   303  	t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs)
   304  	recvdWg.Add(1)
   305  	msg++
   306  	queuedMsgs <- msg
   307  }
   308  
   309  // continuouslySend continuously sends messages that exist on the queuedMsgs chan.
   310  func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) {
   311  	for {
   312  		select {
   313  		case <-ctx.Done():
   314  			return
   315  		case m := <-queuedMsgs:
   316  			srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil)
   317  		}
   318  	}
   319  }
   320  
   321  func toSet(arr []int32) []int32 {
   322  	var s []int32
   323  	m := map[int32]bool{}
   324  
   325  	for _, v := range arr {
   326  		_, ok := m[v]
   327  		if !ok {
   328  			s = append(s, v)
   329  			m[v] = true
   330  		}
   331  	}
   332  
   333  	return s
   334  
   335  }
   336  
   337  func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) {
   338  	conn, err := grpc.Dial(addr, grpc.WithInsecure())
   339  	if err != nil {
   340  		return nil, nil, err
   341  	}
   342  	e := testutil.DefaultHeadersEnforcer()
   343  	opts := append(e.CallOptions(), option.WithGRPCConn(conn))
   344  	client, err := NewClient(ctx, projName, opts...)
   345  	if err != nil {
   346  		return nil, nil, err
   347  	}
   348  
   349  	topic := client.Topic(topicName)
   350  	s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic})
   351  	if err != nil {
   352  		return nil, nil, err
   353  	}
   354  
   355  	exists, err := s.Exists(ctx)
   356  	if !exists {
   357  		return nil, nil, errors.New("Subscription does not exist")
   358  	}
   359  	if err != nil {
   360  		return nil, nil, err
   361  	}
   362  
   363  	return s, client, nil
   364  }
   365  
   366  // modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadlines,
   367  // and returns them as a slice
   368  func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 {
   369  	var u []int32
   370  	for _, vv := range m {
   371  		for _, v := range vv {
   372  			u = append(u, v.AckDeadline)
   373  		}
   374  	}
   375  	return u
   376  }
   377  
   378  func TestIterator_ModifyAckContextDeadline(t *testing.T) {
   379  	// Test that all context deadline exceeded errors in ModAckDeadline
   380  	// are not propagated to the client.
   381  	opts := []pstest.ServerReactorOption{
   382  		pstest.WithErrorInjection("ModifyAckDeadline", codes.Unknown, "context deadline exceeded"),
   383  	}
   384  	srv := pstest.NewServer(opts...)
   385  	ctx, cancel := context.WithCancel(context.Background())
   386  	defer cancel()
   387  
   388  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   389  	s, client, err := initConn(ctx, srv.Addr)
   390  	if err != nil {
   391  		t.Fatal(err)
   392  	}
   393  
   394  	srv.Publish(fullyQualifiedTopicName, []byte("some-message"), nil)
   395  	cctx, cancel := context.WithTimeout(ctx, time.Duration(5*time.Second))
   396  	defer cancel()
   397  	err = s.Receive(cctx, func(ctx context.Context, m *Message) {
   398  		m.Ack()
   399  	})
   400  	if err != nil {
   401  		t.Fatalf("Got error in Receive: %v", err)
   402  	}
   403  
   404  	err = client.Close()
   405  	if err != nil {
   406  		t.Fatal(err)
   407  	}
   408  }
   409  
   410  func TestIterator_SynchronousPullCancel(t *testing.T) {
   411  	srv := pstest.NewServer()
   412  	ctx, cancel := context.WithCancel(context.Background())
   413  	defer cancel()
   414  
   415  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   416  
   417  	_, client, err := initConn(ctx, srv.Addr)
   418  	if err != nil {
   419  		t.Fatal(err)
   420  	}
   421  	iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{})
   422  
   423  	// Cancelling the iterator and pulling should not result in any errors.
   424  	iter.cancel()
   425  
   426  	if _, err := iter.pullMessages(100); err != nil {
   427  		t.Fatalf("Got error in pullMessages: %v", err)
   428  	}
   429  }
   430  
   431  func TestIterator_BoundedDuration(t *testing.T) {
   432  	// Use exported fields for time.Duration fields so they
   433  	// print nicely. Otherwise, they will print as integers.
   434  	//
   435  	// AckDeadline is bounded by min/max ack deadline, which are
   436  	// 10 seconds and 600 seconds respectively. This is
   437  	// true for the real distribution data points as well.
   438  	testCases := []struct {
   439  		desc        string
   440  		AckDeadline time.Duration
   441  		MinDuration time.Duration
   442  		MaxDuration time.Duration
   443  		exactlyOnce bool
   444  		Want        time.Duration
   445  	}{
   446  		{
   447  			desc:        "AckDeadline should be updated to the min duration",
   448  			AckDeadline: time.Duration(10 * time.Second),
   449  			MinDuration: time.Duration(15 * time.Second),
   450  			MaxDuration: time.Duration(10 * time.Minute),
   451  			exactlyOnce: false,
   452  			Want:        time.Duration(15 * time.Second),
   453  		},
   454  		{
   455  			desc:        "AckDeadline should be updated to 1 minute when using exactly once",
   456  			AckDeadline: time.Duration(10 * time.Second),
   457  			MinDuration: 0,
   458  			MaxDuration: time.Duration(10 * time.Minute),
   459  			exactlyOnce: true,
   460  			Want:        time.Duration(1 * time.Minute),
   461  		},
   462  		{
   463  			desc:        "AckDeadline should not be updated here, even though exactly once is enabled",
   464  			AckDeadline: time.Duration(10 * time.Second),
   465  			MinDuration: time.Duration(15 * time.Second),
   466  			MaxDuration: time.Duration(10 * time.Minute),
   467  			exactlyOnce: true,
   468  			Want:        time.Duration(15 * time.Second),
   469  		},
   470  		{
   471  			desc:        "AckDeadline should not be updated here",
   472  			AckDeadline: time.Duration(10 * time.Minute),
   473  			MinDuration: time.Duration(15 * time.Second),
   474  			MaxDuration: time.Duration(10 * time.Minute),
   475  			exactlyOnce: true,
   476  			Want:        time.Duration(10 * time.Minute),
   477  		},
   478  		{
   479  			desc:        "AckDeadline should not be updated when neither durations are set",
   480  			AckDeadline: time.Duration(5 * time.Minute),
   481  			MinDuration: 0,
   482  			MaxDuration: 0,
   483  			exactlyOnce: false,
   484  			Want:        time.Duration(5 * time.Minute),
   485  		},
   486  		{
   487  			desc:        "AckDeadline should should not be updated here since it is within both boundaries",
   488  			AckDeadline: time.Duration(5 * time.Minute),
   489  			MinDuration: time.Duration(1 * time.Minute),
   490  			MaxDuration: time.Duration(7 * time.Minute),
   491  			exactlyOnce: false,
   492  			Want:        time.Duration(5 * time.Minute),
   493  		},
   494  	}
   495  	for _, tc := range testCases {
   496  		t.Run(tc.desc, func(t *testing.T) {
   497  			got := boundedDuration(tc.AckDeadline, tc.MinDuration, tc.MaxDuration, tc.exactlyOnce)
   498  			if got != tc.Want {
   499  				t.Errorf("boundedDuration mismatch:\n%+v\ngot: %v, want: %v", tc, got, tc.Want)
   500  			}
   501  		})
   502  	}
   503  }
   504  
   505  func TestIterator_StreamingPullExactlyOnce(t *testing.T) {
   506  	srv := pstest.NewServer()
   507  	ctx, cancel := context.WithCancel(context.Background())
   508  	defer cancel()
   509  
   510  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   511  
   512  	conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
   513  	if err != nil {
   514  		t.Fatal(err)
   515  	}
   516  	opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn))
   517  	client, err := NewClient(ctx, projName, opts...)
   518  	if err != nil {
   519  		t.Fatal(err)
   520  	}
   521  
   522  	topic := client.Topic(topicName)
   523  	sc := SubscriptionConfig{
   524  		Topic:                     topic,
   525  		EnableMessageOrdering:     true,
   526  		EnableExactlyOnceDelivery: true,
   527  	}
   528  	_, err = client.CreateSubscription(ctx, subName, sc)
   529  	if err != nil {
   530  		t.Fatal(err)
   531  	}
   532  
   533  	// Make sure to call publish before constructing the iterator.
   534  	srv.Publish(fullyQualifiedTopicName, []byte("msg"), nil)
   535  
   536  	iter := newMessageIterator(client.subc, fullyQualifiedSubName, &pullOptions{
   537  		synchronous:            false,
   538  		maxOutstandingMessages: 100,
   539  		maxOutstandingBytes:    1e6,
   540  		maxPrefetch:            30,
   541  		maxExtension:           1 * time.Minute,
   542  		maxExtensionPeriod:     10 * time.Second,
   543  	})
   544  
   545  	if _, err := iter.receive(10); err != nil {
   546  		t.Fatalf("Got error in recvMessages: %v", err)
   547  	}
   548  
   549  	if !iter.enableExactlyOnceDelivery {
   550  		t.Fatalf("expected iter.enableExactlyOnce=true")
   551  	}
   552  }
   553  
   554  func TestAddToDistribution(t *testing.T) {
   555  	c, _ := newFake(t)
   556  
   557  	iter := newMessageIterator(c.subc, "some-sub", &pullOptions{})
   558  
   559  	// Start with a datapoint that's too small that should be bounded to 10s.
   560  	receiveTime := time.Now().Add(time.Duration(-1) * time.Second)
   561  	iter.addToDistribution(receiveTime)
   562  	deadline := iter.ackTimeDist.Percentile(.99)
   563  	want := 10
   564  	if deadline != want {
   565  		t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
   566  	}
   567  
   568  	// The next datapoint should not be bounded.
   569  	receiveTime = time.Now().Add(time.Duration(-300) * time.Second)
   570  	iter.addToDistribution(receiveTime)
   571  	deadline = iter.ackTimeDist.Percentile(.99)
   572  	want = 300
   573  	if deadline != want {
   574  		t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
   575  	}
   576  
   577  	// Lastly, add a datapoint that should be bounded to 600s
   578  	receiveTime = time.Now().Add(time.Duration(-1000) * time.Second)
   579  	iter.addToDistribution(receiveTime)
   580  	deadline = iter.ackTimeDist.Percentile(.99)
   581  	want = 600
   582  	if deadline != want {
   583  		t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
   584  	}
   585  }
   586  
   587  func TestPingStreamAckDeadline(t *testing.T) {
   588  	c, srv := newFake(t)
   589  	ctx, cancel := context.WithCancel(context.Background())
   590  	defer cancel()
   591  
   592  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   593  	topic := c.Topic(topicName)
   594  	s, err := c.CreateSubscription(ctx, subName, SubscriptionConfig{Topic: topic})
   595  	if err != nil {
   596  		t.Errorf("failed to create subscription: %v", err)
   597  	}
   598  
   599  	iter := newMessageIterator(c.subc, fullyQualifiedSubName, &pullOptions{})
   600  	defer iter.stop()
   601  
   602  	iter.eoMu.RLock()
   603  	if iter.enableExactlyOnceDelivery {
   604  		t.Error("iter.enableExactlyOnceDelivery should be false")
   605  	}
   606  	iter.eoMu.RUnlock()
   607  
   608  	_, err = s.Update(ctx, SubscriptionConfigToUpdate{
   609  		EnableExactlyOnceDelivery: true,
   610  	})
   611  	if err != nil {
   612  		t.Error(err)
   613  	}
   614  	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
   615  	// Receive one message via the stream to trigger the update to enableExactlyOnceDelivery
   616  	iter.receive(1)
   617  	iter.eoMu.RLock()
   618  	if !iter.enableExactlyOnceDelivery {
   619  		t.Error("iter.enableExactlyOnceDelivery should be true")
   620  	}
   621  	iter.eoMu.RUnlock()
   622  }
   623  
   624  func compareCompletedRetryLengths(t *testing.T, completed, retry map[string]*AckResult, wantCompleted, wantRetry int) {
   625  	if l := len(completed); l != wantCompleted {
   626  		t.Errorf("completed slice length got %d, want %d", l, wantCompleted)
   627  	}
   628  	if l := len(retry); l != wantRetry {
   629  		t.Errorf("retry slice length got %d, want %d", l, wantRetry)
   630  	}
   631  }
   632  
   633  func TestExactlyOnceProcessRequests(t *testing.T) {
   634  	ctx := context.Background()
   635  
   636  	t.Run("NoResults", func(t *testing.T) {
   637  		// If the ackResMap is nil, then the resulting slices should be empty.
   638  		// nil maps here behave the same as if they were empty maps.
   639  		completed, retry := processResults(nil, nil, nil)
   640  		compareCompletedRetryLengths(t, completed, retry, 0, 0)
   641  	})
   642  
   643  	t.Run("NoErrorsNilAckResult", func(t *testing.T) {
   644  		// No errors so request should be completed even without an AckResult.
   645  		ackReqMap := map[string]*AckResult{
   646  			"ackID": nil,
   647  		}
   648  		completed, retry := processResults(nil, ackReqMap, nil)
   649  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   650  	})
   651  
   652  	t.Run("NoErrors", func(t *testing.T) {
   653  		// No errors so AckResult should be completed with success.
   654  		r := ipubsub.NewAckResult()
   655  		ackReqMap := map[string]*AckResult{
   656  			"ackID1": r,
   657  		}
   658  		completed, retry := processResults(nil, ackReqMap, nil)
   659  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   660  
   661  		// We can obtain the AckStatus from AckResult if results are completed.
   662  		s, err := r.Get(ctx)
   663  		if err != nil {
   664  			t.Errorf("AckResult err: got %v, want nil", err)
   665  		}
   666  		if s != AcknowledgeStatusSuccess {
   667  			t.Errorf("got %v, want AcknowledgeStatusSuccess", s)
   668  		}
   669  	})
   670  
   671  	t.Run("PermanentErrorInvalidAckID", func(t *testing.T) {
   672  		r := ipubsub.NewAckResult()
   673  		ackReqMap := map[string]*AckResult{
   674  			"ackID1": r,
   675  		}
   676  		errorsMap := map[string]string{
   677  			"ackID1": permanentInvalidAckErrString,
   678  		}
   679  		completed, retry := processResults(nil, ackReqMap, errorsMap)
   680  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   681  		s, err := r.Get(ctx)
   682  		if err == nil {
   683  			t.Error("AckResult err: got nil, want err")
   684  		}
   685  		if s != AcknowledgeStatusInvalidAckID {
   686  			t.Errorf("got %v, want AcknowledgeStatusSuccess", s)
   687  		}
   688  	})
   689  
   690  	t.Run("TransientErrorRetry", func(t *testing.T) {
   691  		r := ipubsub.NewAckResult()
   692  		ackReqMap := map[string]*AckResult{
   693  			"ackID1": r,
   694  		}
   695  		errorsMap := map[string]string{
   696  			"ackID1": transientErrStringPrefix + "_FAILURE",
   697  		}
   698  		completed, retry := processResults(nil, ackReqMap, errorsMap)
   699  		compareCompletedRetryLengths(t, completed, retry, 0, 1)
   700  	})
   701  
   702  	t.Run("UnknownError", func(t *testing.T) {
   703  		r := ipubsub.NewAckResult()
   704  		ackReqMap := map[string]*AckResult{
   705  			"ackID1": r,
   706  		}
   707  		errorsMap := map[string]string{
   708  			"ackID1": "unknown_error",
   709  		}
   710  		completed, retry := processResults(nil, ackReqMap, errorsMap)
   711  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   712  
   713  		s, err := r.Get(ctx)
   714  		if s != AcknowledgeStatusOther {
   715  			t.Errorf("got %v, want AcknowledgeStatusOther", s)
   716  		}
   717  		if err == nil || err.Error() != "unknown_error" {
   718  			t.Errorf("AckResult err: got %s, want unknown_error", err.Error())
   719  		}
   720  	})
   721  
   722  	t.Run("PermissionDenied", func(t *testing.T) {
   723  		r := ipubsub.NewAckResult()
   724  		ackReqMap := map[string]*AckResult{
   725  			"ackID1": r,
   726  		}
   727  		st := status.New(codes.PermissionDenied, "permission denied")
   728  		completed, retry := processResults(st, ackReqMap, nil)
   729  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   730  		s, err := r.Get(ctx)
   731  		if err == nil {
   732  			t.Error("AckResult err: got nil, want err")
   733  		}
   734  		if s != AcknowledgeStatusPermissionDenied {
   735  			t.Errorf("got %v, want AcknowledgeStatusPermissionDenied", s)
   736  		}
   737  	})
   738  
   739  	t.Run("FailedPrecondition", func(t *testing.T) {
   740  		r := ipubsub.NewAckResult()
   741  		ackReqMap := map[string]*AckResult{
   742  			"ackID1": r,
   743  		}
   744  		st := status.New(codes.FailedPrecondition, "failed_precondition")
   745  		completed, retry := processResults(st, ackReqMap, nil)
   746  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   747  		s, err := r.Get(ctx)
   748  		if err == nil {
   749  			t.Error("AckResult err: got nil, want err")
   750  		}
   751  		if s != AcknowledgeStatusFailedPrecondition {
   752  			t.Errorf("got %v, want AcknowledgeStatusFailedPrecondition", s)
   753  		}
   754  	})
   755  
   756  	t.Run("OtherErrorStatus", func(t *testing.T) {
   757  		r := ipubsub.NewAckResult()
   758  		ackReqMap := map[string]*AckResult{
   759  			"ackID1": r,
   760  		}
   761  		st := status.New(codes.OutOfRange, "out of range")
   762  		completed, retry := processResults(st, ackReqMap, nil)
   763  		compareCompletedRetryLengths(t, completed, retry, 1, 0)
   764  		s, err := r.Get(ctx)
   765  		if err == nil {
   766  			t.Error("AckResult err: got nil, want err")
   767  		}
   768  		if s != AcknowledgeStatusOther {
   769  			t.Errorf("got %v, want AcknowledgeStatusOther", s)
   770  		}
   771  	})
   772  
   773  	t.Run("MixedSuccessFailureAcks", func(t *testing.T) {
   774  		r1 := ipubsub.NewAckResult()
   775  		r2 := ipubsub.NewAckResult()
   776  		r3 := ipubsub.NewAckResult()
   777  		ackReqMap := map[string]*AckResult{
   778  			"ackID1": r1,
   779  			"ackID2": r2,
   780  			"ackID3": r3,
   781  		}
   782  		errorsMap := map[string]string{
   783  			"ackID1": permanentInvalidAckErrString,
   784  			"ackID2": transientErrStringPrefix + "_FAILURE",
   785  		}
   786  		completed, retry := processResults(nil, ackReqMap, errorsMap)
   787  		compareCompletedRetryLengths(t, completed, retry, 2, 1)
   788  		// message with ackID "ackID1" fails
   789  		s, err := r1.Get(ctx)
   790  		if err == nil {
   791  			t.Error("r1: AckResult err: got nil, want err")
   792  		}
   793  		if s != AcknowledgeStatusInvalidAckID {
   794  			t.Errorf("r1: got %v, want AcknowledgeInvalidAckID", s)
   795  		}
   796  
   797  		// message with ackID "ackID2" is to be retried
   798  		ctx2, cancel := context.WithTimeout(ctx, 2*time.Second)
   799  		defer cancel()
   800  		_, err = r2.Get(ctx2)
   801  		if !errors.Is(err, context.DeadlineExceeded) {
   802  			t.Errorf("r2: AckResult.Get should timeout, got: %v", err)
   803  		}
   804  
   805  		// message with ackID "ackID3" succeeds
   806  		s, err = r3.Get(ctx)
   807  		if err != nil {
   808  			t.Errorf("r3: AckResult err: got %v, want nil\n", err)
   809  		}
   810  		if s != AcknowledgeStatusSuccess {
   811  			t.Errorf("r3: got %v, want AcknowledgeStatusSuccess", s)
   812  		}
   813  	})
   814  
   815  	t.Run("RetriableErrorStatusReturnsRequestForRetrying", func(t *testing.T) {
   816  		for c := range exactlyOnceDeliveryTemporaryRetryErrors {
   817  			r := ipubsub.NewAckResult()
   818  			ackReqMap := map[string]*AckResult{
   819  				"ackID1": r,
   820  			}
   821  			st := status.New(c, "")
   822  			completed, retry := processResults(st, ackReqMap, nil)
   823  			compareCompletedRetryLengths(t, completed, retry, 0, 1)
   824  		}
   825  	})
   826  }
   827  

View as plain text