...

Source file src/cloud.google.com/go/pubsub/timeout_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2018 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"log"
    20  	"sync/atomic"
    21  	"testing"
    22  	"time"
    23  
    24  	"cloud.google.com/go/pubsub/pstest"
    25  	"google.golang.org/api/option"
    26  	"google.golang.org/grpc"
    27  )
    28  
    29  // Using the fake PubSub server in the pstest package, verify that streaming
    30  // pull resumes if the server stream times out.
    31  func TestStreamTimeout(t *testing.T) {
    32  	t.Parallel()
    33  	log.SetFlags(log.Lmicroseconds)
    34  	ctx := context.Background()
    35  	srv := pstest.NewServer()
    36  	defer srv.Close()
    37  
    38  	srv.SetStreamTimeout(2 * time.Second)
    39  	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
    40  	if err != nil {
    41  		t.Fatal(err)
    42  	}
    43  	defer conn.Close()
    44  
    45  	opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn))
    46  	client, err := NewClient(ctx, "P", opts...)
    47  	if err != nil {
    48  		t.Fatal(err)
    49  	}
    50  	defer client.Close()
    51  
    52  	topic, err := client.CreateTopic(ctx, "T")
    53  	if err != nil {
    54  		t.Fatal(err)
    55  	}
    56  	sub, err := client.CreateSubscription(ctx, "sub", SubscriptionConfig{Topic: topic, AckDeadline: 10 * time.Second})
    57  	if err != nil {
    58  		t.Fatal(err)
    59  	}
    60  	const nPublish = 8
    61  	rctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    62  	defer cancel()
    63  	errc := make(chan error)
    64  	var nSeen int64
    65  	go func() {
    66  		errc <- sub.Receive(rctx, func(ctx context.Context, m *Message) {
    67  			m.Ack()
    68  			n := atomic.AddInt64(&nSeen, 1)
    69  			if n >= nPublish {
    70  				cancel()
    71  			}
    72  		})
    73  	}()
    74  
    75  	for i := 0; i < nPublish; i++ {
    76  		pr := topic.Publish(ctx, &Message{Data: []byte("msg")})
    77  		_, err := pr.Get(ctx)
    78  		if err != nil {
    79  			t.Fatal(err)
    80  		}
    81  		time.Sleep(250 * time.Millisecond)
    82  	}
    83  
    84  	if err := <-errc; err != nil {
    85  		t.Fatal(err)
    86  	}
    87  	if err := sub.Delete(ctx); err != nil {
    88  		t.Fatal(err)
    89  	}
    90  	n := atomic.LoadInt64(&nSeen)
    91  	if n < nPublish {
    92  		t.Errorf("got %d messages, want %d", n, nPublish)
    93  	}
    94  }
    95  

View as plain text