...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19
20 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
21 "github.com/googleapis/gax-go/v2/apierror"
22 grpcstatus "google.golang.org/grpc/status"
23 "google.golang.org/protobuf/proto"
24 )
25
26
27
28 const NoStreamOffset int64 = -1
29
30
31 type AppendResult struct {
32 ready chan struct{}
33
34
35 err error
36
37
38 response *storagepb.AppendRowsResponse
39
40
41 totalAttempts int
42 }
43
44 func newAppendResult() *AppendResult {
45 return &AppendResult{
46 ready: make(chan struct{}),
47 }
48 }
49
50
51
52 func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }
53
54
55
56
57
58 func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
59 select {
60 case <-ctx.Done():
61 return NoStreamOffset, ctx.Err()
62 case <-ar.Ready():
63 full, err := ar.FullResponse(ctx)
64 offset := NoStreamOffset
65 if full != nil {
66 if result := full.GetAppendResult(); result != nil {
67 if off := result.GetOffset(); off != nil {
68 offset = off.GetValue()
69 }
70 }
71 }
72 return offset, err
73 }
74 }
75
76
77
78
79
80
81
82
83
84 func (ar *AppendResult) FullResponse(ctx context.Context) (*storagepb.AppendRowsResponse, error) {
85 select {
86 case <-ctx.Done():
87 return nil, ctx.Err()
88 case <-ar.Ready():
89 var err error
90 if ar.err != nil {
91 err = ar.err
92 } else {
93 if ar.response != nil {
94 if status := ar.response.GetError(); status != nil {
95 statusErr := grpcstatus.ErrorProto(status)
96
97 if apiErr, ok := apierror.FromError(statusErr); ok {
98 err = apiErr
99 } else {
100 err = statusErr
101 }
102 }
103 }
104 }
105 if ar.response != nil {
106 return proto.Clone(ar.response).(*storagepb.AppendRowsResponse), err
107 }
108 return nil, err
109 }
110 }
111
112 func (ar *AppendResult) offset(ctx context.Context) int64 {
113 select {
114 case <-ctx.Done():
115 return NoStreamOffset
116 case <-ar.Ready():
117 if ar.response != nil {
118 if result := ar.response.GetAppendResult(); result != nil {
119 if off := result.GetOffset(); off != nil {
120 return off.GetValue()
121 }
122 }
123 }
124 return NoStreamOffset
125 }
126 }
127
128
129
130
131
132 func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error) {
133 select {
134 case <-ctx.Done():
135 return nil, ctx.Err()
136 case <-ar.Ready():
137 if ar.response != nil {
138 if schema := ar.response.GetUpdatedSchema(); schema != nil {
139 return proto.Clone(schema).(*storagepb.TableSchema), nil
140 }
141 }
142 return nil, nil
143 }
144 }
145
146
147
148
149 func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) {
150 select {
151 case <-ctx.Done():
152 return 0, ctx.Err()
153 case <-ar.Ready():
154 return ar.totalAttempts, nil
155 }
156 }
157
158
159
160 type pendingWrite struct {
161
162
163 writer *ManagedStream
164
165
166
167
168 req *storagepb.AppendRowsRequest
169 reqTmpl *versionedTemplate
170 traceID string
171 writeStreamID string
172
173
174 result *AppendResult
175
176
177 reqSize int
178
179
180
181 reqCtx context.Context
182
183
184 attemptCount int
185 }
186
187
188
189
190
191 func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite {
192 pw := &pendingWrite{
193 writer: src,
194 result: newAppendResult(),
195 reqCtx: ctx,
196
197 req: req,
198 reqTmpl: reqTmpl,
199 writeStreamID: writeStreamID,
200 traceID: traceID,
201 }
202
203 pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID)
204 if pw.reqTmpl != nil {
205 pw.reqSize += proto.Size(pw.reqTmpl.tmpl)
206 }
207 return pw
208 }
209
210
211
212 func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error) {
213
214 if resp != nil {
215 pw.result.response = resp
216 }
217 pw.result.err = err
218 pw.result.totalAttempts = pw.attemptCount
219
220
221 close(pw.result.ready)
222
223 pw.req = nil
224 pw.reqTmpl = nil
225 pw.writer = nil
226 pw.reqCtx = nil
227 }
228
229 func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest {
230 req := &storagepb.AppendRowsRequest{}
231 if pw.reqTmpl != nil {
232 req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest)
233 }
234 if pw.req != nil {
235 proto.Merge(req, pw.req)
236 }
237 if addTrace {
238 req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID})
239 }
240 req.WriteStream = pw.writeStreamID
241 return req
242 }
243
View as plain text