1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "encoding/json"
20 "errors"
21 "flag"
22 "fmt"
23 "log"
24 "math/big"
25 "net/http"
26 "os"
27 "sort"
28 "strings"
29 "testing"
30 "time"
31
32 connection "cloud.google.com/go/bigquery/connection/apiv1"
33 "cloud.google.com/go/civil"
34 datacatalog "cloud.google.com/go/datacatalog/apiv1"
35 "cloud.google.com/go/datacatalog/apiv1/datacatalogpb"
36 "cloud.google.com/go/httpreplay"
37 "cloud.google.com/go/internal"
38 "cloud.google.com/go/internal/pretty"
39 "cloud.google.com/go/internal/testutil"
40 "cloud.google.com/go/internal/uid"
41 "cloud.google.com/go/storage"
42 "github.com/google/go-cmp/cmp"
43 "github.com/google/go-cmp/cmp/cmpopts"
44 gax "github.com/googleapis/gax-go/v2"
45 bq "google.golang.org/api/bigquery/v2"
46 "google.golang.org/api/googleapi"
47 "google.golang.org/api/iterator"
48 "google.golang.org/api/option"
49 )
50
// replayFilename is the file that recorded HTTP traffic is written to (with
// -record) and replayed from (with -short).
const replayFilename = "bigquery.replay"

// record is the -record test flag; when set, initIntegrationTest records RPCs
// to replayFilename.
var record = flag.Bool("record", false, "record RPCs")

// Package-level state shared by the integration tests. The clients are
// populated by initIntegrationTest; tests treat a nil client as "integration
// tests are disabled" and skip themselves.
var (
	client                 *Client
	storageOptimizedClient *Client
	storageClient          *storage.Client
	connectionsClient      *connection.Client
	policyTagManagerClient *datacatalog.PolicyTagManagerClient
	// dataset and otherDataset are created by initTestState and removed by the
	// cleanup function it returns.
	dataset      *Dataset
	otherDataset *Dataset
	// schema is the default table schema used by tests that do not need a
	// specialised one.
	schema = Schema{
		{Name: "name", Type: StringFieldType},
		{Name: "nums", Type: IntegerFieldType, Repeated: true},
		{Name: "rec", Type: RecordFieldType, Schema: Schema{
			{Name: "bool", Type: BooleanFieldType},
		}},
	}
	// testTableExpiration is set by initTestState to two hours after the test
	// start time, rounded to the second.
	testTableExpiration time.Time
	// ID generators for test resources; initialised by initTestState.
	datasetIDs, tableIDs, modelIDs, routineIDs *uid.Space
)
73
74
75
76
77 func TestMain(m *testing.M) {
78 cleanup := initIntegrationTest()
79 r := m.Run()
80 cleanup()
81 os.Exit(r)
82 }
83
84 func getClient(t *testing.T) *Client {
85 if client == nil {
86 t.Skip("Integration tests skipped")
87 }
88 return client
89 }
90
// grpcHeadersChecker provides the call options that initIntegrationTest
// appends to every client's options when the run is not being recorded.
var grpcHeadersChecker = testutil.DefaultHeadersEnforcer()
92
93
94
// initIntegrationTest populates the package-level clients and test state
// according to the -short and -record flags, and returns a function that
// releases whatever it set up.
//
//   - -short combined with -record is rejected with a fatal log.
//   - -short with replay support, a usable replay file and a project ID
//     replays recorded HTTP traffic from replayFilename.
//   - -short otherwise leaves the clients nil, so tests skip themselves.
//   - Without -short, clients are built from the test token sources; with
//     -record, BigQuery and Storage HTTP traffic is recorded to replayFilename.
func initIntegrationTest() func() {
	ctx := context.Background()
	// Flags must be parsed before testing.Short() and *record are consulted.
	flag.Parse()
	projID := testutil.ProjID()
	switch {
	case testing.Short() && *record:
		log.Fatal("cannot combine -short and -record")
		return func() {}

	case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
		// Replay mode: serve requests from the recording instead of the
		// network.
		log.Printf("replaying from %s", replayFilename)
		httpreplay.DebugHeaders()
		replayer, err := httpreplay.NewReplayer(replayFilename)
		if err != nil {
			log.Fatal(err)
		}
		// The recording's initial state holds the JSON-encoded start time
		// written when it was recorded (see the record branch below); reusing
		// it seeds initTestState with the same time as the recorded run.
		var t time.Time
		if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
			log.Fatal(err)
		}
		hc, err := replayer.Client(ctx)
		if err != nil {
			log.Fatal(err)
		}
		client, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		storageOptimizedClient, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		err = storageOptimizedClient.EnableStorageReadClient(ctx)
		if err != nil {
			log.Fatal(err)
		}
		storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		connectionsClient, err = connection.NewClient(ctx, option.WithHTTPClient(hc))
		if err != nil {
			log.Fatal(err)
		}
		// NOTE(review): unlike the clients above, this one is not given the
		// replaying HTTP client.
		policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx)
		if err != nil {
			log.Fatal(err)
		}
		cleanup := initTestState(client, t)
		return func() {
			cleanup()
			_ = replayer.Close() // No actionable error.
		}

	case testing.Short():
		// -short without a usable replay: leave the clients nil so that every
		// integration test skips.
		if testutil.CanReplay(replayFilename) && projID != "" {
			log.Print("replay not supported for Go versions before 1.8")
		}
		client = nil
		storageOptimizedClient = nil
		storageClient = nil
		connectionsClient = nil
		return func() {}

	default:
		// Live mode: talk to the real services, optionally recording.
		ts := testutil.TokenSource(ctx, Scope)
		if ts == nil {
			log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
			return func() {}
		}
		bqOpts := []option.ClientOption{option.WithTokenSource(ts)}
		sOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))}
		ptmOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, datacatalog.DefaultAuthScopes()...))}
		connOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, connection.DefaultAuthScopes()...))}
		cleanup := func() {}
		now := time.Now().UTC()
		if *record {
			if !httpreplay.Supported() {
				log.Print("record not supported for Go versions before 1.8")
			} else {
				// Store the start time as the recording's initial state so a
				// later replay can recover it.
				nowBytes, err := json.Marshal(now)
				if err != nil {
					log.Fatal(err)
				}
				recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
				if err != nil {
					log.Fatalf("could not record: %v", err)
				}
				log.Printf("recording to %s", replayFilename)
				// Only the BigQuery and Storage option sets receive a
				// recording HTTP client.
				hc, err := recorder.Client(ctx, bqOpts...)
				if err != nil {
					log.Fatal(err)
				}
				bqOpts = append(bqOpts, option.WithHTTPClient(hc))
				hc, err = recorder.Client(ctx, sOpts...)
				if err != nil {
					log.Fatal(err)
				}
				sOpts = append(sOpts, option.WithHTTPClient(hc))
				cleanup = func() {
					if err := recorder.Close(); err != nil {
						log.Printf("saving recording: %v", err)
					}
				}
			}
		} else {
			// Not recording: add the header checker's call options to every
			// client's option set.
			bqOpts = append(bqOpts, grpcHeadersChecker.CallOptions()...)
			sOpts = append(sOpts, grpcHeadersChecker.CallOptions()...)
			ptmOpts = append(ptmOpts, grpcHeadersChecker.CallOptions()...)
			connOpts = append(connOpts, grpcHeadersChecker.CallOptions()...)
		}
		var err error
		client, err = NewClient(ctx, projID, bqOpts...)
		if err != nil {
			log.Fatalf("NewClient: %v", err)
		}
		storageOptimizedClient, err = NewClient(ctx, projID, bqOpts...)
		if err != nil {
			log.Fatalf("NewClient: %v", err)
		}
		err = storageOptimizedClient.EnableStorageReadClient(ctx, bqOpts...)
		if err != nil {
			log.Fatalf("ConfigureStorageReadClient: %v", err)
		}
		storageClient, err = storage.NewClient(ctx, sOpts...)
		if err != nil {
			log.Fatalf("storage.NewClient: %v", err)
		}
		policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx, ptmOpts...)
		if err != nil {
			log.Fatalf("datacatalog.NewPolicyTagManagerClient: %v", err)
		}
		connectionsClient, err = connection.NewClient(ctx, connOpts...)
		if err != nil {
			log.Fatalf("connection.NewService: %v", err)
		}
		c := initTestState(client, now)
		// Remove the test datasets before closing the recorder (if any).
		return func() { c(); cleanup() }
	}
}
241
242 func initTestState(client *Client, t time.Time) func() {
243
244
245 ctx := context.Background()
246 opts := &uid.Options{Sep: '_', Time: t}
247 datasetIDs = uid.NewSpace("dataset", opts)
248 tableIDs = uid.NewSpace("table", opts)
249 modelIDs = uid.NewSpace("model", opts)
250 routineIDs = uid.NewSpace("routine", opts)
251 testTableExpiration = t.Add(2 * time.Hour).Round(time.Second)
252
253 Seed(t.UnixNano())
254
255 prefixes := []string{
256 "dataset_",
257 "managedwriter_test_dataset_",
258 }
259 for _, prefix := range prefixes {
260 deleteDatasets(ctx, prefix)
261 }
262
263 dataset = client.Dataset(datasetIDs.New())
264 otherDataset = client.Dataset(datasetIDs.New())
265
266 if err := dataset.Create(ctx, nil); err != nil {
267 log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
268 }
269 if err := otherDataset.Create(ctx, nil); err != nil {
270 log.Fatalf("creating other dataset %s: %v", dataset.DatasetID, err)
271 }
272
273 return func() {
274 if err := dataset.DeleteWithContents(ctx); err != nil {
275 log.Printf("could not delete %s", dataset.DatasetID)
276 }
277 if err := otherDataset.DeleteWithContents(ctx); err != nil {
278 log.Printf("could not delete %s", dataset.DatasetID)
279 }
280 }
281 }
282
283
284
// isResourceStale reports whether t lies at least 24 hours in the past.
func isResourceStale(t time.Time) bool {
	const staleAfter = 24 * time.Hour
	age := time.Since(t)
	return age >= staleAfter
}
288
289
290 func deleteDatasets(ctx context.Context, prefix string) {
291 it := client.Datasets(ctx)
292
293 for {
294 ds, err := it.Next()
295 if err == iterator.Done {
296 break
297 }
298 if err != nil {
299 fmt.Printf("failed to list project datasets: %v\n", err)
300 break
301 }
302 if !strings.HasPrefix(ds.DatasetID, prefix) {
303 continue
304 }
305
306 md, err := ds.Metadata(ctx)
307 if err != nil {
308 fmt.Printf("failed to get dataset `%s` metadata: %v\n", ds.DatasetID, err)
309 continue
310 }
311 if isResourceStale(md.CreationTime) {
312 fmt.Printf("found old dataset to delete: %s\n", ds.DatasetID)
313 err := ds.DeleteWithContents(ctx)
314 if err != nil {
315 fmt.Printf("failed to delete old dataset `%s`\n", ds.DatasetID)
316 }
317 }
318 }
319 }
320
321 func TestIntegration_DetectProjectID(t *testing.T) {
322 ctx := context.Background()
323 testCreds := testutil.Credentials(ctx)
324 if testCreds == nil {
325 t.Skip("test credentials not present, skipping")
326 }
327
328 if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
329 t.Errorf("test NewClient: %v", err)
330 }
331
332 badTS := testutil.ErroringTokenSource{}
333
334 if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
335 t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.Project())
336 }
337 }
338
339 func TestIntegration_JobFrom(t *testing.T) {
340 if client == nil {
341 t.Skip("Integration tests skipped")
342 }
343 ctx := context.Background()
344
345
346 q := client.Query("SELECT 123 as foo")
347 it, err := q.Read(ctx)
348 if err != nil {
349 t.Fatalf("failed to run test query: %v", err)
350 }
351 want := it.SourceJob()
352
353
354 otherClient, err := NewClient(ctx, "bad-project-id")
355 if err != nil {
356 t.Fatalf("failed to create other client: %v", err)
357 }
358 otherClient.Location = "badloc"
359
360 for _, tc := range []struct {
361 description string
362 f func(*Client) (*Job, error)
363 wantErr bool
364 }{
365 {
366 description: "JobFromID",
367 f: func(c *Client) (*Job, error) { return c.JobFromID(ctx, want.jobID) },
368 wantErr: true,
369 },
370 {
371 description: "JobFromIDLocation",
372 f: func(c *Client) (*Job, error) { return c.JobFromIDLocation(ctx, want.jobID, want.location) },
373 wantErr: true,
374 },
375 {
376 description: "JobFromProject",
377 f: func(c *Client) (*Job, error) { return c.JobFromProject(ctx, want.projectID, want.jobID, want.location) },
378 },
379 } {
380 got, err := tc.f(otherClient)
381 if err != nil {
382 if !tc.wantErr {
383 t.Errorf("case %q errored: %v", tc.description, err)
384 }
385 continue
386 }
387 if tc.wantErr {
388 t.Errorf("case %q got success, expected error", tc.description)
389 }
390 if got.projectID != want.projectID {
391 t.Errorf("case %q projectID mismatch, got %s want %s", tc.description, got.projectID, want.projectID)
392 }
393 if got.location != want.location {
394 t.Errorf("case %q location mismatch, got %s want %s", tc.description, got.location, want.location)
395 }
396 if got.jobID != want.jobID {
397 t.Errorf("case %q jobID mismatch, got %s want %s", tc.description, got.jobID, want.jobID)
398 }
399 if got.Email() == "" {
400 t.Errorf("case %q expected email to be populated, was empty", tc.description)
401 }
402 }
403
404 }
405
406 func TestIntegration_QueryContextTimeout(t *testing.T) {
407 if client == nil {
408 t.Skip("Integration tests skipped")
409 }
410 ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
411 defer cancel()
412
413 q := client.Query("select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 1000)) as foo")
414 q.DisableQueryCache = true
415 before := time.Now()
416 _, err := q.Read(ctx)
417 if err != context.DeadlineExceeded {
418 t.Errorf("Read() error, wanted %v, got %v", context.DeadlineExceeded, err)
419 }
420 wantMaxDur := 500 * time.Millisecond
421 if d := time.Since(before); d > wantMaxDur {
422 t.Errorf("return duration too long, wanted max %v got %v", wantMaxDur, d)
423 }
424 }
425
426 func TestIntegration_SnapshotRestoreClone(t *testing.T) {
427
428 if client == nil {
429 t.Skip("Integration tests skipped")
430 }
431 ctx := context.Background()
432
433
434 baseTableID := tableIDs.New()
435 qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
436 sql := fmt.Sprintf(`
437 CREATE TABLE %s
438 (
439 sample_value INT64,
440 groupid STRING,
441 )
442 AS
443 SELECT
444 CAST(RAND() * 100 AS INT64),
445 CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
446 FROM
447 UNNEST(GENERATE_ARRAY(0,999))
448 `, qualified)
449 if _, _, err := runQuerySQL(ctx, sql); err != nil {
450 t.Fatalf("couldn't instantiate base table: %v", err)
451 }
452
453
454 targetTime := time.Now()
455 snapshotID := tableIDs.New()
456 copier := dataset.Table(snapshotID).CopierFrom(dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6)))
457 copier.OperationType = SnapshotOperation
458 job, err := copier.Run(ctx)
459 if err != nil {
460 t.Fatalf("couldn't run snapshot: %v", err)
461 }
462 err = wait(ctx, job)
463 if err != nil {
464 t.Fatalf("snapshot failed: %v", err)
465 }
466
467
468 meta, err := dataset.Table(snapshotID).Metadata(ctx)
469 if err != nil {
470 t.Fatalf("couldn't get metadata from snapshot: %v", err)
471 }
472 if meta.Type != Snapshot {
473 t.Errorf("expected snapshot table type, got %s", meta.Type)
474 }
475 want := &SnapshotDefinition{
476 BaseTableReference: dataset.Table(baseTableID),
477 SnapshotTime: targetTime,
478 }
479 if diff := testutil.Diff(meta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" {
480 t.Fatalf("SnapshotDefinition differs. got=-, want=+:\n%s", diff)
481 }
482
483
484 restoreID := tableIDs.New()
485 restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID))
486 restorer.OperationType = RestoreOperation
487 job, err = restorer.Run(ctx)
488 if err != nil {
489 t.Fatalf("couldn't run restore: %v", err)
490 }
491 err = wait(ctx, job)
492 if err != nil {
493 t.Fatalf("restore failed: %v", err)
494 }
495
496 restoreMeta, err := dataset.Table(restoreID).Metadata(ctx)
497 if err != nil {
498 t.Fatalf("couldn't get restored table metadata: %v", err)
499 }
500
501 if meta.NumBytes != restoreMeta.NumBytes {
502 t.Errorf("bytes mismatch. snap had %d bytes, restore had %d bytes", meta.NumBytes, restoreMeta.NumBytes)
503 }
504 if meta.NumRows != restoreMeta.NumRows {
505 t.Errorf("row counts mismatch. snap had %d rows, restore had %d rows", meta.NumRows, restoreMeta.NumRows)
506 }
507 if restoreMeta.Type != RegularTable {
508 t.Errorf("table type mismatch, got %s want %s", restoreMeta.Type, RegularTable)
509 }
510
511
512 cloneID := tableIDs.New()
513 cloner := dataset.Table(cloneID).CopierFrom(dataset.Table(snapshotID))
514 cloner.OperationType = CloneOperation
515
516 job, err = cloner.Run(ctx)
517 if err != nil {
518 t.Fatalf("couldn't run clone: %v", err)
519 }
520 err = wait(ctx, job)
521 if err != nil {
522 t.Fatalf("clone failed: %v", err)
523 }
524
525 cloneMeta, err := dataset.Table(cloneID).Metadata(ctx)
526 if err != nil {
527 t.Fatalf("couldn't get restored table metadata: %v", err)
528 }
529 if meta.NumBytes != cloneMeta.NumBytes {
530 t.Errorf("bytes mismatch. snap had %d bytes, clone had %d bytes", meta.NumBytes, cloneMeta.NumBytes)
531 }
532 if meta.NumRows != cloneMeta.NumRows {
533 t.Errorf("row counts mismatch. snap had %d rows, clone had %d rows", meta.NumRows, cloneMeta.NumRows)
534 }
535 if cloneMeta.Type != RegularTable {
536 t.Errorf("table type mismatch, got %s want %s", cloneMeta.Type, RegularTable)
537 }
538 if cloneMeta.CloneDefinition == nil {
539 t.Errorf("expected CloneDefinition in (%q), was nil", cloneMeta.FullID)
540 }
541 if cloneMeta.CloneDefinition.BaseTableReference == nil {
542 t.Errorf("expected CloneDefinition.BaseTableReference, was nil")
543 }
544 wantBase := dataset.Table(snapshotID)
545 if !testutil.Equal(cloneMeta.CloneDefinition.BaseTableReference, wantBase, cmpopts.IgnoreUnexported(Table{})) {
546 t.Errorf("mismatch in CloneDefinition.BaseTableReference. Got %s, want %s", cloneMeta.CloneDefinition.BaseTableReference.FullyQualifiedName(), wantBase.FullyQualifiedName())
547 }
548 }
549
550 func TestIntegration_HourTimePartitioning(t *testing.T) {
551 if client == nil {
552 t.Skip("Integration tests skipped")
553 }
554 ctx := context.Background()
555 table := dataset.Table(tableIDs.New())
556
557 schema := Schema{
558 {Name: "name", Type: StringFieldType},
559 {Name: "somevalue", Type: IntegerFieldType},
560 }
561
562
563 wantedTimePartitioning := &TimePartitioning{
564 Type: HourPartitioningType,
565 }
566
567 err := table.Create(context.Background(), &TableMetadata{
568 Schema: schema,
569 TimePartitioning: wantedTimePartitioning,
570 })
571 if err != nil {
572 t.Fatal(err)
573 }
574 defer table.Delete(ctx)
575 md, err := table.Metadata(ctx)
576 if err != nil {
577 t.Fatal(err)
578 }
579
580 if md.TimePartitioning == nil {
581 t.Fatal("expected time partitioning, got nil")
582 }
583 if diff := testutil.Diff(md.TimePartitioning, wantedTimePartitioning); diff != "" {
584 t.Fatalf("got=-, want=+:\n%s", diff)
585 }
586 if md.TimePartitioning.Type != wantedTimePartitioning.Type {
587 t.Errorf("TimePartitioning interval mismatch: got %v, wanted %v", md.TimePartitioning.Type, wantedTimePartitioning.Type)
588 }
589 }
590
591 func TestIntegration_RangePartitioning(t *testing.T) {
592 if client == nil {
593 t.Skip("Integration tests skipped")
594 }
595 ctx := context.Background()
596 table := dataset.Table(tableIDs.New())
597
598 schema := Schema{
599 {Name: "name", Type: StringFieldType},
600 {Name: "somevalue", Type: IntegerFieldType},
601 }
602
603 wantedRange := &RangePartitioningRange{
604 Start: 0,
605 End: 135,
606 Interval: 25,
607 }
608
609 wantedPartitioning := &RangePartitioning{
610 Field: "somevalue",
611 Range: wantedRange,
612 }
613
614 err := table.Create(context.Background(), &TableMetadata{
615 Schema: schema,
616 RangePartitioning: wantedPartitioning,
617 })
618 if err != nil {
619 t.Fatal(err)
620 }
621 defer table.Delete(ctx)
622 md, err := table.Metadata(ctx)
623 if err != nil {
624 t.Fatal(err)
625 }
626
627 if md.RangePartitioning == nil {
628 t.Fatal("expected range partitioning, got nil")
629 }
630 got := md.RangePartitioning.Field
631 if wantedPartitioning.Field != got {
632 t.Errorf("RangePartitioning Field: got %v, want %v", got, wantedPartitioning.Field)
633 }
634 if md.RangePartitioning.Range == nil {
635 t.Fatal("expected a range definition, got nil")
636 }
637 gotInt64 := md.RangePartitioning.Range.Start
638 if gotInt64 != wantedRange.Start {
639 t.Errorf("Range.Start: got %v, wanted %v", gotInt64, wantedRange.Start)
640 }
641 gotInt64 = md.RangePartitioning.Range.End
642 if gotInt64 != wantedRange.End {
643 t.Errorf("Range.End: got %v, wanted %v", gotInt64, wantedRange.End)
644 }
645 gotInt64 = md.RangePartitioning.Range.Interval
646 if gotInt64 != wantedRange.Interval {
647 t.Errorf("Range.Interval: got %v, wanted %v", gotInt64, wantedRange.Interval)
648 }
649 }
650
651 func TestIntegration_RemoveTimePartitioning(t *testing.T) {
652 if client == nil {
653 t.Skip("Integration tests skipped")
654 }
655 ctx := context.Background()
656 table := dataset.Table(tableIDs.New())
657 want := 24 * time.Hour
658 err := table.Create(ctx, &TableMetadata{
659 ExpirationTime: testTableExpiration,
660 TimePartitioning: &TimePartitioning{
661 Expiration: want,
662 },
663 })
664 if err != nil {
665 t.Fatal(err)
666 }
667 defer table.Delete(ctx)
668
669 md, err := table.Metadata(ctx)
670 if err != nil {
671 t.Fatal(err)
672 }
673 if got := md.TimePartitioning.Expiration; got != want {
674 t.Fatalf("TimePartitioning expiration want = %v, got = %v", want, got)
675 }
676
677
678 md, err = table.Update(context.Background(), TableMetadataToUpdate{
679 TimePartitioning: &TimePartitioning{Expiration: 0},
680 }, md.ETag)
681 if err != nil {
682 t.Fatal(err)
683 }
684
685 want = time.Duration(0)
686 if got := md.TimePartitioning.Expiration; got != want {
687 t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
688 }
689 }
690
691
692
693
694 func setupPolicyTag(ctx context.Context) (string, func(), error) {
695 location := "us"
696 req := &datacatalogpb.CreateTaxonomyRequest{
697 Parent: fmt.Sprintf("projects/%s/locations/%s", testutil.ProjID(), location),
698 Taxonomy: &datacatalogpb.Taxonomy{
699
700 DisplayName: fmt.Sprintf("google-cloud-go bigquery testing taxonomy %d", time.Now().UnixNano()),
701 Description: "Taxonomy created for google-cloud-go integration tests",
702 ActivatedPolicyTypes: []datacatalogpb.Taxonomy_PolicyType{
703 datacatalogpb.Taxonomy_FINE_GRAINED_ACCESS_CONTROL,
704 },
705 },
706 }
707 resp, err := policyTagManagerClient.CreateTaxonomy(ctx, req)
708 if err != nil {
709 return "", nil, fmt.Errorf("datacatalog.CreateTaxonomy: %v", err)
710 }
711 taxonomyID := resp.GetName()
712 cleanupFunc := func() {
713 policyTagManagerClient.DeleteTaxonomy(ctx, &datacatalogpb.DeleteTaxonomyRequest{
714 Name: taxonomyID,
715 })
716 }
717
718 tagReq := &datacatalogpb.CreatePolicyTagRequest{
719 Parent: resp.GetName(),
720 PolicyTag: &datacatalogpb.PolicyTag{
721 DisplayName: "ExamplePolicyTag",
722 },
723 }
724 tagResp, err := policyTagManagerClient.CreatePolicyTag(ctx, tagReq)
725 if err != nil {
726
727 cleanupFunc()
728 return "", nil, fmt.Errorf("datacatalog.CreatePolicyTag: %v", err)
729 }
730 return tagResp.GetName(), cleanupFunc, nil
731 }
732
733 func TestIntegration_ColumnACLs(t *testing.T) {
734 if client == nil {
735 t.Skip("Integration tests skipped")
736 }
737 ctx := context.Background()
738 testSchema := Schema{
739 {Name: "name", Type: StringFieldType},
740 {Name: "ssn", Type: StringFieldType},
741 {Name: "acct_balance", Type: NumericFieldType},
742 }
743 table := newTable(t, testSchema)
744 defer table.Delete(ctx)
745
746 tagID, cleanupFunc, err := setupPolicyTag(ctx)
747 if err != nil {
748 t.Fatalf("failed to setup policy tag resources: %v", err)
749 }
750 defer cleanupFunc()
751
752 testSchema[1].PolicyTags = &PolicyTagList{
753 Names: []string{tagID},
754 }
755
756
757 _, err = table.Update(ctx, TableMetadataToUpdate{
758 Schema: testSchema,
759 }, "")
760 if err != nil {
761 t.Errorf("update with policyTag failed: %v", err)
762 }
763
764
765 newTable := dataset.Table(tableIDs.New())
766 if err = newTable.Create(ctx, &TableMetadata{
767 Schema: schema,
768 Description: "foo",
769 }); err != nil {
770 t.Errorf("failed to create new table with policy tag: %v", err)
771 }
772 }
773
774 func TestIntegration_SimpleRowResults(t *testing.T) {
775 if client == nil {
776 t.Skip("Integration tests skipped")
777 }
778 beforePreview := client.enableQueryPreview
779
780 defer func() {
781 client.enableQueryPreview = beforePreview
782 }()
783 ctx := context.Background()
784
785 testCases := []struct {
786 description string
787 query string
788 want [][]Value
789 }{
790 {
791 description: "literals",
792 query: "select 17 as foo",
793 want: [][]Value{{int64(17)}},
794 },
795 {
796 description: "empty results",
797 query: "SELECT * FROM (select 17 as foo) where false",
798 want: [][]Value{},
799 },
800 {
801
802
803
804 description: "ctas ddl",
805 query: fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s AS SELECT 17 as foo", dataset.DatasetID, tableIDs.New()),
806 want: nil,
807 },
808 {
809
810 description: "long running",
811 query: "select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 1000)) as foo",
812 want: [][]Value{{int64(1000000000)}},
813 },
814 {
815
816 description: "DML",
817 query: fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s (foo STRING, bar INT64)", dataset.DatasetID, tableIDs.New()),
818 want: [][]Value{},
819 },
820 }
821
822 t.Run("nopreview_group", func(t *testing.T) {
823 client.enableQueryPreview = false
824 for _, tc := range testCases {
825 curCase := tc
826 t.Run(curCase.description, func(t *testing.T) {
827 t.Parallel()
828 q := client.Query(curCase.query)
829 it, err := q.Read(ctx)
830 if err != nil {
831 t.Fatalf("%s read error: %v", curCase.description, err)
832 }
833 checkReadAndTotalRows(t, curCase.description, it, curCase.want)
834 })
835 }
836 })
837 t.Run("preview_group", func(t *testing.T) {
838 client.enableQueryPreview = true
839 for _, tc := range testCases {
840 curCase := tc
841 t.Run(curCase.description, func(t *testing.T) {
842 t.Parallel()
843 q := client.Query(curCase.query)
844 it, err := q.Read(ctx)
845 if err != nil {
846 t.Fatalf("%s read error: %v", curCase.description, err)
847 }
848 checkReadAndTotalRows(t, curCase.description, it, curCase.want)
849 })
850 }
851 })
852
853 }
854
855 func TestIntegration_QueryIterationPager(t *testing.T) {
856 if client == nil {
857 t.Skip("Integration tests skipped")
858 }
859 ctx := context.Background()
860
861 sql := `
862 SELECT
863 num,
864 num * 2 as double
865 FROM
866 UNNEST(GENERATE_ARRAY(1,5)) as num`
867 q := client.Query(sql)
868 it, err := q.Read(ctx)
869 if err != nil {
870 t.Fatalf("Read: %v", err)
871 }
872 pager := iterator.NewPager(it, 2, "")
873 rowsFetched := 0
874 for {
875 var rows [][]Value
876 nextPageToken, err := pager.NextPage(&rows)
877 if err != nil {
878 t.Fatalf("NextPage: %v", err)
879 }
880 rowsFetched = rowsFetched + len(rows)
881
882 if nextPageToken == "" {
883 break
884 }
885 }
886
887 wantRows := 5
888 if rowsFetched != wantRows {
889 t.Errorf("Expected %d rows, got %d", wantRows, rowsFetched)
890 }
891 }
892
893 func TestIntegration_RoutineStoredProcedure(t *testing.T) {
894
895
896
897
898 if client == nil {
899 t.Skip("Integration tests skipped")
900 }
901 ctx := context.Background()
902
903
904 routineID := routineIDs.New()
905 routine := dataset.Routine(routineID)
906 routineSQLID, _ := routine.Identifier(StandardSQLID)
907 sql := fmt.Sprintf(`
908 CREATE OR REPLACE PROCEDURE %s(val INT64)
909 BEGIN
910 SELECT CURRENT_TIMESTAMP() as ts;
911 SELECT val * 2 as f2;
912 END`,
913 routineSQLID)
914
915 if _, _, err := runQuerySQL(ctx, sql); err != nil {
916 t.Fatal(err)
917 }
918 defer routine.Delete(ctx)
919
920
921 sql = fmt.Sprintf(`
922 CALL %s(5)`,
923 routineSQLID)
924
925 q := client.Query(sql)
926 it, err := q.Read(ctx)
927 if err != nil {
928 t.Fatalf("query.Read: %v", err)
929 }
930
931 checkReadAndTotalRows(t,
932 "expect result set from procedure",
933 it, [][]Value{{int64(10)}})
934 }
935
936 func TestIntegration_RoutineUserTVF(t *testing.T) {
937 if client == nil {
938 t.Skip("Integration tests skipped")
939 }
940 ctx := context.Background()
941
942 routineID := routineIDs.New()
943 routine := dataset.Routine(routineID)
944 inMeta := &RoutineMetadata{
945 Type: "TABLE_VALUED_FUNCTION",
946 Language: "SQL",
947 Arguments: []*RoutineArgument{
948 {Name: "filter",
949 DataType: &StandardSQLDataType{TypeKind: "INT64"},
950 }},
951 ReturnTableType: &StandardSQLTableType{
952 Columns: []*StandardSQLField{
953 {Name: "x", Type: &StandardSQLDataType{TypeKind: "INT64"}},
954 },
955 },
956 Body: "SELECT x FROM UNNEST([1,2,3]) x WHERE x = filter",
957 }
958 if err := routine.Create(ctx, inMeta); err != nil {
959 t.Fatalf("routine create: %v", err)
960 }
961 defer routine.Delete(ctx)
962
963 meta, err := routine.Metadata(ctx)
964 if err != nil {
965 t.Fatal(err)
966 }
967
968
969 if diff := testutil.Diff(inMeta, meta, cmpopts.IgnoreFields(RoutineMetadata{}, "CreationTime", "LastModifiedTime", "ETag")); diff != "" {
970 t.Errorf("routine metadata differs, got=-, want=+\n%s", diff)
971 }
972 }
973
974 func TestIntegration_InsertErrors(t *testing.T) {
975
976
977
978
979 if client == nil {
980 t.Skip("Integration tests skipped")
981 }
982 ctx := context.Background()
983 table := newTable(t, schema)
984 defer table.Delete(ctx)
985
986 ins := table.Inserter()
987 var saverRows []*ValuesSaver
988
989
990 badSaver := &ValuesSaver{
991 Schema: schema,
992 InsertID: NoDedupeID,
993 Row: []Value{strings.Repeat("X", 10485760), []Value{int64(1)}, []Value{true}},
994 }
995
996 saverRows = append(saverRows, badSaver)
997 err := ins.Put(ctx, saverRows)
998 if err == nil {
999 t.Errorf("Wanted row size error, got successful insert.")
1000 }
1001 var e1 *googleapi.Error
1002 ok := errors.As(err, &e1)
1003 if !ok {
1004 t.Errorf("Wanted googleapi.Error, got: %v", err)
1005 }
1006 if e1.Code != http.StatusRequestEntityTooLarge {
1007 want := "Request payload size exceeds the limit"
1008 if !strings.Contains(e1.Message, want) {
1009 t.Errorf("Error didn't contain expected message (%s): %#v", want, e1)
1010 }
1011 }
1012
1013
1014 saverRows = append(saverRows, badSaver)
1015 saverRows = append(saverRows, badSaver)
1016
1017 err = ins.Put(ctx, saverRows)
1018 if err == nil {
1019 t.Errorf("Wanted error, got successful insert.")
1020 }
1021 var e2 *googleapi.Error
1022 ok = errors.As(err, &e2)
1023 if !ok {
1024 t.Errorf("wanted googleapi.Error, got: %v", err)
1025 }
1026 if e2.Code != http.StatusBadRequest && e2.Code != http.StatusRequestEntityTooLarge {
1027 t.Errorf("Wanted HTTP 400 or 413, got %d", e2.Code)
1028 }
1029 }
1030
// TestIntegration_InsertAndRead inserts three rows with the streaming
// inserter and then reads them back through every read path covered here:
// Table.Read, Query.Read, Job.Read on a job re-fetched by ID, and row-by-row
// iteration into both []Value and map[string]Value destinations.
func TestIntegration_InsertAndRead(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := newTable(t, schema)
	defer table.Delete(ctx)

	// Populate the table: one row per name, using the name as the insert ID.
	ins := table.Inserter()
	var (
		wantRows  [][]Value
		saverRows []*ValuesSaver
	)
	for i, name := range []string{"a", "b", "c"} {
		row := []Value{name, []Value{int64(i)}, []Value{true}}
		wantRows = append(wantRows, row)
		saverRows = append(saverRows, &ValuesSaver{
			Schema:   schema,
			InsertID: name,
			Row:      row,
		})
	}
	if err := ins.Put(ctx, saverRows); err != nil {
		t.Fatal(putError(err))
	}

	// Wait until the inserted data is visible before reading it back.
	if err := waitForRow(ctx, table); err != nil {
		t.Fatal(err)
	}

	// Read the table directly.
	checkRead(t, "upload", table.Read(ctx), wantRows)

	// Query the table; the defaults let the query name the table without
	// project or dataset qualifiers.
	q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
	q.DefaultProjectID = dataset.ProjectID
	q.DefaultDatasetID = dataset.DatasetID

	rit, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "query", rit, wantRows)

	// Run the query as a job, look that job up again by ID, and read the
	// results through the re-fetched job.
	job1, err := q.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if job1.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	job2, err := client.JobFromID(ctx, job1.ID())
	if err != nil {
		t.Fatal(err)
	}
	if job2.LastStatus() == nil {
		t.Error("no LastStatus")
	}
	rit, err = job2.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	checkRead(t, "job.Read", rit, wantRows)

	// The job's statistics must be present and of the query-specific type.
	jobStatus, err := job2.Status(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if jobStatus.Statistics == nil {
		t.Fatal("jobStatus missing statistics")
	}
	if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
		t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
	}

	// Iterate row by row into []Value and compare against a bulk read.
	// Note: schema here shadows the package-level schema for the rest of the
	// function.
	valueLists, schema, _, err := readAll(table.Read(ctx))
	if err != nil {
		t.Fatal(err)
	}
	it := table.Read(ctx)
	for i, vl := range valueLists {
		var got []Value
		if err := it.Next(&got); err != nil {
			t.Fatal(err)
		}
		if !testutil.Equal(it.Schema, schema) {
			t.Fatalf("got schema %v, want %v", it.Schema, schema)
		}
		want := []Value(vl)
		if !testutil.Equal(got, want) {
			t.Errorf("%d: got %v, want %v", i, got, want)
		}
	}

	// Iterate row by row into map[string]Value.
	it = table.Read(ctx)
	for _, vl := range valueLists {
		var vm map[string]Value
		if err := it.Next(&vm); err != nil {
			t.Fatal(err)
		}
		if got, want := len(vm), len(vl); got != want {
			t.Fatalf("valueMap len: got %d, want %d", got, want)
		}
		// Rewrite the expected record column from its []Value form into a map
		// keyed by the nested field name, so it can be compared with the
		// map-shaped row.
		vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
		for i, v := range vl {
			if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
				t.Errorf("%d, name=%s: got %#v, want %#v",
					i, schema[i].Name, got, want)
			}
		}
	}

}
1151
// SubSubTestStruct is the innermost nested record used by the struct
// round-trip test; it holds a single integer value.
type SubSubTestStruct struct {
	Integer int64
}
1155
1156 type SubTestStruct struct {
1157 String string
1158 Record SubSubTestStruct
1159 RecordArray []SubSubTestStruct
1160 }
1161
1162 type TestStruct struct {
1163 Name string
1164 Bytes []byte
1165 Integer int64
1166 Float float64
1167 Boolean bool
1168 Timestamp time.Time
1169 Date civil.Date
1170 Time civil.Time
1171 DateTime civil.DateTime
1172 Numeric *big.Rat
1173 Geography string
1174 RangeDate *RangeValue `bigquery:"rangedate"`
1175 RangeDateTime *RangeValue `bigquery:"rangedatetime"`
1176 RangeTimestamp *RangeValue `bigquery:"rangetimestamp"`
1177 StringArray []string
1178 IntegerArray []int64
1179 FloatArray []float64
1180 BooleanArray []bool
1181 TimestampArray []time.Time
1182 DateArray []civil.Date
1183 TimeArray []civil.Time
1184 DateTimeArray []civil.DateTime
1185 NumericArray []*big.Rat
1186 GeographyArray []string
1187
1188 Record SubTestStruct
1189 RecordArray []SubTestStruct
1190 }
1191
1192
1193 var roundToMicros = cmp.Transformer("RoundToMicros",
1194 func(t time.Time) time.Time { return t.Round(time.Microsecond) })
1195
1196 func TestIntegration_InsertAndReadStructs(t *testing.T) {
1197 if client == nil {
1198 t.Skip("Integration tests skipped")
1199 }
1200 schema, err := InferSchema(TestStruct{})
1201 if err != nil {
1202 t.Fatal(err)
1203 }
1204
1205
1206 for idx, typ := range map[int]FieldType{
1207 11: DateFieldType,
1208 12: DateTimeFieldType,
1209 13: TimestampFieldType,
1210 } {
1211 if schema[idx].Type != RangeFieldType {
1212 t.Fatalf("mismatch in expected RANGE element in schema field %d", idx)
1213 } else {
1214 schema[idx].RangeElementType = &RangeElementType{Type: typ}
1215 }
1216 }
1217
1218 ctx := context.Background()
1219 table := newTable(t, schema)
1220 defer table.Delete(ctx)
1221
1222 d := civil.Date{Year: 2016, Month: 3, Day: 20}
1223 tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
1224 ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
1225 dtm := civil.DateTime{Date: d, Time: tm}
1226 d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
1227 tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
1228 ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
1229 dtm2 := civil.DateTime{Date: d2, Time: tm2}
1230 g := "POINT(-122.350220 47.649154)"
1231 g2 := "POINT(-122.0836791 37.421827)"
1232 rangedate := &RangeValue{Start: civil.Date{Year: 2024, Month: 04, Day: 11}}
1233 rangedatetime := &RangeValue{
1234 End: civil.DateTime{
1235 Date: civil.Date{Year: 2024, Month: 04, Day: 11},
1236 Time: civil.Time{Hour: 2, Minute: 4, Second: 6, Nanosecond: 0}},
1237 }
1238 rangetimestamp := &RangeValue{
1239 Start: time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC),
1240 }
1241
1242
1243 ins := table.Inserter()
1244 want := []*TestStruct{
1245 {
1246 "a",
1247 []byte("byte"),
1248 42,
1249 3.14,
1250 true,
1251 ts,
1252 d,
1253 tm,
1254 dtm,
1255 big.NewRat(57, 100),
1256 g,
1257 rangedate,
1258 rangedatetime,
1259 rangetimestamp,
1260 []string{"a", "b"},
1261 []int64{1, 2},
1262 []float64{1, 1.41},
1263 []bool{true, false},
1264 []time.Time{ts, ts2},
1265 []civil.Date{d, d2},
1266 []civil.Time{tm, tm2},
1267 []civil.DateTime{dtm, dtm2},
1268 []*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
1269 []string{g, g2},
1270 SubTestStruct{
1271 "string",
1272 SubSubTestStruct{24},
1273 []SubSubTestStruct{{1}, {2}},
1274 },
1275 []SubTestStruct{
1276 {String: "empty"},
1277 {
1278 "full",
1279 SubSubTestStruct{1},
1280 []SubSubTestStruct{{1}, {2}},
1281 },
1282 },
1283 },
1284 {
1285 Name: "b",
1286 Bytes: []byte("byte2"),
1287 Integer: 24,
1288 Float: 4.13,
1289 Boolean: false,
1290 Timestamp: ts,
1291 Date: d,
1292 Time: tm,
1293 DateTime: dtm,
1294 Numeric: big.NewRat(4499, 10000),
1295 RangeDate: rangedate,
1296 RangeDateTime: rangedatetime,
1297 RangeTimestamp: rangetimestamp,
1298 },
1299 }
1300 var savers []*StructSaver
1301 for _, s := range want {
1302 savers = append(savers, &StructSaver{Schema: schema, Struct: s})
1303 }
1304 if err := ins.Put(ctx, savers); err != nil {
1305 t.Fatal(putError(err))
1306 }
1307
1308
1309
1310 if err := waitForRow(ctx, table); err != nil {
1311 t.Fatal(err)
1312 }
1313
1314
1315 it := table.Read(ctx)
1316 var got []*TestStruct
1317 for {
1318 var g TestStruct
1319 err := it.Next(&g)
1320 if err == iterator.Done {
1321 break
1322 }
1323 if err != nil {
1324 t.Fatal(err)
1325 }
1326 got = append(got, &g)
1327 }
1328 sort.Sort(byName(got))
1329
1330
1331 for i, g := range got {
1332 if i >= len(want) {
1333 t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
1334 } else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
1335 t.Errorf("%d: got=-, want=+:\n%s", i, diff)
1336 }
1337 }
1338 }
1339
1340 type byName []*TestStruct
1341
1342 func (b byName) Len() int { return len(b) }
1343 func (b byName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
1344 func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }
1345
1346 func TestIntegration_InsertAndReadNullable(t *testing.T) {
1347 if client == nil {
1348 t.Skip("Integration tests skipped")
1349 }
1350 ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
1351 cdt := civil.DateTime{Date: testDate, Time: ctm}
1352 rat := big.NewRat(33, 100)
1353 rat2 := big.NewRat(66, 10e10)
1354 geo := "POINT(-122.198939 47.669865)"
1355
1356
1357 testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))
1358
1359
1360 testInsertAndReadNullable(t, testStructNullable{
1361 String: NullString{Valid: false},
1362 Integer: NullInt64{Valid: false},
1363 Float: NullFloat64{Valid: false},
1364 Boolean: NullBool{Valid: false},
1365 Timestamp: NullTimestamp{Valid: false},
1366 Date: NullDate{Valid: false},
1367 Time: NullTime{Valid: false},
1368 DateTime: NullDateTime{Valid: false},
1369 Geography: NullGeography{Valid: false},
1370 },
1371 make([]Value, len(testStructNullableSchema)))
1372
1373
1374 testInsertAndReadNullable(t, testStructNullable{
1375 String: NullString{"x", true},
1376 Bytes: []byte{1, 2, 3},
1377 Integer: NullInt64{1, true},
1378 Float: NullFloat64{2.3, true},
1379 Boolean: NullBool{true, true},
1380 Timestamp: NullTimestamp{testTimestamp, true},
1381 Date: NullDate{testDate, true},
1382 Time: NullTime{ctm, true},
1383 DateTime: NullDateTime{cdt, true},
1384 Numeric: rat,
1385 BigNumeric: rat2,
1386 Geography: NullGeography{geo, true},
1387 Record: &subNullable{X: NullInt64{4, true}},
1388 },
1389 []Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, rat2, geo, []Value{int64(4)}})
1390 }
1391
1392 func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
1393 ctx := context.Background()
1394 table := newTable(t, testStructNullableSchema)
1395 defer table.Delete(ctx)
1396
1397
1398 ins := table.Inserter()
1399 if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
1400 t.Fatal(putError(err))
1401 }
1402
1403
1404 if err := waitForRow(ctx, table); err != nil {
1405 t.Fatal(err)
1406 }
1407
1408
1409 iter := table.Read(ctx)
1410 gotRows, _, _, err := readAll(iter)
1411 if err != nil {
1412 t.Fatal(err)
1413 }
1414 if len(gotRows) != 1 {
1415 t.Fatalf("got %d rows, want 1", len(gotRows))
1416 }
1417 if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
1418 t.Error(diff)
1419 }
1420
1421
1422 want := ts
1423 var sn testStructNullable
1424 it := table.Read(ctx)
1425 if err := it.Next(&sn); err != nil {
1426 t.Fatal(err)
1427 }
1428 if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
1429 t.Error(diff)
1430 }
1431 }
1432
1433 func TestIntegration_QueryStatistics(t *testing.T) {
1434
1435 if client == nil {
1436 t.Skip("Integration tests skipped")
1437 }
1438 ctx := context.Background()
1439
1440 q := client.Query("SELECT 17 as foo, 3.14 as bar")
1441
1442 q.DisableQueryCache = true
1443
1444 job, err := q.Run(ctx)
1445 if err != nil {
1446 t.Fatalf("job Run failure: %v", err)
1447 }
1448 status, err := job.Wait(ctx)
1449 if err != nil {
1450 t.Fatalf("job %q: Wait failure: %v", job.ID(), err)
1451 }
1452 if status.Statistics == nil {
1453 t.Fatal("expected job statistics, none found")
1454 }
1455
1456 if status.Statistics.NumChildJobs != 0 {
1457 t.Errorf("expected no children, %d reported", status.Statistics.NumChildJobs)
1458 }
1459
1460 if status.Statistics.ParentJobID != "" {
1461 t.Errorf("expected no parent, but parent present: %s", status.Statistics.ParentJobID)
1462 }
1463
1464 if status.Statistics.Details == nil {
1465 t.Fatal("expected job details, none present")
1466 }
1467
1468 qStats, ok := status.Statistics.Details.(*QueryStatistics)
1469 if !ok {
1470 t.Fatalf("expected query statistics not present")
1471 }
1472
1473 if qStats.CacheHit {
1474 t.Error("unexpected cache hit")
1475 }
1476
1477 if qStats.StatementType != "SELECT" {
1478 t.Errorf("expected SELECT statement type, got: %s", qStats.StatementType)
1479 }
1480
1481 if len(qStats.QueryPlan) == 0 {
1482 t.Error("expected query plan, none present")
1483 }
1484
1485 if len(qStats.Timeline) == 0 {
1486 t.Error("expected query timeline, none present")
1487 }
1488
1489 if qStats.BIEngineStatistics != nil {
1490 expectedMode := false
1491 for _, m := range []string{"FULL", "PARTIAL", "DISABLED"} {
1492 if qStats.BIEngineStatistics.BIEngineMode == m {
1493 expectedMode = true
1494 }
1495 }
1496 if !expectedMode {
1497 t.Errorf("unexpected BIEngineMode for BI Engine statistics, got %s", qStats.BIEngineStatistics.BIEngineMode)
1498 }
1499 }
1500 }
1501
1502 func TestIntegration_Load(t *testing.T) {
1503 if client == nil {
1504 t.Skip("Integration tests skipped")
1505 }
1506 ctx := context.Background()
1507
1508 table := newTable(t, Schema{
1509 {Name: "name", Type: StringFieldType},
1510 {Name: "nums", Type: IntegerFieldType},
1511 })
1512 defer table.Delete(ctx)
1513
1514
1515 r := strings.NewReader("a,0\nb,1\nc,2\n")
1516 wantRows := [][]Value{
1517 {"a", int64(0)},
1518 {"b", int64(1)},
1519 {"c", int64(2)},
1520 }
1521 rs := NewReaderSource(r)
1522 loader := table.LoaderFrom(rs)
1523 loader.WriteDisposition = WriteTruncate
1524 loader.Labels = map[string]string{"test": "go"}
1525 loader.MediaOptions = []googleapi.MediaOption{
1526 googleapi.ContentType("text/csv"),
1527 googleapi.ChunkSize(googleapi.MinUploadChunkSize),
1528 }
1529 job, err := loader.Run(ctx)
1530 if err != nil {
1531 t.Fatal(err)
1532 }
1533 if job.LastStatus() == nil {
1534 t.Error("no LastStatus")
1535 }
1536 conf, err := job.Config()
1537 if err != nil {
1538 t.Fatal(err)
1539 }
1540 config, ok := conf.(*LoadConfig)
1541 if !ok {
1542 t.Fatalf("got %T, want LoadConfig", conf)
1543 }
1544 diff := testutil.Diff(config, &loader.LoadConfig,
1545 cmp.AllowUnexported(Table{}),
1546 cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
1547
1548 cmpopts.IgnoreFields(FileConfig{}, "Schema"),
1549 cmpopts.IgnoreFields(LoadConfig{}, "MediaOptions"))
1550 if diff != "" {
1551 t.Errorf("got=-, want=+:\n%s", diff)
1552 }
1553 if err := wait(ctx, job); err != nil {
1554 t.Fatal(err)
1555 }
1556 checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)
1557 }
1558
1559 func TestIntegration_LoadWithSessionSupport(t *testing.T) {
1560 if client == nil {
1561 t.Skip("Integration tests skipped")
1562 }
1563
1564 ctx := context.Background()
1565 sessionDataset := client.Dataset("_SESSION")
1566 sessionTable := sessionDataset.Table("test_temp_destination_table")
1567
1568 schema := Schema{
1569 {Name: "username", Type: StringFieldType, Required: false},
1570 {Name: "tweet", Type: StringFieldType, Required: false},
1571 {Name: "timestamp", Type: StringFieldType, Required: false},
1572 {Name: "likes", Type: IntegerFieldType, Required: false},
1573 }
1574 sourceURIs := []string{
1575 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet",
1576 }
1577
1578 source := NewGCSReference(sourceURIs...)
1579 source.SourceFormat = Parquet
1580 source.Schema = schema
1581 loader := sessionTable.LoaderFrom(source)
1582 loader.CreateSession = true
1583 loader.CreateDisposition = CreateIfNeeded
1584
1585 job, err := loader.Run(ctx)
1586 if err != nil {
1587 t.Fatalf("loader.Run: %v", err)
1588 }
1589 err = wait(ctx, job)
1590 if err != nil {
1591 t.Fatalf("wait: %v", err)
1592 }
1593
1594 sessionInfo := job.lastStatus.Statistics.SessionInfo
1595 if sessionInfo == nil {
1596 t.Fatalf("empty job.lastStatus.Statistics.SessionInfo: %v", sessionInfo)
1597 }
1598
1599 sessionID := sessionInfo.SessionID
1600 loaderWithSession := sessionTable.LoaderFrom(source)
1601 loaderWithSession.CreateDisposition = CreateIfNeeded
1602 loaderWithSession.ConnectionProperties = []*ConnectionProperty{
1603 {
1604 Key: "session_id",
1605 Value: sessionID,
1606 },
1607 }
1608 jobWithSession, err := loaderWithSession.Run(ctx)
1609 if err != nil {
1610 t.Fatalf("loaderWithSession.Run: %v", err)
1611 }
1612 err = wait(ctx, jobWithSession)
1613 if err != nil {
1614 t.Fatalf("wait: %v", err)
1615 }
1616
1617 sessionJobInfo := jobWithSession.lastStatus.Statistics.SessionInfo
1618 if sessionJobInfo == nil {
1619 t.Fatalf("empty jobWithSession.lastStatus.Statistics.SessionInfo: %v", sessionJobInfo)
1620 }
1621
1622 if sessionID != sessionJobInfo.SessionID {
1623 t.Fatalf("expected session ID %q, but found %q", sessionID, sessionJobInfo.SessionID)
1624 }
1625
1626 sql := "SELECT * FROM _SESSION.test_temp_destination_table;"
1627 q := client.Query(sql)
1628 q.ConnectionProperties = []*ConnectionProperty{
1629 {
1630 Key: "session_id",
1631 Value: sessionID,
1632 },
1633 }
1634 sessionQueryJob, err := q.Run(ctx)
1635 err = wait(ctx, sessionQueryJob)
1636 if err != nil {
1637 t.Fatalf("wait: %v", err)
1638 }
1639 }
1640
1641 func TestIntegration_LoadWithReferenceSchemaFile(t *testing.T) {
1642 if client == nil {
1643 t.Skip("Integration tests skipped")
1644 }
1645
1646 formats := []DataFormat{Avro, Parquet}
1647 for _, format := range formats {
1648 ctx := context.Background()
1649 table := dataset.Table(tableIDs.New())
1650 defer table.Delete(ctx)
1651
1652 expectedSchema := Schema{
1653 {Name: "username", Type: StringFieldType, Required: false},
1654 {Name: "tweet", Type: StringFieldType, Required: false},
1655 {Name: "timestamp", Type: StringFieldType, Required: false},
1656 {Name: "likes", Type: IntegerFieldType, Required: false},
1657 }
1658 ext := strings.ToLower(string(format))
1659 sourceURIs := []string{
1660 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext,
1661 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter." + ext,
1662 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter." + ext,
1663 }
1664 referenceURI := "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext
1665 source := NewGCSReference(sourceURIs...)
1666 source.SourceFormat = format
1667 loader := table.LoaderFrom(source)
1668 loader.ReferenceFileSchemaURI = referenceURI
1669 job, err := loader.Run(ctx)
1670 if err != nil {
1671 t.Fatalf("loader.Run: %v", err)
1672 }
1673 err = wait(ctx, job)
1674 if err != nil {
1675 t.Fatalf("wait: %v", err)
1676 }
1677 metadata, err := table.Metadata(ctx)
1678 if err != nil {
1679 t.Fatalf("table.Metadata: %v", err)
1680 }
1681 diff := testutil.Diff(expectedSchema, metadata.Schema)
1682 if diff != "" {
1683 t.Errorf("got=-, want=+:\n%s", diff)
1684 }
1685 }
1686 }
1687
1688 func TestIntegration_ExternalTableWithReferenceSchemaFile(t *testing.T) {
1689 if client == nil {
1690 t.Skip("Integration tests skipped")
1691 }
1692
1693 formats := []DataFormat{Avro, Parquet}
1694 for _, format := range formats {
1695 ctx := context.Background()
1696 externalTable := dataset.Table(tableIDs.New())
1697 defer externalTable.Delete(ctx)
1698
1699 expectedSchema := Schema{
1700 {Name: "username", Type: StringFieldType, Required: false},
1701 {Name: "tweet", Type: StringFieldType, Required: false},
1702 {Name: "timestamp", Type: StringFieldType, Required: false},
1703 {Name: "likes", Type: IntegerFieldType, Required: false},
1704 }
1705 ext := strings.ToLower(string(format))
1706 sourceURIs := []string{
1707 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext,
1708 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter." + ext,
1709 "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter." + ext,
1710 }
1711 referenceURI := "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext
1712
1713 err := externalTable.Create(ctx, &TableMetadata{
1714 ExternalDataConfig: &ExternalDataConfig{
1715 SourceFormat: format,
1716 SourceURIs: sourceURIs,
1717 ReferenceFileSchemaURI: referenceURI,
1718 },
1719 })
1720 if err != nil {
1721 t.Fatalf("table.Create: %v", err)
1722 }
1723
1724 metadata, err := externalTable.Metadata(ctx)
1725 if err != nil {
1726 t.Fatalf("table.Metadata: %v", err)
1727 }
1728 diff := testutil.Diff(expectedSchema, metadata.Schema)
1729 if diff != "" {
1730 t.Errorf("got=-, want=+:\n%s", diff)
1731 }
1732 }
1733 }
1734
1735 func TestIntegration_DML(t *testing.T) {
1736 if client == nil {
1737 t.Skip("Integration tests skipped")
1738 }
1739 ctx := context.Background()
1740 table := newTable(t, schema)
1741 defer table.Delete(ctx)
1742
1743 sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
1744 VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
1745 ('b', [1], STRUCT<BOOL>(FALSE)),
1746 ('c', [2], STRUCT<BOOL>(TRUE))`,
1747 table.DatasetID, table.TableID)
1748 _, stats, err := runQuerySQL(ctx, sql)
1749 if err != nil {
1750 t.Fatal(err)
1751 }
1752 wantRows := [][]Value{
1753 {"a", []Value{int64(0)}, []Value{true}},
1754 {"b", []Value{int64(1)}, []Value{false}},
1755 {"c", []Value{int64(2)}, []Value{true}},
1756 }
1757 checkRead(t, "DML", table.Read(ctx), wantRows)
1758 if stats == nil {
1759 t.Fatalf("no query stats")
1760 }
1761 if stats.DMLStats == nil {
1762 t.Fatalf("no dml stats")
1763 }
1764 wantRowCount := int64(len(wantRows))
1765 if stats.DMLStats.InsertedRowCount != wantRowCount {
1766 t.Fatalf("dml stats mismatch. got %d inserted rows, want %d", stats.DMLStats.InsertedRowCount, wantRowCount)
1767 }
1768 }
1769
1770
1771 func runQuerySQL(ctx context.Context, sql string) (*JobStatistics, *QueryStatistics, error) {
1772 return runQueryJob(ctx, client.Query(sql))
1773 }
1774
1775
1776 func runQueryJob(ctx context.Context, q *Query) (*JobStatistics, *QueryStatistics, error) {
1777 var jobStats *JobStatistics
1778 var queryStats *QueryStatistics
1779 var err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
1780 job, err := q.Run(ctx)
1781 if err != nil {
1782 var e *googleapi.Error
1783 if ok := errors.As(err, &e); ok && e.Code < 500 {
1784 return true, err
1785 }
1786 return false, err
1787 }
1788 _, err = job.Wait(ctx)
1789 if err != nil {
1790 var e *googleapi.Error
1791 if ok := errors.As(err, &e); ok && e.Code < 500 {
1792 return true, err
1793 }
1794 return false, fmt.Errorf("%q: %v", job.ID(), err)
1795 }
1796 status := job.LastStatus()
1797 if status.Err() != nil {
1798 return false, fmt.Errorf("job %q terminated in err: %v", job.ID(), status.Err())
1799 }
1800 if status.Statistics != nil {
1801 jobStats = status.Statistics
1802 if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
1803 queryStats = qStats
1804 }
1805 }
1806 return true, nil
1807 })
1808 return jobStats, queryStats, err
1809 }
1810
1811 func TestIntegration_TimeTypes(t *testing.T) {
1812 if client == nil {
1813 t.Skip("Integration tests skipped")
1814 }
1815 ctx := context.Background()
1816 dtSchema := Schema{
1817 {Name: "d", Type: DateFieldType},
1818 {Name: "t", Type: TimeFieldType},
1819 {Name: "dt", Type: DateTimeFieldType},
1820 {Name: "ts", Type: TimestampFieldType},
1821 }
1822 table := newTable(t, dtSchema)
1823 defer table.Delete(ctx)
1824
1825 d := civil.Date{Year: 2016, Month: 3, Day: 20}
1826 tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
1827 dtm := civil.DateTime{Date: d, Time: tm}
1828 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1829 wantRows := [][]Value{
1830 {d, tm, dtm, ts},
1831 }
1832 ins := table.Inserter()
1833 if err := ins.Put(ctx, []*ValuesSaver{
1834 {Schema: dtSchema, Row: wantRows[0]},
1835 }); err != nil {
1836 t.Fatal(putError(err))
1837 }
1838 if err := waitForRow(ctx, table); err != nil {
1839 t.Fatal(err)
1840 }
1841
1842
1843
1844 query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
1845 "VALUES ('%s', '%s', '%s', '%s')",
1846 table.DatasetID, table.TableID,
1847 d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
1848 if _, _, err := runQuerySQL(ctx, query); err != nil {
1849 t.Fatal(err)
1850 }
1851 wantRows = append(wantRows, wantRows[0])
1852 checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
1853 }
1854
1855 func TestIntegration_StandardQuery(t *testing.T) {
1856 if client == nil {
1857 t.Skip("Integration tests skipped")
1858 }
1859 ctx := context.Background()
1860
1861 d := civil.Date{Year: 2016, Month: 3, Day: 20}
1862 tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
1863 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1864 dtm := ts.Format("2006-01-02 15:04:05")
1865
1866
1867 ints := func(args ...int) []Value {
1868 vals := make([]Value, len(args))
1869 for i, arg := range args {
1870 vals[i] = int64(arg)
1871 }
1872 return vals
1873 }
1874
1875 testCases := []struct {
1876 name string
1877 query string
1878 wantRow []Value
1879 }{
1880 {"Ints", "SELECT 1", ints(1)},
1881 {"Float", "SELECT 1.3", []Value{1.3}},
1882 {"NumericCast", "SELECT CAST(1.3 AS NUMERIC)", []Value{big.NewRat(13, 10)}},
1883 {"NumericLiteral", "SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
1884 {"Boolean", "SELECT TRUE", []Value{true}},
1885 {"String", "SELECT 'ABC'", []Value{"ABC"}},
1886 {"Bytes", "SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
1887 {"Timestamp", fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
1888 {"TimestampArray", fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
1889 {"AnonStruct", fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
1890 {"DatetimeCast", fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
1891 {"DateCast", fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
1892 {"TimeCast", fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
1893 {"StructOfInts", "SELECT (1, 2)", []Value{ints(1, 2)}},
1894 {"IntArray", "SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
1895 {"StructOfArrays", "SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
1896 {"ArrayOfStructs", "SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
1897 {"ComplexNested", "SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
1898 {"SubSelectArray", "SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
1899 {"RangeOofDateLiteral",
1900 "SELECT RANGE(DATE '2023-03-01', DATE '2024-04-16')",
1901 []Value{&RangeValue{Start: civil.Date{Year: 2023, Month: 03, Day: 01}, End: civil.Date{Year: 2024, Month: 04, Day: 16}}},
1902 },
1903 }
1904 for _, tc := range testCases {
1905 t.Run(tc.name, func(t *testing.T) {
1906 q := client.Query(tc.query)
1907 it, err := q.Read(ctx)
1908 if err != nil {
1909 t.Fatal(err)
1910 }
1911 checkRead(t, "StandardQuery", it, [][]Value{tc.wantRow})
1912 })
1913 }
1914 }
1915
1916 func TestIntegration_LegacyQuery(t *testing.T) {
1917 if client == nil {
1918 t.Skip("Integration tests skipped")
1919 }
1920 ctx := context.Background()
1921
1922 ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
1923 dtm := ts.Format("2006-01-02 15:04:05")
1924
1925 testCases := []struct {
1926 name string
1927 query string
1928 wantRow []Value
1929 }{
1930 {"Int", "SELECT 1", []Value{int64(1)}},
1931 {"Float", "SELECT 1.3", []Value{1.3}},
1932 {"Boolean", "SELECT TRUE", []Value{true}},
1933 {"String", "SELECT 'ABC'", []Value{"ABC"}},
1934 {"Bytes", "SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
1935 {"Timestamp", fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
1936 {"Date", fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
1937 {"Time", fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
1938 }
1939 for _, tc := range testCases {
1940 q := client.Query(tc.query)
1941 q.UseLegacySQL = true
1942 it, err := q.Read(ctx)
1943 if err != nil {
1944 t.Fatal(err)
1945 }
1946 checkRead(t, "LegacyQuery", it, [][]Value{tc.wantRow})
1947 }
1948 }
1949
1950 func TestIntegration_IteratorSource(t *testing.T) {
1951 if client == nil {
1952 t.Skip("Integration tests skipped")
1953 }
1954 ctx := context.Background()
1955 q := client.Query("SELECT 17 as foo")
1956 it, err := q.Read(ctx)
1957 if err != nil {
1958 t.Errorf("Read: %v", err)
1959 }
1960 src := it.SourceJob()
1961 if src == nil {
1962 t.Errorf("wanted source job, got nil")
1963 }
1964 status, err := src.Status(ctx)
1965 if err != nil {
1966 t.Errorf("Status: %v", err)
1967 }
1968 if status == nil {
1969 t.Errorf("got nil status")
1970 }
1971 }
1972
1973 func TestIntegration_ExternalAutodetect(t *testing.T) {
1974 if client == nil {
1975 t.Skip("Integration tests skipped")
1976 }
1977 ctx := context.Background()
1978
1979 testTable := dataset.Table(tableIDs.New())
1980
1981 origExtCfg := &ExternalDataConfig{
1982 SourceFormat: Avro,
1983 SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/original*.avro"},
1984 }
1985
1986 err := testTable.Create(ctx, &TableMetadata{
1987 ExternalDataConfig: origExtCfg,
1988 })
1989 if err != nil {
1990 t.Fatalf("Table.Create(%q): %v", testTable.FullyQualifiedName(), err)
1991 }
1992
1993 origMeta, err := testTable.Metadata(ctx)
1994 if err != nil {
1995 t.Fatalf("Table.Metadata(%q): %v", testTable.FullyQualifiedName(), err)
1996 }
1997
1998 wantSchema := Schema{
1999 {Name: "stringfield", Type: "STRING"},
2000 {Name: "int64field", Type: "INTEGER"},
2001 }
2002 if diff := testutil.Diff(origMeta.Schema, wantSchema); diff != "" {
2003 t.Fatalf("orig schema, got=-, want=+\n%s", diff)
2004 }
2005
2006
2007 newExtCfg := &ExternalDataConfig{
2008 SourceFormat: Avro,
2009 SourceURIs: []string{"gs://cloud-samples-data/bigquery/autodetect-samples/widened*.avro"},
2010 }
2011
2012 newMeta, err := testTable.Update(ctx, TableMetadataToUpdate{
2013 ExternalDataConfig: newExtCfg,
2014 }, origMeta.ETag)
2015 if err != nil {
2016 t.Fatalf("Table.Update(%q): %v", testTable.FullyQualifiedName(), err)
2017 }
2018 if diff := testutil.Diff(newMeta.Schema, wantSchema); diff != "" {
2019 t.Fatalf("new schema, got=-, want=+\n%s", diff)
2020 }
2021
2022
2023
2024 newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{}, newMeta.ETag, WithAutoDetectSchema(true))
2025 if err != nil {
2026 t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err)
2027 }
2028
2029 wantSchema2 := Schema{
2030 {Name: "stringfield", Type: "STRING"},
2031 {Name: "int64field", Type: "INTEGER"},
2032 {Name: "otherfield", Type: "INTEGER"},
2033 }
2034 if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" {
2035 t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff)
2036 }
2037
2038 id, _ := testTable.Identifier(StandardSQLID)
2039 q := client.Query(fmt.Sprintf("SELECT * FROM %s", id))
2040 it, err := q.Read(ctx)
2041 if err != nil {
2042 t.Fatalf("query read: %v", err)
2043 }
2044 wantRows := [][]Value{
2045 {"bar", int64(32), int64(314)},
2046 }
2047 checkReadAndTotalRows(t, "row check", it, wantRows)
2048 }
2049
2050 func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
2051 if client == nil {
2052 t.Skip("Integration tests skipped")
2053 }
2054 ctx := context.Background()
2055
2056 autoTable := dataset.Table(tableIDs.New())
2057 customTable := dataset.Table(tableIDs.New())
2058
2059 err := autoTable.Create(ctx, &TableMetadata{
2060 ExternalDataConfig: &ExternalDataConfig{
2061 SourceFormat: Parquet,
2062 SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
2063 AutoDetect: true,
2064 DecimalTargetTypes: []DecimalTargetType{StringTargetType},
2065 HivePartitioningOptions: &HivePartitioningOptions{
2066 Mode: AutoHivePartitioningMode,
2067 SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
2068 RequirePartitionFilter: true,
2069 },
2070 },
2071 })
2072 if err != nil {
2073 t.Fatalf("table.Create(auto): %v", err)
2074 }
2075 defer autoTable.Delete(ctx)
2076
2077 err = customTable.Create(ctx, &TableMetadata{
2078 ExternalDataConfig: &ExternalDataConfig{
2079 SourceFormat: Parquet,
2080 SourceURIs: []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
2081 AutoDetect: true,
2082 DecimalTargetTypes: []DecimalTargetType{NumericTargetType, StringTargetType},
2083 HivePartitioningOptions: &HivePartitioningOptions{
2084 Mode: CustomHivePartitioningMode,
2085 SourceURIPrefix: "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
2086 RequirePartitionFilter: true,
2087 },
2088 },
2089 })
2090 if err != nil {
2091 t.Fatalf("table.Create(custom): %v", err)
2092 }
2093 defer customTable.Delete(ctx)
2094
2095 customTableSQLID, _ := customTable.Identifier(StandardSQLID)
2096
2097
2098 sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM %s WHERE pkey=\"foo\"", customTableSQLID)
2099 q := client.Query(sql)
2100 it, err := q.Read(ctx)
2101 if err != nil {
2102 t.Fatalf("Error querying: %v", err)
2103 }
2104 checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
2105 }
2106
2107 func TestIntegration_QuerySessionSupport(t *testing.T) {
2108 if client == nil {
2109 t.Skip("Integration tests skipped")
2110 }
2111 ctx := context.Background()
2112
2113 q := client.Query("CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo")
2114 q.CreateSession = true
2115 jobStats, _, err := runQueryJob(ctx, q)
2116 if err != nil {
2117 t.Fatalf("error running CREATE TEMPORARY TABLE: %v", err)
2118 }
2119 if jobStats.SessionInfo == nil {
2120 t.Fatalf("expected session info, was nil")
2121 }
2122 sessionID := jobStats.SessionInfo.SessionID
2123 if len(sessionID) == 0 {
2124 t.Errorf("expected non-empty sessionID")
2125 }
2126
2127 q2 := client.Query("SELECT * FROM temptable")
2128 q2.ConnectionProperties = []*ConnectionProperty{
2129 {Key: "session_id", Value: sessionID},
2130 }
2131 jobStats, _, err = runQueryJob(ctx, q2)
2132 if err != nil {
2133 t.Errorf("error running SELECT: %v", err)
2134 }
2135 if jobStats.SessionInfo == nil {
2136 t.Fatalf("expected sessionInfo in second query, was nil")
2137 }
2138 got := jobStats.SessionInfo.SessionID
2139 if got != sessionID {
2140 t.Errorf("second query mismatched session ID, got %s want %s", got, sessionID)
2141 }
2142
2143 }
2144
2145 type queryParameterTestCase struct {
2146 name string
2147 query string
2148 parameters []QueryParameter
2149 wantRow []Value
2150 wantConfig interface{}
2151 }
2152
2153 var (
2154 queryParameterTestCases = []queryParameterTestCase{}
2155 )
2156
// initQueryParameterTestCases (re)builds the package-level
// queryParameterTestCases table. Every entry is a positional
// queryParameterTestCase literal: subtest name, SQL text, parameters to bind,
// the row expected back, and the parameter value expected in the job config.
func initQueryParameterTestCases() {
	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
	// rtm is tm with Nanosecond set to 3000. The Datetime and Time cases send
	// tm/dtm but expect rtm back (presumably because only microsecond
	// precision survives the round trip — TODO confirm).
	rtm := tm
	rtm.Nanosecond = 3000
	dtm := civil.DateTime{Date: d, Time: tm}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	rat := big.NewRat(13, 10)
	// The denominator 10e10 is the constant 100,000,000,000.
	bigRat := big.NewRat(12345, 10e10)
	// NOTE(review): rangeTimestamp1 sets only Start (End left unset) yet is
	// used by the case named "RangeUnboundedStart", and rangeTimestamp2 sets
	// only End yet is used by "RangeUnboundedEnd". The case names look
	// swapped relative to the values — confirm before relying on them.
	rangeTimestamp1 := &RangeValue{
		Start: time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC),
	}
	rangeTimestamp2 := &RangeValue{
		End: time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC),
	}

	// ss is the nested struct used inside s below.
	type ss struct {
		String string
	}

	// s is the Go struct passed directly as a parameter value in the
	// NestedStructParam and StructFieldParam cases.
	type s struct {
		Timestamp      time.Time
		StringArray    []string
		SubStruct      ss
		SubStructArray []ss
	}

	queryParameterTestCases = []queryParameterTestCase{
		// Scalar Go values passed directly as parameter values.
		{
			"Int64Param",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: 1}},
			[]Value{int64(1)},
			int64(1),
		},
		{
			"FloatParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: 1.3}},
			[]Value{1.3},
			1.3,
		},
		{
			"BigRatParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: rat}},
			[]Value{rat},
			rat,
		},
		{
			"BoolParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: true}},
			[]Value{true},
			true,
		},
		{
			"StringParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: "ABC"}},
			[]Value{"ABC"},
			"ABC",
		},
		{
			"ByteParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: []byte("foo")}},
			[]Value{[]byte("foo")},
			[]byte("foo"),
		},
		// Date/time values; the Datetime and Time cases expect rtm, not tm.
		{
			"TimestampParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: ts}},
			[]Value{ts},
			ts,
		},
		{
			"TimestampArrayParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: []time.Time{ts, ts}}},
			[]Value{[]Value{ts, ts}},
			[]interface{}{ts, ts},
		},
		{
			"DatetimeParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: dtm}},
			[]Value{civil.DateTime{Date: d, Time: rtm}},
			civil.DateTime{Date: d, Time: rtm},
		},
		{
			"DateParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: d}},
			[]Value{d},
			d,
		},
		{
			"TimeParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: tm}},
			[]Value{rtm},
			rtm,
		},
		// Explicitly typed parameters supplied through *QueryParameterValue.
		{
			"JsonParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "JSON",
						},
						Value: "{\"alpha\":\"beta\"}",
					},
				},
			},
			[]Value{"{\"alpha\":\"beta\"}"},
			"{\"alpha\":\"beta\"}",
		},
		{
			"RangeUnboundedStart",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "RANGE",
							RangeElementType: &StandardSQLDataType{
								TypeKind: "TIMESTAMP",
							},
						},
						Value: rangeTimestamp1,
					},
				},
			},
			[]Value{rangeTimestamp1},
			rangeTimestamp1,
		},
		{
			"RangeUnboundedEnd",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "RANGE",
							RangeElementType: &StandardSQLDataType{
								TypeKind: "TIMESTAMP",
							},
						},
						Value: rangeTimestamp2,
					},
				},
			},
			[]Value{rangeTimestamp2},
			rangeTimestamp2,
		},
		// Go structs passed directly; the expected row is a nested []Value and
		// the expected config is a map keyed by field name.
		{
			"NestedStructParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
			[]Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
			map[string]interface{}{
				"Timestamp":   ts,
				"StringArray": []interface{}{"a", "b"},
				"SubStruct":   map[string]interface{}{"String": "c"},
				"SubStructArray": []interface{}{
					map[string]interface{}{"String": "d"},
					map[string]interface{}{"String": "e"},
				},
			},
		},
		{
			"StructFieldParam",
			"SELECT @val.Timestamp, @val.SubStruct.String",
			[]QueryParameter{{Name: "val", Value: s{Timestamp: ts, SubStruct: ss{"a"}}}},
			[]Value{ts, "a"},
			// The slice fields left unset in the parameter are expected as
			// nil entries in the config map.
			map[string]interface{}{
				"Timestamp":      ts,
				"SubStruct":      map[string]interface{}{"String": "a"},
				"StringArray":    nil,
				"SubStructArray": nil,
			},
		},
		{
			"BigNumericExplicitParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "BIGNUMERIC",
						},
						Value: BigNumericString(bigRat),
					},
				},
			},
			[]Value{bigRat},
			bigRat,
		},
		{
			"StringArrayExplicitParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						ArrayValue: []QueryParameterValue{
							{Value: "a"},
							{Value: "b"},
						},
						Type: StandardSQLDataType{
							ArrayElementType: &StandardSQLDataType{
								TypeKind: "STRING",
							},
						},
					},
				},
			},
			[]Value{[]Value{"a", "b"}},
			[]interface{}{"a", "b"},
		},
		// A struct parameter built entirely from explicit values and an
		// explicit struct type with four fields: a timestamp, an array of
		// BIGNUMERIC, an array of single-field structs, and a nested struct.
		{
			"StructExplicitParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						StructValue: map[string]QueryParameterValue{
							"Timestamp": {
								Value: ts,
							},
							"BigNumericArray": {
								ArrayValue: []QueryParameterValue{
									{Value: BigNumericString(bigRat)},
									{Value: BigNumericString(rat)},
								},
							},
							"ArraySingleValueStruct": {
								ArrayValue: []QueryParameterValue{
									{StructValue: map[string]QueryParameterValue{
										"Number": {
											Value: int64(42),
										},
									}},
									{StructValue: map[string]QueryParameterValue{
										"Number": {
											Value: int64(43),
										},
									}},
								},
							},
							"SubStruct": {
								StructValue: map[string]QueryParameterValue{
									"String": {
										Value: "c",
									},
								},
							},
						},
						Type: StandardSQLDataType{
							StructType: &StandardSQLStructType{
								Fields: []*StandardSQLField{
									{
										Name: "Timestamp",
										Type: &StandardSQLDataType{
											TypeKind: "TIMESTAMP",
										},
									},
									{
										Name: "BigNumericArray",
										Type: &StandardSQLDataType{
											ArrayElementType: &StandardSQLDataType{
												TypeKind: "BIGNUMERIC",
											},
										},
									},
									{
										Name: "ArraySingleValueStruct",
										Type: &StandardSQLDataType{
											ArrayElementType: &StandardSQLDataType{
												StructType: &StandardSQLStructType{
													Fields: []*StandardSQLField{
														{
															Name: "Number",
															Type: &StandardSQLDataType{
																TypeKind: "INT64",
															},
														},
													},
												},
											},
										},
									},
									{
										Name: "SubStruct",
										Type: &StandardSQLDataType{
											StructType: &StandardSQLStructType{
												Fields: []*StandardSQLField{
													{
														Name: "String",
														Type: &StandardSQLDataType{
															TypeKind: "STRING",
														},
													},
												},
											},
										},
									},
								},
							},
						},
					},
				},
			},
			[]Value{[]Value{ts, []Value{bigRat, rat}, []Value{[]Value{int64(42)}, []Value{int64(43)}}, []Value{"c"}}},
			map[string]interface{}{
				"Timestamp":       ts,
				"BigNumericArray": []interface{}{bigRat, rat},
				"ArraySingleValueStruct": []interface{}{
					map[string]interface{}{"Number": int64(42)},
					map[string]interface{}{"Number": int64(43)},
				},
				"SubStruct": map[string]interface{}{"String": "c"},
			},
		},
	}
}
2492
2493 func TestIntegration_QueryParameters(t *testing.T) {
2494 if client == nil {
2495 t.Skip("Integration tests skipped")
2496 }
2497 ctx := context.Background()
2498
2499 initQueryParameterTestCases()
2500
2501 for _, tc := range queryParameterTestCases {
2502 t.Run(tc.name, func(t *testing.T) {
2503 q := client.Query(tc.query)
2504 q.Parameters = tc.parameters
2505 job, err := q.Run(ctx)
2506 if err != nil {
2507 t.Fatal(err)
2508 }
2509 if job.LastStatus() == nil {
2510 t.Error("no LastStatus")
2511 }
2512 it, err := job.Read(ctx)
2513 if err != nil {
2514 t.Fatal(err)
2515 }
2516 checkRead(t, "QueryParameters", it, [][]Value{tc.wantRow})
2517 config, err := job.Config()
2518 if err != nil {
2519 t.Fatal(err)
2520 }
2521 got := config.(*QueryConfig).Parameters[0].Value
2522 if !testutil.Equal(got, tc.wantConfig) {
2523 t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
2524 tc.parameters[0].Value, got, tc.wantConfig)
2525 }
2526 })
2527 }
2528 }
2529
2530
2531
2532 func TestIntegration_TimestampFormat(t *testing.T) {
2533 if client == nil {
2534 t.Skip("Integration tests skipped")
2535 }
2536 ctx := context.Background()
2537 ts := time.Date(2020, 10, 15, 15, 04, 05, 0, time.UTC)
2538
2539 testCases := []struct {
2540 name string
2541 query string
2542 parameters []*bq.QueryParameter
2543 wantRow []Value
2544 wantConfig interface{}
2545 }{
2546 {
2547 "Literal",
2548 "SELECT @val",
2549 []*bq.QueryParameter{
2550 {
2551 Name: "val",
2552 ParameterType: &bq.QueryParameterType{
2553 Type: "TIMESTAMP",
2554 },
2555 ParameterValue: &bq.QueryParameterValue{
2556 Value: ts.Format(timestampFormat),
2557 },
2558 },
2559 },
2560 []Value{ts},
2561 ts,
2562 },
2563 {
2564 "RFC3339Nano",
2565 "SELECT @val",
2566 []*bq.QueryParameter{
2567 {
2568 Name: "val",
2569 ParameterType: &bq.QueryParameterType{
2570 Type: "TIMESTAMP",
2571 },
2572 ParameterValue: &bq.QueryParameterValue{
2573 Value: ts.Format(time.RFC3339Nano),
2574 },
2575 },
2576 },
2577 []Value{ts},
2578 ts,
2579 },
2580 {
2581 "DatetimeFormat",
2582 "SELECT @val",
2583 []*bq.QueryParameter{
2584 {
2585 Name: "val",
2586 ParameterType: &bq.QueryParameterType{
2587 Type: "TIMESTAMP",
2588 },
2589 ParameterValue: &bq.QueryParameterValue{
2590 Value: ts.Format(dateTimeFormat),
2591 },
2592 },
2593 },
2594 []Value{ts},
2595 ts,
2596 },
2597 {
2598 "RFC3339",
2599 "SELECT @val",
2600 []*bq.QueryParameter{
2601 {
2602 Name: "val",
2603 ParameterType: &bq.QueryParameterType{
2604 Type: "TIMESTAMP",
2605 },
2606 ParameterValue: &bq.QueryParameterValue{
2607 Value: ts.Format(time.RFC3339),
2608 },
2609 },
2610 },
2611 []Value{ts},
2612 ts,
2613 },
2614 }
2615 for _, tc := range testCases {
2616 t.Run(tc.name, func(t *testing.T) {
2617 q := client.Query(tc.query)
2618 bqJob, err := q.newJob()
2619 if err != nil {
2620 t.Fatal(err)
2621 }
2622 bqJob.Configuration.Query.QueryParameters = tc.parameters
2623
2624 job, err := q.client.insertJob(ctx, bqJob, nil)
2625 if err != nil {
2626 t.Fatal(err)
2627 }
2628 if job.LastStatus() == nil {
2629 t.Error("no LastStatus")
2630 }
2631 it, err := job.Read(ctx)
2632 if err != nil {
2633 t.Fatal(err)
2634 }
2635 checkRead(t, "QueryParameters", it, [][]Value{tc.wantRow})
2636 config, err := job.Config()
2637 if err != nil {
2638 t.Fatal(err)
2639 }
2640 got := config.(*QueryConfig).Parameters[0].Value
2641 if !testutil.Equal(got, tc.wantConfig) {
2642 t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
2643 tc.parameters[0].ParameterValue.Value, got, tc.wantConfig)
2644 }
2645 })
2646 }
2647 }
2648
2649 func TestIntegration_QueryDryRun(t *testing.T) {
2650 if client == nil {
2651 t.Skip("Integration tests skipped")
2652 }
2653 ctx := context.Background()
2654 q := client.Query("SELECT word from " + stdName + " LIMIT 10")
2655 q.DryRun = true
2656 job, err := q.Run(ctx)
2657 if err != nil {
2658 t.Fatal(err)
2659 }
2660
2661 s := job.LastStatus()
2662 if s.State != Done {
2663 t.Errorf("state is %v, expected Done", s.State)
2664 }
2665 if s.Statistics == nil {
2666 t.Fatal("no statistics")
2667 }
2668 if s.Statistics.Details.(*QueryStatistics).Schema == nil {
2669 t.Fatal("no schema")
2670 }
2671 if s.Statistics.Details.(*QueryStatistics).TotalBytesProcessedAccuracy == "" {
2672 t.Fatal("no cost accuracy")
2673 }
2674 }
2675
2676 func TestIntegration_Scripting(t *testing.T) {
2677 if client == nil {
2678 t.Skip("Integration tests skipped")
2679 }
2680 ctx := context.Background()
2681 sql := `
2682 -- Declare a variable to hold names as an array.
2683 DECLARE top_names ARRAY<STRING>;
2684 BEGIN TRANSACTION;
2685 -- Build an array of the top 100 names from the year 2017.
2686 SET top_names = (
2687 SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
2688 FROM ` + "`bigquery-public-data`" + `.usa_names.usa_1910_current
2689 WHERE year = 2017
2690 );
2691 -- Which names appear as words in Shakespeare's plays?
2692 SELECT
2693 name AS shakespeare_name
2694 FROM UNNEST(top_names) AS name
2695 WHERE name IN (
2696 SELECT word
2697 FROM ` + "`bigquery-public-data`" + `.samples.shakespeare
2698 );
2699 COMMIT TRANSACTION;
2700 `
2701 q := client.Query(sql)
2702 job, err := q.Run(ctx)
2703 if err != nil {
2704 t.Fatalf("failed to run parent job: %v", err)
2705 }
2706 status, err := job.Wait(ctx)
2707 if err != nil {
2708 t.Fatalf("job %q failed to wait for completion: %v", job.ID(), err)
2709 }
2710 if status.Err() != nil {
2711 t.Fatalf("job %q terminated with error: %v", job.ID(), err)
2712 }
2713
2714 queryStats, ok := status.Statistics.Details.(*QueryStatistics)
2715 if !ok {
2716 t.Fatalf("failed to fetch query statistics")
2717 }
2718
2719 want := "SCRIPT"
2720 if queryStats.StatementType != want {
2721 t.Errorf("statement type mismatch. got %s want %s", queryStats.StatementType, want)
2722 }
2723
2724 if status.Statistics.NumChildJobs <= 0 {
2725 t.Errorf("expected script to indicate nonzero child jobs, got %d", status.Statistics.NumChildJobs)
2726 }
2727
2728
2729 var childJobs []*Job
2730
2731 it := job.Children(ctx)
2732 for {
2733 job, err := it.Next()
2734 if err == iterator.Done {
2735 break
2736 }
2737 if err != nil {
2738 t.Fatal(err)
2739 }
2740 childJobs = append(childJobs, job)
2741 }
2742 if len(childJobs) == 0 {
2743 t.Fatal("Script had no child jobs.")
2744 }
2745
2746 for _, cj := range childJobs {
2747 cStatus := cj.LastStatus()
2748 if cStatus.Statistics.ParentJobID != job.ID() {
2749 t.Errorf("child job %q doesn't indicate parent. got %q, want %q", cj.ID(), cStatus.Statistics.ParentJobID, job.ID())
2750 }
2751 if cStatus.Statistics.ScriptStatistics == nil {
2752 t.Errorf("child job %q doesn't have script statistics present", cj.ID())
2753 }
2754 if cStatus.Statistics.ScriptStatistics.EvaluationKind == "" {
2755 t.Errorf("child job %q didn't indicate evaluation kind", cj.ID())
2756 }
2757 if cStatus.Statistics.TransactionInfo == nil {
2758 t.Errorf("child job %q didn't have transaction info present", cj.ID())
2759 }
2760 if cStatus.Statistics.TransactionInfo.TransactionID == "" {
2761 t.Errorf("child job %q didn't have transactionID present", cj.ID())
2762 }
2763 }
2764
2765 }
2766
2767 func TestIntegration_ExtractExternal(t *testing.T) {
2768
2769 if client == nil {
2770 t.Skip("Integration tests skipped")
2771 }
2772 ctx := context.Background()
2773 schema := Schema{
2774 {Name: "name", Type: StringFieldType},
2775 {Name: "num", Type: IntegerFieldType},
2776 }
2777 table := newTable(t, schema)
2778 defer table.Delete(ctx)
2779
2780
2781 sql := fmt.Sprintf(`INSERT %s.%s (name, num)
2782 VALUES ('a', 1), ('b', 2), ('c', 3)`,
2783 table.DatasetID, table.TableID)
2784 if _, _, err := runQuerySQL(ctx, sql); err != nil {
2785 t.Fatal(err)
2786 }
2787
2788 bucketName := testutil.ProjID()
2789 objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
2790 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
2791 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
2792 gr := NewGCSReference(uri)
2793 gr.DestinationFormat = CSV
2794 e := table.ExtractorTo(gr)
2795 job, err := e.Run(ctx)
2796 if err != nil {
2797 t.Fatal(err)
2798 }
2799 conf, err := job.Config()
2800 if err != nil {
2801 t.Fatal(err)
2802 }
2803 config, ok := conf.(*ExtractConfig)
2804 if !ok {
2805 t.Fatalf("got %T, want ExtractConfig", conf)
2806 }
2807 diff := testutil.Diff(config, &e.ExtractConfig,
2808 cmp.AllowUnexported(Table{}),
2809 cmpopts.IgnoreUnexported(Client{}))
2810 if diff != "" {
2811 t.Errorf("got=-, want=+:\n%s", diff)
2812 }
2813 if err := wait(ctx, job); err != nil {
2814 t.Fatal(err)
2815 }
2816
2817 edc := &ExternalDataConfig{
2818 SourceFormat: CSV,
2819 SourceURIs: []string{uri},
2820 Schema: schema,
2821 Options: &CSVOptions{
2822 SkipLeadingRows: 1,
2823
2824
2825 FieldDelimiter: ",",
2826 },
2827 }
2828
2829 q := client.Query("SELECT * FROM csv")
2830 q.TableDefinitions = map[string]ExternalData{"csv": edc}
2831 wantRows := [][]Value{
2832 {"a", int64(1)},
2833 {"b", int64(2)},
2834 {"c", int64(3)},
2835 }
2836 iter, err := q.Read(ctx)
2837 if err != nil {
2838 t.Fatal(err)
2839 }
2840 checkReadAndTotalRows(t, "external query", iter, wantRows)
2841
2842
2843
2844 table = dataset.Table(tableIDs.New())
2845 err = table.Create(context.Background(), &TableMetadata{
2846 Schema: schema,
2847 ExpirationTime: testTableExpiration,
2848 ExternalDataConfig: edc,
2849 })
2850 if err != nil {
2851 t.Fatal(err)
2852 }
2853 q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
2854 iter, err = q.Read(ctx)
2855 if err != nil {
2856 t.Fatal(err)
2857 }
2858 checkReadAndTotalRows(t, "external table", iter, wantRows)
2859
2860
2861 md, err := table.Metadata(ctx)
2862 if err != nil {
2863 t.Fatal(err)
2864 }
2865
2866
2867 md.ExternalDataConfig.Schema = md.Schema
2868 if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
2869 t.Errorf("got=-, want=+\n%s", diff)
2870 }
2871 }
2872
2873 func TestIntegration_ExportDataStatistics(t *testing.T) {
2874
2875 if client == nil {
2876 t.Skip("Integration tests skipped")
2877 }
2878 ctx := context.Background()
2879 schema := Schema{
2880 {Name: "name", Type: StringFieldType},
2881 {Name: "num", Type: IntegerFieldType},
2882 }
2883 table := newTable(t, schema)
2884 defer table.Delete(ctx)
2885
2886
2887 bucketName := testutil.ProjID()
2888 uri := fmt.Sprintf("gs://%s/bq-export-test-*.csv", bucketName)
2889 defer func() {
2890 it := storageClient.Bucket(bucketName).Objects(ctx, &storage.Query{
2891 MatchGlob: "bq-export-test-*.csv",
2892 })
2893 for {
2894 obj, err := it.Next()
2895 if err == iterator.Done {
2896 break
2897 }
2898 if err != nil {
2899 t.Logf("failed to iterate through bucket %q: %v", bucketName, err)
2900 continue
2901 }
2902 err = storageClient.Bucket(bucketName).Object(obj.Name).Delete(ctx)
2903 }
2904 }()
2905
2906
2907 sql := fmt.Sprintf(`EXPORT DATA
2908 OPTIONS (
2909 uri = '%s',
2910 format = 'CSV',
2911 overwrite = true,
2912 header = true,
2913 field_delimiter = ';'
2914 )
2915 AS (
2916 SELECT 'a' as name, 1 as num
2917 UNION ALL
2918 SELECT 'b' as name, 2 as num
2919 UNION ALL
2920 SELECT 'c' as name, 3 as num
2921 );`,
2922 uri)
2923 stats, _, err := runQuerySQL(ctx, sql)
2924 if err != nil {
2925 t.Fatal(err)
2926 }
2927
2928 qStats, ok := stats.Details.(*QueryStatistics)
2929 if !ok {
2930 t.Fatalf("expected query statistics not present")
2931 }
2932
2933 if qStats.ExportDataStatistics == nil {
2934 t.Fatal("jobStatus missing ExportDataStatistics")
2935 }
2936 if qStats.ExportDataStatistics.FileCount != 1 {
2937 t.Fatalf("expected ExportDataStatistics to have 1 file, but got %d files", qStats.ExportDataStatistics.FileCount)
2938 }
2939 if qStats.ExportDataStatistics.RowCount != 3 {
2940 t.Fatalf("expected ExportDataStatistics to have 3 rows, got %d rows", qStats.ExportDataStatistics.RowCount)
2941 }
2942 }
2943
2944 func TestIntegration_ReadNullIntoStruct(t *testing.T) {
2945
2946 if client == nil {
2947 t.Skip("Integration tests skipped")
2948 }
2949 ctx := context.Background()
2950 table := newTable(t, schema)
2951 defer table.Delete(ctx)
2952
2953 ins := table.Inserter()
2954 row := &ValuesSaver{
2955 Schema: schema,
2956 Row: []Value{nil, []Value{}, []Value{nil}},
2957 }
2958 if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil {
2959 t.Fatal(putError(err))
2960 }
2961 if err := waitForRow(ctx, table); err != nil {
2962 t.Fatal(err)
2963 }
2964
2965 q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
2966 q.DefaultProjectID = dataset.ProjectID
2967 q.DefaultDatasetID = dataset.DatasetID
2968 it, err := q.Read(ctx)
2969 if err != nil {
2970 t.Fatal(err)
2971 }
2972 type S struct{ Name string }
2973 var s S
2974 if err := it.Next(&s); err == nil {
2975 t.Fatal("got nil, want error")
2976 }
2977 }
2978
// stdName is the public Shakespeare sample table written as a
// backtick-quoted identifier.
const stdName = "`bigquery-public-data.samples.shakespeare`"

// legacyName is the same table written in the bracketed
// project:dataset.table form.
const legacyName = "[bigquery-public-data:samples.shakespeare]"
2983
2984
2985
2986 var useLegacySQLTests = []struct {
2987 t string
2988 std, legacy bool
2989 err bool
2990 }{
2991 {t: legacyName, std: false, legacy: true, err: false},
2992 {t: legacyName, std: true, legacy: false, err: true},
2993 {t: legacyName, std: false, legacy: false, err: true},
2994 {t: legacyName, std: true, legacy: true, err: true},
2995 {t: stdName, std: false, legacy: true, err: true},
2996 {t: stdName, std: true, legacy: false, err: false},
2997 {t: stdName, std: false, legacy: false, err: false},
2998 {t: stdName, std: true, legacy: true, err: true},
2999 }
3000
3001 func TestIntegration_QueryUseLegacySQL(t *testing.T) {
3002
3003 if client == nil {
3004 t.Skip("Integration tests skipped")
3005 }
3006 ctx := context.Background()
3007 for _, test := range useLegacySQLTests {
3008 q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
3009 q.UseStandardSQL = test.std
3010 q.UseLegacySQL = test.legacy
3011 _, err := q.Read(ctx)
3012 gotErr := err != nil
3013 if gotErr && !test.err {
3014 t.Errorf("%+v:\nunexpected error: %v", test, err)
3015 } else if !gotErr && test.err {
3016 t.Errorf("%+v:\nsucceeded, but want error", test)
3017 }
3018 }
3019 }
3020
3021 func TestIntegration_ListJobs(t *testing.T) {
3022
3023
3024
3025 if client == nil {
3026 t.Skip("Integration tests skipped")
3027 }
3028 ctx := context.Background()
3029
3030
3031 const max = 20
3032 var jobs []*Job
3033 it := client.Jobs(ctx)
3034 for {
3035 job, err := it.Next()
3036 if err == iterator.Done {
3037 break
3038 }
3039 if err != nil {
3040 t.Fatal(err)
3041 }
3042 jobs = append(jobs, job)
3043 if len(jobs) >= max {
3044 break
3045 }
3046 }
3047
3048 if len(jobs) == 0 {
3049 t.Fatal("did not get any jobs")
3050 }
3051 }
3052
3053 func TestIntegration_DeleteJob(t *testing.T) {
3054 if client == nil {
3055 t.Skip("Integration tests skipped")
3056 }
3057 ctx := context.Background()
3058
3059 q := client.Query("SELECT 17 as foo")
3060 q.Location = "us-east1"
3061
3062 job, err := q.Run(ctx)
3063 if err != nil {
3064 t.Fatalf("job Run failure: %v", err)
3065 }
3066 err = wait(ctx, job)
3067 if err != nil {
3068 t.Fatalf("job %q completion failure: %v", job.ID(), err)
3069 }
3070
3071 if err := job.Delete(ctx); err != nil {
3072 t.Fatalf("job.Delete failed: %v", err)
3073 }
3074 }
3075
// tokyo is the location that testLocation expects the "tokyo" dataset and
// the jobs run against it to report.
const tokyo = "asia-northeast1"
3077
3078 func TestIntegration_Location(t *testing.T) {
3079 if client == nil {
3080 t.Skip("Integration tests skipped")
3081 }
3082 client.Location = ""
3083 testLocation(t, tokyo)
3084 client.Location = tokyo
3085 defer func() {
3086 client.Location = ""
3087 }()
3088 testLocation(t, "")
3089 }
3090
3091 func testLocation(t *testing.T, loc string) {
3092 ctx := context.Background()
3093 tokyoDataset := client.Dataset("tokyo")
3094 err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
3095 if err != nil && !hasStatusCode(err, 409) {
3096 t.Fatal(err)
3097 }
3098 md, err := tokyoDataset.Metadata(ctx)
3099 if err != nil {
3100 t.Fatal(err)
3101 }
3102 if md.Location != tokyo {
3103 t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
3104 }
3105 table := tokyoDataset.Table(tableIDs.New())
3106 err = table.Create(context.Background(), &TableMetadata{
3107 Schema: Schema{
3108 {Name: "name", Type: StringFieldType},
3109 {Name: "nums", Type: IntegerFieldType},
3110 },
3111 ExpirationTime: testTableExpiration,
3112 })
3113 if err != nil {
3114 t.Fatal(err)
3115 }
3116
3117 tableMetadata, err := table.Metadata(ctx)
3118 if err != nil {
3119 t.Fatalf("failed to get table metadata: %v", err)
3120 }
3121 wantLoc := loc
3122 if loc == "" && client.Location != "" {
3123 wantLoc = client.Location
3124 }
3125 if tableMetadata.Location != wantLoc {
3126 t.Errorf("Location on table doesn't match. Got %s want %s", tableMetadata.Location, wantLoc)
3127 }
3128 defer table.Delete(ctx)
3129 loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
3130 loader.Location = loc
3131 job, err := loader.Run(ctx)
3132 if err != nil {
3133 t.Fatal("loader.Run", err)
3134 }
3135 if job.Location() != tokyo {
3136 t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
3137 }
3138 _, err = client.JobFromID(ctx, job.ID())
3139 if client.Location == "" && err == nil {
3140 t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
3141 }
3142 if client.Location != "" && err != nil {
3143 t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
3144 }
3145 _, err = client.JobFromIDLocation(ctx, job.ID(), "US")
3146 if err == nil {
3147 t.Error("JobFromIDLocation with US: want error, got nil")
3148 }
3149 job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
3150 if loc == tokyo && err != nil {
3151 t.Errorf("loc=tokyo: %v", err)
3152 }
3153 if loc == "" && err == nil {
3154 t.Error("loc empty: got nil, want error")
3155 }
3156 if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
3157 t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
3158 }
3159 if err := wait(ctx, job); err != nil {
3160 t.Fatal(err)
3161 }
3162
3163 if err := job.Cancel(ctx); err != nil {
3164 t.Fatal(err)
3165 }
3166
3167 q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
3168 q.Location = loc
3169 iter, err := q.Read(ctx)
3170 if err != nil {
3171 t.Fatal(err)
3172 }
3173 wantRows := [][]Value{
3174 {"a", int64(0)},
3175 {"b", int64(1)},
3176 {"c", int64(2)},
3177 }
3178 checkRead(t, "location", iter, wantRows)
3179
3180 table2 := tokyoDataset.Table(tableIDs.New())
3181 copier := table2.CopierFrom(table)
3182 copier.Location = loc
3183 if _, err := copier.Run(ctx); err != nil {
3184 t.Fatal(err)
3185 }
3186 bucketName := testutil.ProjID()
3187 objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
3188 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
3189 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
3190 gr := NewGCSReference(uri)
3191 gr.DestinationFormat = CSV
3192 e := table.ExtractorTo(gr)
3193 e.Location = loc
3194 if _, err := e.Run(ctx); err != nil {
3195 t.Fatal(err)
3196 }
3197 }
3198
3199 func TestIntegration_NumericErrors(t *testing.T) {
3200
3201 if client == nil {
3202 t.Skip("Integration tests skipped")
3203 }
3204 ctx := context.Background()
3205 schema := Schema{{Name: "n", Type: NumericFieldType}}
3206 table := newTable(t, schema)
3207 defer table.Delete(ctx)
3208 tooBigRat := &big.Rat{}
3209 if _, ok := tooBigRat.SetString("1e40"); !ok {
3210 t.Fatal("big.Rat.SetString failed")
3211 }
3212 ins := table.Inserter()
3213 err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
3214 if err == nil {
3215 t.Fatal("got nil, want error")
3216 }
3217 }
3218
3219 func TestIntegration_QueryErrors(t *testing.T) {
3220
3221 if client == nil {
3222 t.Skip("Integration tests skipped")
3223 }
3224 ctx := context.Background()
3225 q := client.Query("blah blah broken")
3226 _, err := q.Read(ctx)
3227 const want = "invalidQuery"
3228 if !strings.Contains(err.Error(), want) {
3229 t.Fatalf("got %q, want substring %q", err, want)
3230 }
3231 }
3232
3233 func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
3234 if client == nil {
3235 t.Skip("Integration tests skipped")
3236 }
3237 ctx := context.Background()
3238
3239
3240 baseTableID := tableIDs.New()
3241 qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
3242 sql := fmt.Sprintf(`
3243 CREATE TABLE %s
3244 (
3245 sample_value INT64,
3246 groupid STRING,
3247 )
3248 AS
3249 SELECT
3250 CAST(RAND() * 100 AS INT64),
3251 CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
3252 FROM
3253 UNNEST(GENERATE_ARRAY(0,999))
3254 `, qualified)
3255 if _, _, err := runQuerySQL(ctx, sql); err != nil {
3256 t.Fatalf("couldn't instantiate base table: %v", err)
3257 }
3258
3259
3260 sql = fmt.Sprintf(`
3261 SELECT
3262 SUM(sample_value) as total,
3263 groupid
3264 FROM
3265 %s
3266 GROUP BY groupid
3267 `, qualified)
3268
3269
3270
3271 wantRefresh := 6 * time.Hour
3272 matViewID := tableIDs.New()
3273 view := dataset.Table(matViewID)
3274 if err := view.Create(ctx, &TableMetadata{
3275 MaterializedView: &MaterializedViewDefinition{
3276 Query: sql,
3277 RefreshInterval: wantRefresh,
3278 }}); err != nil {
3279 t.Fatal(err)
3280 }
3281
3282
3283 curMeta, err := view.Metadata(ctx)
3284 if err != nil {
3285 t.Fatal(err)
3286 }
3287
3288 if curMeta.MaterializedView == nil {
3289 t.Fatal("expected materialized view definition, was null")
3290 }
3291
3292 if curMeta.MaterializedView.Query != sql {
3293 t.Errorf("mismatch on view sql. Got %s want %s", curMeta.MaterializedView.Query, sql)
3294 }
3295
3296 if curMeta.MaterializedView.RefreshInterval != wantRefresh {
3297 t.Errorf("mismatch on refresh time: got %d usec want %d usec", 1000*curMeta.MaterializedView.RefreshInterval.Nanoseconds(), 1000*wantRefresh.Nanoseconds())
3298 }
3299
3300
3301 want := MaterializedView
3302 if curMeta.Type != want {
3303 t.Errorf("mismatch on table type. got %s want %s", curMeta.Type, want)
3304 }
3305
3306
3307 wantRefresh = time.Hour
3308 upd := TableMetadataToUpdate{
3309 MaterializedView: &MaterializedViewDefinition{
3310 Query: sql,
3311 RefreshInterval: wantRefresh,
3312 },
3313 }
3314
3315 newMeta, err := view.Update(ctx, upd, curMeta.ETag)
3316 if err != nil {
3317 t.Fatalf("failed to update view definition: %v", err)
3318 }
3319
3320 if newMeta.MaterializedView == nil {
3321 t.Error("MaterializeView missing in updated metadata")
3322 }
3323
3324 if newMeta.MaterializedView.RefreshInterval != wantRefresh {
3325 t.Errorf("mismatch on updated refresh time: got %d usec want %d usec", 1000*curMeta.MaterializedView.RefreshInterval.Nanoseconds(), 1000*wantRefresh.Nanoseconds())
3326 }
3327
3328
3329 if newMeta.MaterializedView.EnableRefresh {
3330 t.Error("expected EnableRefresh to be false, is true")
3331 }
3332
3333
3334
3335 it := dataset.Tables(ctx)
3336 seen := false
3337 for {
3338 tbl, err := it.Next()
3339 if err == iterator.Done {
3340 break
3341 }
3342 if err != nil {
3343 t.Fatal(err)
3344 }
3345 if tbl.TableID == matViewID {
3346 seen = true
3347 }
3348 }
3349 if !seen {
3350 t.Error("materialized view not listed in dataset")
3351 }
3352
3353
3354 if err := view.Delete(ctx); err != nil {
3355 t.Errorf("failed to delete materialized view: %v", err)
3356 }
3357
3358 }
3359
3360 func TestIntegration_ModelLifecycle(t *testing.T) {
3361 if client == nil {
3362 t.Skip("Integration tests skipped")
3363 }
3364 ctx := context.Background()
3365
3366
3367 modelID := modelIDs.New()
3368 model := dataset.Model(modelID)
3369 modelSQLID, _ := model.Identifier(StandardSQLID)
3370
3371 sql := fmt.Sprintf(`
3372 CREATE MODEL %s
3373 OPTIONS (
3374 model_type='linear_reg',
3375 max_iteration=1,
3376 learn_rate=0.4,
3377 learn_rate_strategy='constant'
3378 ) AS (
3379 SELECT 'a' AS f1, 2.0 AS label
3380 UNION ALL
3381 SELECT 'b' AS f1, 3.8 AS label
3382 )`, modelSQLID)
3383 if _, _, err := runQuerySQL(ctx, sql); err != nil {
3384 t.Fatal(err)
3385 }
3386 defer model.Delete(ctx)
3387
3388
3389 curMeta, err := model.Metadata(ctx)
3390 if err != nil {
3391 t.Fatalf("couldn't get metadata: %v", err)
3392 }
3393
3394 want := "LINEAR_REGRESSION"
3395 if curMeta.Type != want {
3396 t.Errorf("Model type mismatch. Want %s got %s", curMeta.Type, want)
3397 }
3398
3399
3400 runs := curMeta.RawTrainingRuns()
3401 if runs == nil {
3402 t.Errorf("training runs unpopulated.")
3403 }
3404 labelCols, err := curMeta.RawLabelColumns()
3405 if err != nil {
3406 t.Fatalf("failed to get label cols: %v", err)
3407 }
3408 if labelCols == nil {
3409 t.Errorf("label column information unpopulated.")
3410 }
3411 featureCols, err := curMeta.RawFeatureColumns()
3412 if err != nil {
3413 t.Fatalf("failed to get feature cols: %v", err)
3414 }
3415 if featureCols == nil {
3416 t.Errorf("feature column information unpopulated.")
3417 }
3418
3419
3420 expiry := time.Now().Add(24 * time.Hour).Truncate(time.Millisecond)
3421
3422 upd := ModelMetadataToUpdate{
3423 Description: "new",
3424 Name: "friendly",
3425 ExpirationTime: expiry,
3426 }
3427
3428 newMeta, err := model.Update(ctx, upd, curMeta.ETag)
3429 if err != nil {
3430 t.Fatalf("failed to update: %v", err)
3431 }
3432
3433 want = "new"
3434 if newMeta.Description != want {
3435 t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
3436 }
3437 want = "friendly"
3438 if newMeta.Name != want {
3439 t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
3440 }
3441 if newMeta.ExpirationTime != expiry {
3442 t.Fatalf("ExpirationTime not updated. got %v want %v", newMeta.ExpirationTime, expiry)
3443 }
3444
3445
3446 it := dataset.Models(ctx)
3447 seen := false
3448 for {
3449 mdl, err := it.Next()
3450 if err == iterator.Done {
3451 break
3452 }
3453 if err != nil {
3454 t.Fatal(err)
3455 }
3456 if mdl.ModelID == modelID {
3457 seen = true
3458 }
3459 }
3460 if !seen {
3461 t.Fatal("model not listed in dataset")
3462 }
3463
3464
3465 bucketName := testutil.ProjID()
3466 objectName := fmt.Sprintf("bq-model-extract-%s", modelID)
3467 uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
3468 defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
3469 gr := NewGCSReference(uri)
3470 gr.DestinationFormat = TFSavedModel
3471 extractor := model.ExtractorTo(gr)
3472 job, err := extractor.Run(ctx)
3473 if err != nil {
3474 t.Fatalf("failed to extract model to GCS: %v", err)
3475 }
3476 if err = wait(ctx, job); err != nil {
3477 t.Errorf("extract failed: %v", err)
3478 }
3479
3480
3481 if err := model.Delete(ctx); err != nil {
3482 t.Fatalf("failed to delete model: %v", err)
3483 }
3484 }
3485
3486
3487 func newTable(t *testing.T, s Schema) *Table {
3488 table := dataset.Table(tableIDs.New())
3489 err := table.Create(context.Background(), &TableMetadata{
3490 Schema: s,
3491 ExpirationTime: testTableExpiration,
3492 })
3493 if err != nil {
3494 t.Fatal(err)
3495 }
3496 return table
3497 }
3498
3499 func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
3500 if msg2, ok := compareRead(it, want, false); !ok {
3501 t.Errorf("%s: %s", msg, msg2)
3502 }
3503 }
3504
3505 func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
3506 if msg2, ok := compareRead(it, want, true); !ok {
3507 t.Errorf("%s: %s", msg, msg2)
3508 }
3509 }
3510
3511 func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
3512 got, _, totalRows, err := readAll(it)
3513 jobStr := ""
3514 if it.SourceJob() != nil {
3515 jobStr = it.SourceJob().jobID
3516 }
3517 if jobStr != "" {
3518 jobStr = fmt.Sprintf("(Job: %s)", jobStr)
3519 }
3520 if err != nil {
3521 return err.Error(), false
3522 }
3523 if len(got) != len(want) {
3524 return fmt.Sprintf("%s got %d rows, want %d", jobStr, len(got), len(want)), false
3525 }
3526 if compareTotalRows && len(got) != int(totalRows) {
3527 return fmt.Sprintf("%s got %d rows, but totalRows = %d", jobStr, len(got), totalRows), false
3528 }
3529 sort.Sort(byCol0(got))
3530 for i, r := range got {
3531 gotRow := []Value(r)
3532 wantRow := want[i]
3533 if !testutil.Equal(gotRow, wantRow) {
3534 return fmt.Sprintf("%s #%d: got %#v, want %#v", jobStr, i, gotRow, wantRow), false
3535 }
3536 }
3537 return "", true
3538 }
3539
3540 func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
3541 var (
3542 rows [][]Value
3543 schema Schema
3544 totalRows uint64
3545 )
3546 for {
3547 var vals []Value
3548 err := it.Next(&vals)
3549 if err == iterator.Done {
3550 return rows, schema, totalRows, nil
3551 }
3552 if err != nil {
3553 return nil, nil, 0, err
3554 }
3555 rows = append(rows, vals)
3556 schema = it.Schema
3557 totalRows = it.TotalRows
3558 }
3559 }
3560
3561 type byCol0 [][]Value
3562
3563 func (b byCol0) Len() int { return len(b) }
3564 func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
3565 func (b byCol0) Less(i, j int) bool {
3566 switch a := b[i][0].(type) {
3567 case string:
3568 return a < b[j][0].(string)
3569 case civil.Date:
3570 return a.Before(b[j][0].(civil.Date))
3571 default:
3572 panic("unknown type")
3573 }
3574 }
3575
3576 func hasStatusCode(err error, code int) bool {
3577 var e *googleapi.Error
3578 if ok := errors.As(err, &e); ok && e.Code == code {
3579 return true
3580 }
3581 return false
3582 }
3583
3584
3585 func wait(ctx context.Context, job *Job) error {
3586 status, err := job.Wait(ctx)
3587 if err != nil {
3588 return fmt.Errorf("job %q error: %v", job.ID(), err)
3589 }
3590 if status.Err() != nil {
3591 return fmt.Errorf("job %q status error: %#v", job.ID(), status.Err())
3592 }
3593 if status.Statistics == nil {
3594 return fmt.Errorf("job %q nil Statistics", job.ID())
3595 }
3596 if status.Statistics.EndTime.IsZero() {
3597 return fmt.Errorf("job %q EndTime is zero", job.ID())
3598 }
3599 return nil
3600 }
3601
3602
3603
3604 func waitForRow(ctx context.Context, table *Table) error {
3605 for {
3606 it := table.Read(ctx)
3607 var v []Value
3608 err := it.Next(&v)
3609 if err == nil {
3610 return nil
3611 }
3612 if err != iterator.Done {
3613 return err
3614 }
3615 time.Sleep(1 * time.Second)
3616 }
3617 }
3618
3619 func putError(err error) string {
3620 pme, ok := err.(PutMultiError)
3621 if !ok {
3622 return err.Error()
3623 }
3624 var msgs []string
3625 for _, err := range pme {
3626 msgs = append(msgs, err.Error())
3627 }
3628 return strings.Join(msgs, "\n")
3629 }
3630
View as plain text