1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "sync"
19 "testing"
20
21 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
22 "github.com/google/go-cmp/cmp"
23 "github.com/google/go-cmp/cmp/cmpopts"
24 "github.com/googleapis/gax-go/v2"
25 "google.golang.org/api/option"
26 "google.golang.org/grpc"
27 "google.golang.org/protobuf/proto"
28 "google.golang.org/protobuf/testing/protocmp"
29 "google.golang.org/protobuf/types/descriptorpb"
30 )
31
32 func TestCustomClientOptions(t *testing.T) {
33 testCases := []struct {
34 desc string
35 options []option.ClientOption
36 want *writerClientConfig
37 }{
38 {
39 desc: "no options",
40 want: &writerClientConfig{},
41 },
42 {
43 desc: "multiplex enable",
44 options: []option.ClientOption{
45 WithMultiplexing(),
46 },
47 want: &writerClientConfig{
48 useMultiplex: true,
49 maxMultiplexPoolSize: 1,
50 },
51 },
52 {
53 desc: "multiplex max",
54 options: []option.ClientOption{
55 WithMultiplexPoolLimit(99),
56 },
57 want: &writerClientConfig{
58 maxMultiplexPoolSize: 99,
59 },
60 },
61 {
62 desc: "default requests",
63 options: []option.ClientOption{
64 WithDefaultInflightRequests(42),
65 },
66 want: &writerClientConfig{
67 defaultInflightRequests: 42,
68 },
69 },
70 {
71 desc: "default bytes",
72 options: []option.ClientOption{
73 WithDefaultInflightBytes(123),
74 },
75 want: &writerClientConfig{
76 defaultInflightBytes: 123,
77 },
78 },
79 {
80 desc: "default call options",
81 options: []option.ClientOption{
82 WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
83 },
84 want: &writerClientConfig{
85 defaultAppendRowsCallOptions: []gax.CallOption{
86 gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
87 },
88 },
89 },
90 {
91 desc: "unusual values",
92 options: []option.ClientOption{
93 WithMultiplexing(),
94 WithMultiplexPoolLimit(-8),
95 WithDefaultInflightBytes(-1),
96 WithDefaultInflightRequests(-99),
97 },
98 want: &writerClientConfig{
99 useMultiplex: true,
100 maxMultiplexPoolSize: 1,
101 defaultInflightRequests: 0,
102 defaultInflightBytes: 0,
103 },
104 },
105 {
106 desc: "multiple options",
107 options: []option.ClientOption{
108 WithMultiplexing(),
109 WithMultiplexPoolLimit(10),
110 WithDefaultInflightRequests(99),
111 WithDefaultInflightBytes(12345),
112 WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
113 },
114 want: &writerClientConfig{
115 useMultiplex: true,
116 maxMultiplexPoolSize: 10,
117 defaultInflightRequests: 99,
118 defaultInflightBytes: 12345,
119 defaultAppendRowsCallOptions: []gax.CallOption{
120 gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
121 },
122 },
123 },
124 }
125 for _, tc := range testCases {
126 gotCfg := newWriterClientConfig(tc.options...)
127
128 if diff := cmp.Diff(gotCfg, tc.want, cmp.AllowUnexported(writerClientConfig{})); diff != "" {
129 t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
130 }
131 }
132 }
133
134 func TestWriterOptions(t *testing.T) {
135
136 testCases := []struct {
137 desc string
138 options []WriterOption
139 want *ManagedStream
140 }{
141 {
142 desc: "WithType",
143 options: []WriterOption{WithType(BufferedStream)},
144 want: func() *ManagedStream {
145 ms := &ManagedStream{
146 streamSettings: defaultStreamSettings(),
147 curTemplate: newVersionedTemplate(),
148 }
149 ms.streamSettings.streamType = BufferedStream
150 return ms
151 }(),
152 },
153 {
154 desc: "WithMaxInflightRequests",
155 options: []WriterOption{WithMaxInflightRequests(2)},
156 want: func() *ManagedStream {
157 ms := &ManagedStream{
158 streamSettings: defaultStreamSettings(),
159 curTemplate: newVersionedTemplate(),
160 }
161 ms.streamSettings.MaxInflightRequests = 2
162 return ms
163 }(),
164 },
165 {
166 desc: "WithMaxInflightBytes",
167 options: []WriterOption{WithMaxInflightBytes(5)},
168 want: func() *ManagedStream {
169 ms := &ManagedStream{
170 streamSettings: defaultStreamSettings(),
171 curTemplate: newVersionedTemplate(),
172 }
173 ms.streamSettings.MaxInflightBytes = 5
174 return ms
175 }(),
176 },
177 {
178 desc: "WithTraceID",
179 options: []WriterOption{WithTraceID("foo")},
180 want: func() *ManagedStream {
181 ms := &ManagedStream{
182 streamSettings: defaultStreamSettings(),
183 curTemplate: newVersionedTemplate(),
184 }
185 ms.streamSettings.TraceID = "foo"
186 return ms
187 }(),
188 },
189 {
190 desc: "WithDestinationTable",
191 options: []WriterOption{WithDestinationTable("foo")},
192 want: func() *ManagedStream {
193 ms := &ManagedStream{
194 streamSettings: defaultStreamSettings(),
195 curTemplate: newVersionedTemplate(),
196 }
197 ms.streamSettings.destinationTable = "foo"
198 return ms
199 }(),
200 },
201 {
202 desc: "WithDataOrigin",
203 options: []WriterOption{WithDataOrigin("origin")},
204 want: func() *ManagedStream {
205 ms := &ManagedStream{
206 streamSettings: defaultStreamSettings(),
207 curTemplate: newVersionedTemplate(),
208 }
209 ms.streamSettings.dataOrigin = "origin"
210 return ms
211 }(),
212 },
213 {
214 desc: "WithCallOption",
215 options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))},
216 want: func() *ManagedStream {
217 ms := &ManagedStream{
218 streamSettings: defaultStreamSettings(),
219 curTemplate: newVersionedTemplate(),
220 }
221 ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions,
222 gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))
223 return ms
224 }(),
225 },
226 {
227 desc: "EnableRetries",
228 options: []WriterOption{EnableWriteRetries(true)},
229 want: func() *ManagedStream {
230 ms := &ManagedStream{
231 streamSettings: defaultStreamSettings(),
232 curTemplate: newVersionedTemplate(),
233 }
234 ms.retry = newStatelessRetryer()
235 return ms
236 }(),
237 },
238 {
239 desc: "WithSchemaDescriptor",
240 options: []WriterOption{WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")})},
241 want: func() *ManagedStream {
242 ms := &ManagedStream{
243 streamSettings: defaultStreamSettings(),
244 curTemplate: newVersionedTemplate(),
245 }
246 ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
247 Rows: &storagepb.AppendRowsRequest_ProtoRows{
248 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
249 WriterSchema: &storagepb.ProtoSchema{
250 ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")},
251 },
252 },
253 },
254 }
255 return ms
256 }(),
257 },
258 {
259 desc: "WithDefaultMissingValueInterpretation",
260 options: []WriterOption{WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)},
261 want: func() *ManagedStream {
262 ms := &ManagedStream{
263 streamSettings: defaultStreamSettings(),
264 curTemplate: newVersionedTemplate(),
265 }
266 ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
267 DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE,
268 }
269 return ms
270 }(),
271 },
272 {
273 desc: "WithtMissingValueInterpretations",
274 options: []WriterOption{WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
275 "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
276 "bar": storagepb.AppendRowsRequest_NULL_VALUE,
277 })},
278 want: func() *ManagedStream {
279 ms := &ManagedStream{
280 streamSettings: defaultStreamSettings(),
281 curTemplate: newVersionedTemplate(),
282 }
283 ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
284 MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
285 "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
286 "bar": storagepb.AppendRowsRequest_NULL_VALUE,
287 },
288 }
289 return ms
290 }(),
291 },
292 {
293 desc: "multiple",
294 options: []WriterOption{
295 WithType(PendingStream),
296 WithMaxInflightBytes(5),
297 WithTraceID("traceid"),
298 EnableWriteRetries(true),
299 WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")}),
300 WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE),
301 WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
302 "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
303 "bar": storagepb.AppendRowsRequest_NULL_VALUE,
304 }),
305 },
306 want: func() *ManagedStream {
307 ms := &ManagedStream{
308 streamSettings: defaultStreamSettings(),
309 curTemplate: newVersionedTemplate(),
310 }
311 ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
312 Rows: &storagepb.AppendRowsRequest_ProtoRows{
313 ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
314 WriterSchema: &storagepb.ProtoSchema{
315 ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")},
316 },
317 },
318 },
319 MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
320 "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
321 "bar": storagepb.AppendRowsRequest_NULL_VALUE,
322 },
323 DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE,
324 }
325 ms.streamSettings.MaxInflightBytes = 5
326 ms.streamSettings.streamType = PendingStream
327 ms.streamSettings.TraceID = "traceid"
328 ms.retry = newStatelessRetryer()
329 return ms
330 }(),
331 },
332 }
333
334 for _, tc := range testCases {
335 got := &ManagedStream{
336 streamSettings: defaultStreamSettings(),
337 curTemplate: newVersionedTemplate(),
338 }
339 for _, o := range tc.options {
340 o(got)
341 }
342
343 if diff := cmp.Diff(got, tc.want,
344 cmp.AllowUnexported(ManagedStream{}, streamSettings{}),
345 cmp.AllowUnexported(sync.Mutex{}),
346 cmp.AllowUnexported(versionedTemplate{}),
347 cmpopts.IgnoreFields(versionedTemplate{}, "versionTime", "hashVal"),
348 protocmp.Transform(),
349 cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" {
350 t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
351 }
352
353 }
354 }
355
View as plain text