...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "bytes"
19 "encoding/binary"
20 "hash/crc32"
21 "time"
22
23 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
24 "google.golang.org/protobuf/proto"
25 "google.golang.org/protobuf/types/descriptorpb"
26 )
27
28
29
30
31
32
33 type sendOptimizer interface {
34
35
36 signalReset()
37
38
39 optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error
40
41
42 isMultiplexing() bool
43 }
44
45
// verboseOptimizer is a sendOptimizer that keeps no state: every send goes
// out as a freshly constructed full request.
type verboseOptimizer struct{}
48
49 func (vo *verboseOptimizer) signalReset() {
50
51 }
52
53
54 func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
55 return arc.Send(pw.constructFullRequest(true))
56 }
57
58 func (vo *verboseOptimizer) isMultiplexing() bool {
59
60 return false
61 }
62
63
64
65
66
67
68
69
70
71
// simplexOptimizer is a sendOptimizer that sends one full request and then,
// for as long as sends keep succeeding, sends only the request already
// attached to each pending write.
type simplexOptimizer struct {
	// haveSent is true when the most recent send succeeded. It is cleared by
	// signalReset and by any failed send, which makes the next send a full
	// request again.
	haveSent bool
}
75
76 func (so *simplexOptimizer) signalReset() {
77 so.haveSent = false
78 }
79
80 func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
81 var err error
82 if so.haveSent {
83
84 err = arc.Send(pw.req)
85 } else {
86
87 err = arc.Send(pw.constructFullRequest(true))
88 }
89 so.haveSent = err == nil
90 return err
91 }
92
93 func (so *simplexOptimizer) isMultiplexing() bool {
94
95 return false
96 }
97
98
99
100
101
102
103
104
105
106
107
108 type multiplexOptimizer struct {
109 prevStream string
110 prevTemplate *versionedTemplate
111 multiplexStreams bool
112 }
113
114 func (mo *multiplexOptimizer) signalReset() {
115 mo.prevStream = ""
116 mo.multiplexStreams = false
117 mo.prevTemplate = nil
118 }
119
120 func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
121 var err error
122 if mo.prevStream == "" {
123
124 req := pw.constructFullRequest(true)
125 err = arc.Send(req)
126 if err == nil {
127 mo.prevStream = req.GetWriteStream()
128 mo.prevTemplate = pw.reqTmpl
129 }
130 } else {
131
132 if mo.prevStream == pw.writeStreamID {
133
134 if pw.req.GetWriteStream() == "" {
135 pw.req.WriteStream = pw.writeStreamID
136 }
137
138 swapOnSuccess := false
139 req := pw.req
140 if mo.prevTemplate != nil {
141 if !mo.prevTemplate.Compatible(pw.reqTmpl) {
142 swapOnSuccess = true
143 req = pw.constructFullRequest(false)
144 }
145 }
146 err = arc.Send(req)
147 if err == nil && swapOnSuccess {
148 mo.prevTemplate = pw.reqTmpl
149 }
150 } else {
151
152 req := pw.constructFullRequest(false)
153 err = arc.Send(req)
154 if err == nil {
155
156 mo.prevStream = pw.writeStreamID
157 mo.prevTemplate = pw.reqTmpl
158 }
159
160
161 mo.multiplexStreams = true
162 }
163 }
164 return err
165 }
166
167 func (mo *multiplexOptimizer) isMultiplexing() bool {
168 return mo.multiplexStreams
169 }
170
171
172
173
174
175
176 type versionedTemplate struct {
177 versionTime time.Time
178 hashVal uint32
179 tmpl *storagepb.AppendRowsRequest
180 }
181
182 func newVersionedTemplate() *versionedTemplate {
183 vt := &versionedTemplate{
184 versionTime: time.Now(),
185 tmpl: &storagepb.AppendRowsRequest{},
186 }
187 vt.computeHash()
188 return vt
189 }
190
191
192
193 func (vt *versionedTemplate) computeHash() {
194 buf := new(bytes.Buffer)
195 if b, err := proto.Marshal(vt.tmpl); err == nil {
196 buf.Write(b)
197 } else {
198
199 binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano())
200 }
201 vt.hashVal = crc32.ChecksumIEEE(buf.Bytes())
202 }
203
204 type templateRevisionF func(m *storagepb.AppendRowsRequest)
205
206
207
208
209 func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate {
210 before := vt
211 if before == nil {
212 before = newVersionedTemplate()
213 }
214 if len(changes) == 0 {
215
216 return before
217 }
218 out := &versionedTemplate{
219 versionTime: time.Now(),
220 tmpl: proto.Clone(before.tmpl).(*storagepb.AppendRowsRequest),
221 }
222 for _, r := range changes {
223 r(out.tmpl)
224 }
225 out.computeHash()
226 if out.Compatible(before) {
227
228
229 return before
230 }
231 return out
232 }
233
234
235
236 func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool {
237 if other == nil {
238 return vt == nil
239 }
240 return vt.hashVal == other.hashVal
241 }
242
243 func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF {
244 return func(m *storagepb.AppendRowsRequest) {
245 if m != nil {
246 m.Rows = &storagepb.AppendRowsRequest_ProtoRows{
247 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
248 WriterSchema: &storagepb.ProtoSchema{
249 ProtoDescriptor: proto.Clone(newSchema).(*descriptorpb.DescriptorProto),
250 },
251 },
252 }
253 }
254 }
255 }
256
257 func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
258 return func(m *storagepb.AppendRowsRequest) {
259 if m != nil {
260 m.MissingValueInterpretations = vi
261 }
262 }
263 }
264
265 func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
266 return func(m *storagepb.AppendRowsRequest) {
267 if m != nil {
268 m.DefaultMissingValueInterpretation = def
269 }
270 }
271 }
272
View as plain text