1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "strings"
19 "testing"
20 "time"
21
22 "cloud.google.com/go/internal/testutil"
23 "github.com/google/go-cmp/cmp"
24 "github.com/google/go-cmp/cmp/cmpopts"
25 bq "google.golang.org/api/bigquery/v2"
26 )
27
28 func defaultLoadJob() *bq.Job {
29 return &bq.Job{
30 JobReference: &bq.JobReference{JobId: "RANDOM", ProjectId: "client-project-id"},
31 Configuration: &bq.JobConfiguration{
32 Load: &bq.JobConfigurationLoad{
33 DestinationTable: &bq.TableReference{
34 ProjectId: "client-project-id",
35 DatasetId: "dataset-id",
36 TableId: "table-id",
37 },
38 SourceUris: []string{"uri"},
39 },
40 },
41 }
42 }
43
44 func stringFieldSchema() *FieldSchema {
45 return &FieldSchema{Name: "fieldname", Type: StringFieldType}
46 }
47
48 func nestedFieldSchema() *FieldSchema {
49 return &FieldSchema{
50 Name: "nested",
51 Type: RecordFieldType,
52 Schema: Schema{stringFieldSchema()},
53 }
54 }
55
56 func bqStringFieldSchema() *bq.TableFieldSchema {
57 return &bq.TableFieldSchema{
58 Name: "fieldname",
59 Type: "STRING",
60 }
61 }
62
63 func bqNestedFieldSchema() *bq.TableFieldSchema {
64 return &bq.TableFieldSchema{
65 Name: "nested",
66 Type: "RECORD",
67 Fields: []*bq.TableFieldSchema{bqStringFieldSchema()},
68 }
69 }
70
71 func TestLoad(t *testing.T) {
72 defer fixRandomID("RANDOM")()
73 c := &Client{projectID: "client-project-id"}
74
75 testCases := []struct {
76 dst *Table
77 src LoadSource
78 jobID string
79 location string
80 config LoadConfig
81 want *bq.Job
82 }{
83 {
84 dst: c.Dataset("dataset-id").Table("table-id"),
85 src: NewGCSReference("uri"),
86 want: defaultLoadJob(),
87 },
88 {
89 dst: c.Dataset("dataset-id").Table("table-id"),
90 src: NewGCSReference("uri"),
91 location: "loc",
92 want: func() *bq.Job {
93 j := defaultLoadJob()
94 j.JobReference.Location = "loc"
95 return j
96 }(),
97 },
98 {
99 dst: c.Dataset("dataset-id").Table("table-id"),
100 jobID: "ajob",
101 config: LoadConfig{
102 CreateDisposition: CreateNever,
103 WriteDisposition: WriteTruncate,
104 Labels: map[string]string{"a": "b"},
105 TimePartitioning: &TimePartitioning{Type: MonthPartitioningType, Expiration: 1234 * time.Millisecond},
106 Clustering: &Clustering{Fields: []string{"cfield1"}},
107 DestinationEncryptionConfig: &EncryptionConfig{KMSKeyName: "keyName"},
108 SchemaUpdateOptions: []string{"ALLOW_FIELD_ADDITION"},
109 },
110 src: NewGCSReference("uri"),
111 want: func() *bq.Job {
112 j := defaultLoadJob()
113 j.Configuration.Labels = map[string]string{"a": "b"}
114 j.Configuration.Load.CreateDisposition = "CREATE_NEVER"
115 j.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE"
116 j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
117 Type: "MONTH",
118 ExpirationMs: 1234,
119 }
120 j.Configuration.Load.Clustering = &bq.Clustering{
121 Fields: []string{"cfield1"},
122 }
123 j.Configuration.Load.DestinationEncryptionConfiguration = &bq.EncryptionConfiguration{KmsKeyName: "keyName"}
124 j.JobReference = &bq.JobReference{
125 JobId: "ajob",
126 ProjectId: "client-project-id",
127 }
128 j.Configuration.Load.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"}
129 return j
130 }(),
131 },
132 {
133 dst: c.Dataset("dataset-id").Table("table-id"),
134 src: func() *GCSReference {
135 g := NewGCSReference("uri")
136 g.MaxBadRecords = 1
137 g.AllowJaggedRows = true
138 g.AllowQuotedNewlines = true
139 g.IgnoreUnknownValues = true
140 return g
141 }(),
142 config: LoadConfig{
143 JobTimeout: 4 * time.Second,
144 },
145 want: func() *bq.Job {
146 j := defaultLoadJob()
147 j.Configuration.Load.MaxBadRecords = 1
148 j.Configuration.Load.AllowJaggedRows = true
149 j.Configuration.Load.AllowQuotedNewlines = true
150 j.Configuration.Load.IgnoreUnknownValues = true
151 j.Configuration.JobTimeoutMs = 4000
152 return j
153 }(),
154 },
155 {
156 dst: c.Dataset("dataset-id").Table("table-id"),
157 src: func() *GCSReference {
158 g := NewGCSReference("uri")
159 g.Schema = Schema{
160 stringFieldSchema(),
161 nestedFieldSchema(),
162 }
163 return g
164 }(),
165 want: func() *bq.Job {
166 j := defaultLoadJob()
167 j.Configuration.Load.Schema = &bq.TableSchema{
168 Fields: []*bq.TableFieldSchema{
169 bqStringFieldSchema(),
170 bqNestedFieldSchema(),
171 }}
172 return j
173 }(),
174 },
175 {
176 dst: c.Dataset("dataset-id").Table("table-id"),
177 src: func() *GCSReference {
178 g := NewGCSReference("uri")
179 g.SkipLeadingRows = 1
180 g.SourceFormat = JSON
181 g.Encoding = UTF_8
182 g.FieldDelimiter = "\t"
183 g.Quote = "-"
184 return g
185 }(),
186 want: func() *bq.Job {
187 j := defaultLoadJob()
188 j.Configuration.Load.SkipLeadingRows = 1
189 j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON"
190 j.Configuration.Load.Encoding = "UTF-8"
191 j.Configuration.Load.FieldDelimiter = "\t"
192 hyphen := "-"
193 j.Configuration.Load.Quote = &hyphen
194 return j
195 }(),
196 },
197 {
198 dst: c.Dataset("dataset-id").Table("table-id"),
199 src: NewGCSReference("uri"),
200 want: func() *bq.Job {
201 j := defaultLoadJob()
202
203 j.Configuration.Load.Quote = nil
204 return j
205 }(),
206 },
207 {
208 dst: c.Dataset("dataset-id").Table("table-id"),
209 src: func() *GCSReference {
210 g := NewGCSReference("uri")
211 g.ForceZeroQuote = true
212 return g
213 }(),
214 want: func() *bq.Job {
215 j := defaultLoadJob()
216 empty := ""
217 j.Configuration.Load.Quote = &empty
218 return j
219 }(),
220 },
221 {
222 dst: c.Dataset("dataset-id").Table("table-id"),
223 src: func() *ReaderSource {
224 r := NewReaderSource(strings.NewReader("foo"))
225 r.SkipLeadingRows = 1
226 r.SourceFormat = JSON
227 r.Encoding = UTF_8
228 r.FieldDelimiter = "\t"
229 r.Quote = "-"
230 return r
231 }(),
232 want: func() *bq.Job {
233 j := defaultLoadJob()
234 j.Configuration.Load.SourceUris = nil
235 j.Configuration.Load.SkipLeadingRows = 1
236 j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON"
237 j.Configuration.Load.Encoding = "UTF-8"
238 j.Configuration.Load.FieldDelimiter = "\t"
239 hyphen := "-"
240 j.Configuration.Load.Quote = &hyphen
241 return j
242 }(),
243 },
244 {
245 dst: c.Dataset("dataset-id").Table("table-id"),
246 src: func() *GCSReference {
247 g := NewGCSReference("uri")
248 g.SourceFormat = Avro
249 return g
250 }(),
251 config: LoadConfig{
252 UseAvroLogicalTypes: true,
253 },
254 want: func() *bq.Job {
255 j := defaultLoadJob()
256 j.Configuration.Load.SourceFormat = "AVRO"
257 j.Configuration.Load.UseAvroLogicalTypes = true
258 return j
259 }(),
260 },
261 {
262 dst: c.Dataset("dataset-id").Table("table-id"),
263 src: func() *ReaderSource {
264 r := NewReaderSource(strings.NewReader("foo"))
265 r.SourceFormat = Avro
266 return r
267 }(),
268 config: LoadConfig{
269 UseAvroLogicalTypes: true,
270 },
271 want: func() *bq.Job {
272 j := defaultLoadJob()
273 j.Configuration.Load.SourceUris = nil
274 j.Configuration.Load.SourceFormat = "AVRO"
275 j.Configuration.Load.UseAvroLogicalTypes = true
276 return j
277 }(),
278 },
279 {
280 dst: c.Dataset("dataset-id").Table("table-id"),
281 src: func() *ReaderSource {
282 r := NewReaderSource(strings.NewReader("foo"))
283 return r
284 }(),
285 config: LoadConfig{
286 TimePartitioning: &TimePartitioning{
287 Type: HourPartitioningType,
288 Field: "somefield",
289 },
290 },
291 want: func() *bq.Job {
292 j := defaultLoadJob()
293 j.Configuration.Load.SourceUris = nil
294 j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
295 Field: "somefield",
296 Type: "HOUR",
297 }
298 return j
299 }(),
300 },
301 {
302 dst: c.Dataset("dataset-id").Table("table-id"),
303 src: func() *ReaderSource {
304 r := NewReaderSource(strings.NewReader("foo"))
305 return r
306 }(),
307 config: LoadConfig{
308 RangePartitioning: &RangePartitioning{
309 Field: "somefield",
310 Range: &RangePartitioningRange{
311 Start: 1,
312 End: 2,
313 Interval: 3,
314 },
315 },
316 },
317 want: func() *bq.Job {
318 j := defaultLoadJob()
319 j.Configuration.Load.SourceUris = nil
320 j.Configuration.Load.RangePartitioning = &bq.RangePartitioning{
321 Field: "somefield",
322 Range: &bq.RangePartitioningRange{
323 Start: 1,
324 End: 2,
325 Interval: 3,
326 ForceSendFields: []string{"Start", "End", "Interval"},
327 },
328 }
329 return j
330 }(),
331 },
332 {
333 dst: c.Dataset("dataset-id").Table("table-id"),
334 src: func() *GCSReference {
335 g := NewGCSReference("uri")
336 g.SourceFormat = DatastoreBackup
337 return g
338 }(),
339 config: LoadConfig{
340 ProjectionFields: []string{"foo", "bar", "baz"},
341 },
342 want: func() *bq.Job {
343 j := defaultLoadJob()
344 j.Configuration.Load.SourceFormat = "DATASTORE_BACKUP"
345 j.Configuration.Load.ProjectionFields = []string{"foo", "bar", "baz"}
346 return j
347 }(),
348 },
349 {
350 dst: c.Dataset("dataset-id").Table("table-id"),
351 src: func() *GCSReference {
352 g := NewGCSReference("uri")
353 g.SourceFormat = Parquet
354 return g
355 }(),
356 config: LoadConfig{
357 HivePartitioningOptions: &HivePartitioningOptions{
358 Mode: CustomHivePartitioningMode,
359 SourceURIPrefix: "source_uri",
360 RequirePartitionFilter: true,
361 },
362 },
363 want: func() *bq.Job {
364 j := defaultLoadJob()
365 j.Configuration.Load.SourceFormat = "PARQUET"
366 j.Configuration.Load.HivePartitioningOptions = &bq.HivePartitioningOptions{
367 Mode: "CUSTOM",
368 RequirePartitionFilter: true,
369 SourceUriPrefix: "source_uri",
370 }
371 return j
372 }(),
373 },
374 {
375 dst: c.Dataset("dataset-id").Table("table-id"),
376 src: func() *GCSReference {
377 g := NewGCSReference("uri")
378 g.SourceFormat = Parquet
379 return g
380 }(),
381 config: LoadConfig{
382 DecimalTargetTypes: []DecimalTargetType{BigNumericTargetType, NumericTargetType, StringTargetType},
383 },
384 want: func() *bq.Job {
385 j := defaultLoadJob()
386 j.Configuration.Load.SourceFormat = "PARQUET"
387 j.Configuration.Load.DecimalTargetTypes = []string{"BIGNUMERIC", "NUMERIC", "STRING"}
388 return j
389 }(),
390 },
391 {
392 dst: c.Dataset("dataset-id").Table("table-id"),
393 src: func() *GCSReference {
394 g := NewGCSReference("uri")
395 g.SourceFormat = Parquet
396 return g
397 }(),
398 config: LoadConfig{
399 ReferenceFileSchemaURI: "schema.parquet",
400 },
401 want: func() *bq.Job {
402 j := defaultLoadJob()
403 j.Configuration.Load.SourceFormat = "PARQUET"
404 j.Configuration.Load.ReferenceFileSchemaUri = "schema.parquet"
405 return j
406 }(),
407 },
408 {
409 dst: c.Dataset("dataset-id").Table("table-id"),
410 src: func() *GCSReference {
411 g := NewGCSReference("uri")
412 return g
413 }(),
414 config: LoadConfig{
415 CreateSession: true,
416 ConnectionProperties: []*ConnectionProperty{
417 {
418 Key: "session_id",
419 Value: "session_id_1234567890",
420 },
421 },
422 },
423 want: func() *bq.Job {
424 j := defaultLoadJob()
425 j.Configuration.Load.CreateSession = true
426 j.Configuration.Load.ConnectionProperties = []*bq.ConnectionProperty{
427 {
428 Key: "session_id",
429 Value: "session_id_1234567890",
430 },
431 }
432 return j
433 }(),
434 },
435 }
436
437 for i, tc := range testCases {
438 loader := tc.dst.LoaderFrom(tc.src)
439 loader.JobID = tc.jobID
440 loader.Location = tc.location
441 tc.config.Src = tc.src
442 tc.config.Dst = tc.dst
443 loader.LoadConfig = tc.config
444 got, _ := loader.newJob()
445 checkJob(t, i, got, tc.want)
446
447 jc, err := bqToJobConfig(got.Configuration, c)
448 if err != nil {
449 t.Fatalf("#%d: %v", i, err)
450 }
451 diff := testutil.Diff(jc.(*LoadConfig), &loader.LoadConfig,
452 cmp.AllowUnexported(Table{}, Client{}),
453 cmpopts.IgnoreUnexported(ReaderSource{}))
454 if diff != "" {
455 t.Errorf("#%d: (got=-, want=+:\n%s", i, diff)
456 }
457 }
458 }
459