1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "testing"
20 "time"
21
22 "cloud.google.com/go/internal/testutil"
23 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
24 "cloud.google.com/go/pubsub/pstest"
25 gax "github.com/googleapis/gax-go/v2"
26 "google.golang.org/api/option"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30 )
31
32 func TestPullStreamGet(t *testing.T) {
33
34
35
36 t.Parallel()
37 for _, test := range []struct {
38 desc string
39 errors []error
40 wantCode codes.Code
41 }{
42 {
43 desc: "nil error",
44 errors: []error{nil},
45 wantCode: codes.OK,
46 },
47 {
48 desc: "non-retryable error",
49 errors: []error{status.Errorf(codes.InvalidArgument, "")},
50 wantCode: codes.InvalidArgument,
51 },
52 {
53 desc: "retryable errors",
54 errors: []error{
55 status.Errorf(codes.Unavailable, "first"),
56 status.Errorf(codes.Unavailable, "second"),
57 nil,
58 },
59 wantCode: codes.OK,
60 },
61 } {
62 streamingPull := func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error) {
63 if len(test.errors) == 0 {
64 panic("out of errors")
65 }
66 err := test.errors[0]
67 test.errors = test.errors[1:]
68 return &testStreamingPullClient{sendError: err}, nil
69 }
70 ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0)
71 _, err := ps.get(nil)
72 if got := status.Code(err); got != test.wantCode {
73 t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode)
74 }
75 }
76 }
77
78 func TestPullStreamGet_ResourceUnavailable(t *testing.T) {
79 ctx := context.Background()
80
81 srv, err := testutil.NewServer()
82 if err != nil {
83 t.Fatal(err)
84 }
85 defer srv.Close()
86
87 ps := pstest.NewServer()
88 defer ps.Close()
89
90 s := ExhaustedServer{&ps.GServer}
91 pb.RegisterPublisherServer(srv.Gsrv, &s)
92 pb.RegisterSubscriberServer(srv.Gsrv, &s)
93 srv.Start()
94
95 opts := withGRPCHeadersAssertion(t,
96 option.WithEndpoint(srv.Addr),
97 option.WithoutAuthentication(),
98 option.WithGRPCDialOption(grpc.WithInsecure()))
99 client, err := NewClient(ctx, "P", opts...)
100 if err != nil {
101 t.Fatal(err)
102 }
103 defer client.Close()
104 topic, err := client.CreateTopic(ctx, "foo")
105 if err != nil {
106 t.Fatal(err)
107 }
108 sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{
109 Topic: topic,
110 })
111 if err != nil {
112 t.Fatal(err)
113 }
114
115 errc := make(chan error)
116 go func() {
117 errc <- sub.Receive(ctx, func(context.Context, *Message) {
118 t.Error("should not have received any data")
119 })
120 }()
121
122 select {
123 case <-time.After(5 * time.Second):
124 t.Fatal("Receive should have failed immediately")
125 case err := <-errc:
126 if gerr, ok := status.FromError(err); ok {
127 if gerr.Code() != codes.ResourceExhausted {
128 t.Fatal("expected to receive a grpc ResourceExhausted error")
129 }
130 } else {
131 t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err)
132 }
133 }
134 }
135
136 type ExhaustedServer struct {
137 *pstest.GServer
138 }
139
140 func (*ExhaustedServer) StreamingPull(_ pb.Subscriber_StreamingPullServer) error {
141 return status.Errorf(codes.ResourceExhausted, "This server is exhausted!")
142 }
143
144 type testStreamingPullClient struct {
145 pb.Subscriber_StreamingPullClient
146 sendError error
147 }
148
149 func (c *testStreamingPullClient) Send(*pb.StreamingPullRequest) error { return c.sendError }
150
View as plain text