1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "fmt"
20 "io"
21 "sync"
22 "time"
23
24 "cloud.google.com/go/bigquery/internal"
25 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
26 "github.com/googleapis/gax-go/v2"
27 "go.opencensus.io/tag"
28 "google.golang.org/grpc"
29 grpcstatus "google.golang.org/grpc/status"
30 "google.golang.org/protobuf/types/known/wrapperspb"
31 )
32
33
// StreamType names the kind of write stream a ManagedStream appends to.
type StreamType string

// Stream types understood by this package. They are declared as variables
// (not constants) and should be treated as read-only.
var (
	// DefaultStream selects the default write stream. streamTypeToEnum has no
	// explicit mapping for it, so it converts to WriteStream_TYPE_UNSPECIFIED.
	// NOTE(review): presumably the table's built-in default stream — confirm
	// against the BigQuery Storage Write API documentation.
	DefaultStream = StreamType("DEFAULT")

	// CommittedStream corresponds to storagepb.WriteStream_COMMITTED.
	CommittedStream = StreamType("COMMITTED")

	// BufferedStream corresponds to storagepb.WriteStream_BUFFERED.
	BufferedStream = StreamType("BUFFERED")

	// PendingStream corresponds to storagepb.WriteStream_PENDING.
	PendingStream = StreamType("PENDING")
)
57
58 func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
59 switch t {
60 case CommittedStream:
61 return storagepb.WriteStream_COMMITTED
62 case PendingStream:
63 return storagepb.WriteStream_PENDING
64 case BufferedStream:
65 return storagepb.WriteStream_BUFFERED
66 default:
67 return storagepb.WriteStream_TYPE_UNSPECIFIED
68 }
69 }
70
71
// ManagedStream is a writer bound to a single BigQuery write stream. It holds
// the stream's settings, the connection pool that appends are sent through,
// the parent client used for stream-level RPCs (FlushRows, FinalizeWriteStream),
// and the writer's lifetime context and terminal error.
type ManagedStream struct {
	// id identifies this writer instance.
	// NOTE(review): not referenced in this section of the file — confirm its use.
	id string

	// pool supplies connections for appends (selectConn) and the default retry
	// policy (defaultRetryer); Close deregisters the writer via removeWriter.
	pool *connectionPool

	// streamSettings carries the stream name, stream type and append options.
	streamSettings *streamSettings

	// curTemplate is the request template AppendRows snapshots for each new
	// pending write.
	curTemplate *versionedTemplate
	// c is the parent client; its rawClient issues FlushRows and
	// FinalizeWriteStream.
	c *Client
	// retry, when non-nil, overrides the pool's default retry policy.
	retry *statelessRetryer

	// mu is held while reading/writing err and while Close tears down cancel.
	mu sync.Mutex
	// ctx is the writer-scoped context. AppendRows treats its completion as
	// writer expiry and closes the writer.
	ctx context.Context
	// cancel is invoked once by Close and then cleared (presumably the
	// CancelFunc paired with ctx — confirm at construction site).
	cancel context.CancelFunc
	// err is the writer's terminal error. Once set, appends fail fast with it;
	// Close sets it to io.EOF if nothing else was recorded first.
	err error
}
91
92
// streamSettings is the per-writer configuration consulted by ManagedStream.
type streamSettings struct {
	// streamID is the write stream's name. It is sent as WriteStream in
	// FlushRows, as Name in FinalizeWriteStream, and returned by StreamName.
	streamID string

	// streamType is the kind of stream being written; defaultStreamSettings
	// sets DefaultStream.
	streamType StreamType

	// MaxInflightRequests presumably limits how many append requests may be
	// outstanding at once; enforcement is outside this section — confirm.
	// defaultStreamSettings sets 1000.
	MaxInflightRequests int

	// MaxInflightBytes presumably limits the total size of outstanding append
	// requests. defaultStreamSettings sets 0.
	// NOTE(review): confirm whether 0 means "no limit".
	MaxInflightBytes int

	// TraceID is an optional caller-supplied trace identifier. buildTraceID
	// appends it to the "go-managedwriter:<version>" prefix, and AppendRows
	// passes it to newPendingWrite.
	TraceID string

	// dataOrigin is not referenced in this section.
	// NOTE(review): presumably a label describing where the data originates — confirm.
	dataOrigin string

	// destinationTable is not referenced in this section.
	// NOTE(review): presumably the table the stream writes to — confirm.
	destinationTable string

	// appendCallOptions are gax call options for append calls;
	// defaultStreamSettings raises the max receive message size to 10 MiB.
	appendCallOptions []gax.CallOption

	// multiplex is not referenced in this section.
	// NOTE(review): presumably enables sharing connections across writers — confirm.
	multiplex bool

	// streamFunc is not referenced in this section.
	// NOTE(review): presumably opens the underlying append stream — confirm.
	streamFunc streamClientFunc
}
129
130 func defaultStreamSettings() *streamSettings {
131 return &streamSettings{
132 streamType: DefaultStream,
133 MaxInflightRequests: 1000,
134 MaxInflightBytes: 0,
135 appendCallOptions: []gax.CallOption{
136 gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
137 },
138 }
139 }
140
141
142 func buildTraceID(s *streamSettings) string {
143 base := fmt.Sprintf("go-managedwriter:%s", internal.Version)
144 if s != nil && s.TraceID != "" {
145 return fmt.Sprintf("%s %s", base, s.TraceID)
146 }
147 return base
148 }
149
150
151 func (ms *ManagedStream) StreamName() string {
152 return ms.streamSettings.streamID
153 }
154
155
156 func (ms *ManagedStream) StreamType() StreamType {
157 return ms.streamSettings.streamType
158 }
159
160
161
// FlushRows issues a FlushRows RPC for this writer's stream at the given
// offset and returns the offset reported in the response.
//
// A FlushRequests stat is recorded for every call, whether or not the RPC
// succeeds. On error the returned offset is 0.
//
// NOTE(review): presumably only meaningful for BufferedStream writers —
// confirm against the BigQuery Storage Write API documentation.
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) {
	flushTo := &wrapperspb.Int64Value{Value: offset}
	req := &storagepb.FlushRowsRequest{
		WriteStream: ms.streamSettings.streamID,
		Offset:      flushTo,
	}

	resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
	// Count the attempt before inspecting the outcome.
	recordWriterStat(ms, FlushRequests, 1)
	if err == nil {
		return resp.GetOffset(), nil
	}
	return 0, err
}
176
177
178
179
180
181
// Finalize issues a FinalizeWriteStream RPC for this writer's stream and
// returns the row count reported in the response (0 on error).
//
// NOTE(review): presumably finalizing prevents further appends to the stream,
// and is not applicable to DefaultStream — confirm against the API docs.
func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) {
	name := ms.streamSettings.streamID
	resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, &storagepb.FinalizeWriteStreamRequest{Name: name}, opts...)
	if err == nil {
		return resp.GetRowCount(), nil
	}
	return 0, err
}
193
194
195
196
197 func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
198 for {
199 ms.mu.Lock()
200 err := ms.err
201 ms.mu.Unlock()
202 if err != nil {
203 return err
204 }
205 conn, err := ms.pool.selectConn(pw)
206 if err != nil {
207 pw.markDone(nil, err)
208 return err
209 }
210 appendErr := conn.lockingAppend(pw)
211 if appendErr != nil {
212
213 status := grpcstatus.Convert(appendErr)
214 if status != nil {
215 recordCtx := ms.ctx
216 if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil {
217 recordCtx = ctx
218 }
219 recordStat(recordCtx, AppendRequestErrors, 1)
220 }
221 bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
222 if shouldRetry {
223 if err := gax.Sleep(ms.ctx, bo); err != nil {
224 return err
225 }
226 continue
227 }
228
229
230
231 return appendErr
232 }
233 return nil
234 }
235 }
236
237
// Close shuts the writer down. It removes the writer from its connection pool
// (if any), cancels the writer's context, and records io.EOF as the terminal
// error unless another error was already recorded. Subsequent appends fail
// fast with that terminal error.
//
// If removing the writer from the pool fails, that error is returned.
// Otherwise Close returns the writer's terminal error, so a normal close
// returns io.EOF. ms.mu is held for the whole call.
func (ms *ManagedStream) Close() error {
	ms.mu.Lock()
	defer ms.mu.Unlock()

	var poolErr error
	if ms.pool != nil {
		poolErr = ms.pool.removeWriter(ms)
	}

	// The context is cancelled once; re-opening is not supported.
	if ms.cancel != nil {
		ms.cancel()
		ms.cancel = nil
	}

	if ms.err == nil {
		ms.err = io.EOF
	}

	if poolErr != nil {
		return poolErr
	}
	return ms.err
}
266
267
268
269 func (ms *ManagedStream) buildRequest(data [][]byte) *storagepb.AppendRowsRequest {
270 return &storagepb.AppendRowsRequest{
271 Rows: &storagepb.AppendRowsRequest_ProtoRows{
272 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
273 Rows: &storagepb.ProtoRows{
274 SerializedRows: data,
275 },
276 },
277 },
278 }
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292 func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) {
293
294 ms.mu.Lock()
295 err := ms.err
296 ms.mu.Unlock()
297 if err != nil {
298 return nil, err
299 }
300
301 curTemplate := ms.curTemplate
302 req := ms.buildRequest(data)
303 pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID)
304
305 for _, opt := range opts {
306 opt(pw)
307 }
308
309 if pw.reqTmpl != nil {
310 if pw.reqTmpl.tmpl != nil {
311
312 pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations()
313 }
314 }
315
316
317
318 errCh := make(chan error)
319 var appendErr error
320 go func() {
321 select {
322 case errCh <- ms.appendWithRetry(pw):
323 case <-ctx.Done():
324 case <-ms.ctx.Done():
325 }
326 close(errCh)
327 }()
328 select {
329 case <-ctx.Done():
330
331
332
333
334
335
336 return nil, ctx.Err()
337 case <-ms.ctx.Done():
338
339
340 ms.mu.Lock()
341 if ms.err == nil {
342 ms.err = ms.ctx.Err()
343 }
344 ms.mu.Unlock()
345 ms.Close()
346
347 return nil, ms.err
348 case appendErr = <-errCh:
349 if appendErr != nil {
350 return nil, appendErr
351 }
352 return pw.result, nil
353 }
354 }
355
356
357
// processRetry drives retries for a pending write that failed with
// initialErr. While the writer's retry policy allows another attempt (judged
// on the latest error and pw.attemptCount), it sleeps for the suggested pause
// and re-submits pw through appendWithRetry. A successful re-submission records
// an AppendRetryCount stat and returns. Once the policy declines, pw is marked
// done with appendResp and the most recent error.
//
// srcConn is not used by this body.
//
// NOTE(review): appendWithRetry already marks pw done when selectConn fails.
// If that error is then judged non-retryable here, markDone runs a second
// time for the same write — confirm markDone tolerates repeated calls. Also
// note the pause uses time.Sleep, so it is not interrupted by the writer's
// context.
func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) {
	lastErr := initialErr
	for {
		pause, again := ms.statelessRetryer().Retry(lastErr, pw.attemptCount)
		if !again {
			pw.markDone(appendResp, lastErr)
			return
		}
		time.Sleep(pause)
		if lastErr = ms.appendWithRetry(pw); lastErr == nil {
			// Re-enqueued successfully.
			recordWriterStat(ms, AppendRetryCount, 1)
			return
		}
	}
}
379
380
381
382 func (ms *ManagedStream) statelessRetryer() *statelessRetryer {
383 if ms.retry != nil {
384 return ms.retry
385 }
386 if ms.pool != nil {
387 return ms.pool.defaultRetryer()
388 }
389 return &statelessRetryer{
390 maxAttempts: 1,
391 }
392 }
393
View as plain text