1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "fmt"
20 "io"
21 "net/url"
22 "sync"
23 "time"
24
25 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
26 gax "github.com/googleapis/gax-go/v2"
27 "google.golang.org/grpc"
28 )
29
30
31
// pullStream bundles a streaming-pull client stream with the function used
// to open it, so that the stream can be replaced by a freshly opened one
// (see get and call).
type pullStream struct {
	// ctx is checked by get before handing out a stream and is passed to
	// stat recording and to the retry sleeps.
	ctx context.Context
	// open starts a new underlying stream; openWithRetry invokes it.
	open func() (pb.Subscriber_StreamingPullClient, error)
	// cancel is the cancel func paired with ctx when built by newPullStream.
	cancel context.CancelFunc

	// mu is held whenever spc or err is read or written.
	mu sync.Mutex
	// spc points at the most recently opened stream; nil until get opens one.
	spc *pb.Subscriber_StreamingPullClient
	// err, once non-nil, is returned by every subsequent get.
	err error
}
41
42
// streamingPullFunc is the signature of the function newPullStream calls to
// start a streaming pull: it takes a context plus gax call options and
// returns the client side of the stream.
type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)
44
45 func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
46 ctx = withSubscriptionKey(ctx, subName)
47 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(subName))}
48 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
49 ctx, cancel := context.WithCancel(ctx)
50 return &pullStream{
51 ctx: ctx,
52 cancel: cancel,
53 open: func() (pb.Subscriber_StreamingPullClient, error) {
54 spc, err := streamingPull(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
55 if err == nil {
56 recordStat(ctx, StreamRequestCount, 1)
57 streamAckDeadline := int32(maxDurationPerLeaseExtension / time.Second)
58
59
60 if streamAckDeadline <= 0 {
61 streamAckDeadline = 60
62 }
63 err = spc.Send(&pb.StreamingPullRequest{
64 Subscription: subName,
65 StreamAckDeadlineSeconds: streamAckDeadline,
66 MaxOutstandingMessages: int64(maxOutstandingMessages),
67 MaxOutstandingBytes: int64(maxOutstandingBytes),
68 })
69 }
70 if err != nil {
71 return nil, err
72 }
73 return spc, nil
74 },
75 }
76 }
77
78
79
80
81
82
83 func (s *pullStream) get(spc *pb.Subscriber_StreamingPullClient) (*pb.Subscriber_StreamingPullClient, error) {
84 s.mu.Lock()
85 defer s.mu.Unlock()
86
87 if s.err != nil {
88 return nil, s.err
89 }
90
91 s.err = s.ctx.Err()
92 if s.err != nil {
93 return nil, s.err
94 }
95
96
97
98
99 if spc != s.spc {
100 return s.spc, nil
101 }
102
103
104
105
106 s.spc = new(pb.Subscriber_StreamingPullClient)
107 *s.spc, s.err = s.openWithRetry()
108 return s.spc, s.err
109 }
110
111 func (s *pullStream) openWithRetry() (pb.Subscriber_StreamingPullClient, error) {
112 r := defaultRetryer{}
113 for {
114 recordStat(s.ctx, StreamOpenCount, 1)
115 spc, err := s.open()
116 bo, shouldRetry := r.Retry(err)
117 if err != nil && shouldRetry {
118 recordStat(s.ctx, StreamRetryCount, 1)
119 if err := gax.Sleep(s.ctx, bo); err != nil {
120 return nil, err
121 }
122 continue
123 }
124 return spc, err
125 }
126 }
127
128 func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error, opts ...gax.CallOption) error {
129 var settings gax.CallSettings
130 for _, opt := range opts {
131 opt.Resolve(&settings)
132 }
133 var r gax.Retryer = &defaultRetryer{}
134 if settings.Retry != nil {
135 r = settings.Retry()
136 }
137
138 var (
139 spc *pb.Subscriber_StreamingPullClient
140 err error
141 )
142 for {
143 spc, err = s.get(spc)
144 if err != nil {
145 return err
146 }
147 start := time.Now()
148 err = f(*spc)
149 if err != nil {
150 bo, shouldRetry := r.Retry(err)
151 if shouldRetry {
152 recordStat(s.ctx, StreamRetryCount, 1)
153 if time.Since(start) < 30*time.Second {
154 if err := gax.Sleep(s.ctx, bo); err != nil {
155 return err
156 }
157 }
158 continue
159 }
160 s.mu.Lock()
161 s.err = err
162 s.mu.Unlock()
163 }
164 return err
165 }
166 }
167
168 func (s *pullStream) Send(req *pb.StreamingPullRequest) error {
169 return s.call(func(spc pb.Subscriber_StreamingPullClient) error {
170 recordStat(s.ctx, AckCount, int64(len(req.AckIds)))
171 zeroes := 0
172 for _, mds := range req.ModifyDeadlineSeconds {
173 if mds == 0 {
174 zeroes++
175 }
176 }
177 recordStat(s.ctx, NackCount, int64(zeroes))
178 recordStat(s.ctx, ModAckCount, int64(len(req.ModifyDeadlineSeconds)-zeroes))
179 recordStat(s.ctx, StreamRequestCount, 1)
180 return spc.Send(req)
181 })
182 }
183
184 func (s *pullStream) Recv() (*pb.StreamingPullResponse, error) {
185 var res *pb.StreamingPullResponse
186 err := s.call(func(spc pb.Subscriber_StreamingPullClient) error {
187 var err error
188 recordStat(s.ctx, StreamResponseCount, 1)
189 res, err = spc.Recv()
190 return err
191 }, gax.WithRetry(func() gax.Retryer { return &streamingPullRetryer{defaultRetryer: &defaultRetryer{}} }))
192 return res, err
193 }
194
195 func (s *pullStream) CloseSend() error {
196 err := s.call(func(spc pb.Subscriber_StreamingPullClient) error {
197 return spc.CloseSend()
198 })
199 s.mu.Lock()
200 s.err = io.EOF
201 s.mu.Unlock()
202 return err
203 }
204
View as plain text