...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "log"
20 "sync/atomic"
21 "testing"
22 "time"
23
24 "cloud.google.com/go/pubsub/pstest"
25 "google.golang.org/api/option"
26 "google.golang.org/grpc"
27 )
28
29
30
31 func TestStreamTimeout(t *testing.T) {
32 t.Parallel()
33 log.SetFlags(log.Lmicroseconds)
34 ctx := context.Background()
35 srv := pstest.NewServer()
36 defer srv.Close()
37
38 srv.SetStreamTimeout(2 * time.Second)
39 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
40 if err != nil {
41 t.Fatal(err)
42 }
43 defer conn.Close()
44
45 opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn))
46 client, err := NewClient(ctx, "P", opts...)
47 if err != nil {
48 t.Fatal(err)
49 }
50 defer client.Close()
51
52 topic, err := client.CreateTopic(ctx, "T")
53 if err != nil {
54 t.Fatal(err)
55 }
56 sub, err := client.CreateSubscription(ctx, "sub", SubscriptionConfig{Topic: topic, AckDeadline: 10 * time.Second})
57 if err != nil {
58 t.Fatal(err)
59 }
60 const nPublish = 8
61 rctx, cancel := context.WithTimeout(ctx, 30*time.Second)
62 defer cancel()
63 errc := make(chan error)
64 var nSeen int64
65 go func() {
66 errc <- sub.Receive(rctx, func(ctx context.Context, m *Message) {
67 m.Ack()
68 n := atomic.AddInt64(&nSeen, 1)
69 if n >= nPublish {
70 cancel()
71 }
72 })
73 }()
74
75 for i := 0; i < nPublish; i++ {
76 pr := topic.Publish(ctx, &Message{Data: []byte("msg")})
77 _, err := pr.Get(ctx)
78 if err != nil {
79 t.Fatal(err)
80 }
81 time.Sleep(250 * time.Millisecond)
82 }
83
84 if err := <-errc; err != nil {
85 t.Fatal(err)
86 }
87 if err := sub.Delete(ctx); err != nil {
88 t.Fatal(err)
89 }
90 n := atomic.LoadInt64(&nSeen)
91 if n < nPublish {
92 t.Errorf("got %d messages, want %d", n, nPublish)
93 }
94 }
95
View as plain text