...

Source file src/cloud.google.com/go/bigquery/integration_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"encoding/json"
    20  	"errors"
    21  	"flag"
    22  	"fmt"
    23  	"log"
    24  	"math/big"
    25  	"net/http"
    26  	"os"
    27  	"sort"
    28  	"strings"
    29  	"testing"
    30  	"time"
    31  
    32  	connection "cloud.google.com/go/bigquery/connection/apiv1"
    33  	"cloud.google.com/go/civil"
    34  	datacatalog "cloud.google.com/go/datacatalog/apiv1"
    35  	"cloud.google.com/go/datacatalog/apiv1/datacatalogpb"
    36  	"cloud.google.com/go/httpreplay"
    37  	"cloud.google.com/go/internal"
    38  	"cloud.google.com/go/internal/pretty"
    39  	"cloud.google.com/go/internal/testutil"
    40  	"cloud.google.com/go/internal/uid"
    41  	"cloud.google.com/go/storage"
    42  	"github.com/google/go-cmp/cmp"
    43  	"github.com/google/go-cmp/cmp/cmpopts"
    44  	gax "github.com/googleapis/gax-go/v2"
    45  	bq "google.golang.org/api/bigquery/v2"
    46  	"google.golang.org/api/googleapi"
    47  	"google.golang.org/api/iterator"
    48  	"google.golang.org/api/option"
    49  )
    50  
    51  const replayFilename = "bigquery.replay"
    52  
    53  var record = flag.Bool("record", false, "record RPCs")
    54  
    55  var (
    56  	client                 *Client
    57  	storageOptimizedClient *Client
    58  	storageClient          *storage.Client
    59  	connectionsClient      *connection.Client
    60  	policyTagManagerClient *datacatalog.PolicyTagManagerClient
    61  	dataset                *Dataset
    62  	otherDataset           *Dataset
    63  	schema                 = Schema{
    64  		{Name: "name", Type: StringFieldType},
    65  		{Name: "nums", Type: IntegerFieldType, Repeated: true},
    66  		{Name: "rec", Type: RecordFieldType, Schema: Schema{
    67  			{Name: "bool", Type: BooleanFieldType},
    68  		}},
    69  	}
    70  	testTableExpiration                        time.Time
    71  	datasetIDs, tableIDs, modelIDs, routineIDs *uid.Space
    72  )
    73  
    74  // Note: integration tests cannot be run in parallel, because TestIntegration_Location
    75  // modifies the client.
    76  
    77  func TestMain(m *testing.M) {
    78  	cleanup := initIntegrationTest()
    79  	r := m.Run()
    80  	cleanup()
    81  	os.Exit(r)
    82  }
    83  
    84  func getClient(t *testing.T) *Client {
    85  	if client == nil {
    86  		t.Skip("Integration tests skipped")
    87  	}
    88  	return client
    89  }
    90  
    91  var grpcHeadersChecker = testutil.DefaultHeadersEnforcer()
    92  
    93  // If integration tests will be run, create a unique dataset for them.
    94  // Return a cleanup function.
    95  func initIntegrationTest() func() {
    96  	ctx := context.Background()
    97  	flag.Parse() // needed for testing.Short()
    98  	projID := testutil.ProjID()
    99  	switch {
   100  	case testing.Short() && *record:
   101  		log.Fatal("cannot combine -short and -record")
   102  		return func() {}
   103  
   104  	case testing.Short() && httpreplay.Supported() && testutil.CanReplay(replayFilename) && projID != "":
   105  		// go test -short with a replay file will replay the integration tests if the
   106  		// environment variables are set.
   107  		log.Printf("replaying from %s", replayFilename)
   108  		httpreplay.DebugHeaders()
   109  		replayer, err := httpreplay.NewReplayer(replayFilename)
   110  		if err != nil {
   111  			log.Fatal(err)
   112  		}
   113  		var t time.Time
   114  		if err := json.Unmarshal(replayer.Initial(), &t); err != nil {
   115  			log.Fatal(err)
   116  		}
   117  		hc, err := replayer.Client(ctx) // no creds needed
   118  		if err != nil {
   119  			log.Fatal(err)
   120  		}
   121  		client, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
   122  		if err != nil {
   123  			log.Fatal(err)
   124  		}
   125  		storageOptimizedClient, err = NewClient(ctx, projID, option.WithHTTPClient(hc))
   126  		if err != nil {
   127  			log.Fatal(err)
   128  		}
   129  		err = storageOptimizedClient.EnableStorageReadClient(ctx)
   130  		if err != nil {
   131  			log.Fatal(err)
   132  		}
   133  		storageClient, err = storage.NewClient(ctx, option.WithHTTPClient(hc))
   134  		if err != nil {
   135  			log.Fatal(err)
   136  		}
   137  		connectionsClient, err = connection.NewClient(ctx, option.WithHTTPClient(hc))
   138  		if err != nil {
   139  			log.Fatal(err)
   140  		}
   141  		policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx)
   142  		if err != nil {
   143  			log.Fatal(err)
   144  		}
   145  		cleanup := initTestState(client, t)
   146  		return func() {
   147  			cleanup()
   148  			_ = replayer.Close() // No actionable error returned.
   149  		}
   150  
   151  	case testing.Short():
   152  		// go test -short without a replay file skips the integration tests.
   153  		if testutil.CanReplay(replayFilename) && projID != "" {
   154  			log.Print("replay not supported for Go versions before 1.8")
   155  		}
   156  		client = nil
   157  		storageOptimizedClient = nil
   158  		storageClient = nil
   159  		connectionsClient = nil
   160  		return func() {}
   161  
   162  	default: // Run integration tests against a real backend.
   163  		ts := testutil.TokenSource(ctx, Scope)
   164  		if ts == nil {
   165  			log.Println("Integration tests skipped. See CONTRIBUTING.md for details")
   166  			return func() {}
   167  		}
   168  		bqOpts := []option.ClientOption{option.WithTokenSource(ts)}
   169  		sOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))}
   170  		ptmOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, datacatalog.DefaultAuthScopes()...))}
   171  		connOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, connection.DefaultAuthScopes()...))}
   172  		cleanup := func() {}
   173  		now := time.Now().UTC()
   174  		if *record {
   175  			if !httpreplay.Supported() {
   176  				log.Print("record not supported for Go versions before 1.8")
   177  			} else {
   178  				nowBytes, err := json.Marshal(now)
   179  				if err != nil {
   180  					log.Fatal(err)
   181  				}
   182  				recorder, err := httpreplay.NewRecorder(replayFilename, nowBytes)
   183  				if err != nil {
   184  					log.Fatalf("could not record: %v", err)
   185  				}
   186  				log.Printf("recording to %s", replayFilename)
   187  				hc, err := recorder.Client(ctx, bqOpts...)
   188  				if err != nil {
   189  					log.Fatal(err)
   190  				}
   191  				bqOpts = append(bqOpts, option.WithHTTPClient(hc))
   192  				hc, err = recorder.Client(ctx, sOpts...)
   193  				if err != nil {
   194  					log.Fatal(err)
   195  				}
   196  				sOpts = append(sOpts, option.WithHTTPClient(hc))
   197  				cleanup = func() {
   198  					if err := recorder.Close(); err != nil {
   199  						log.Printf("saving recording: %v", err)
   200  					}
   201  				}
   202  			}
   203  		} else {
   204  			// When we're not recording, do http header checking.
   205  			// We can't check universally because option.WithHTTPClient is
   206  			// incompatible with gRPC options.
   207  			bqOpts = append(bqOpts, grpcHeadersChecker.CallOptions()...)
   208  			sOpts = append(sOpts, grpcHeadersChecker.CallOptions()...)
   209  			ptmOpts = append(ptmOpts, grpcHeadersChecker.CallOptions()...)
   210  			connOpts = append(connOpts, grpcHeadersChecker.CallOptions()...)
   211  		}
   212  		var err error
   213  		client, err = NewClient(ctx, projID, bqOpts...)
   214  		if err != nil {
   215  			log.Fatalf("NewClient: %v", err)
   216  		}
   217  		storageOptimizedClient, err = NewClient(ctx, projID, bqOpts...)
   218  		if err != nil {
   219  			log.Fatalf("NewClient: %v", err)
   220  		}
   221  		err = storageOptimizedClient.EnableStorageReadClient(ctx, bqOpts...)
   222  		if err != nil {
   223  			log.Fatalf("ConfigureStorageReadClient: %v", err)
   224  		}
   225  		storageClient, err = storage.NewClient(ctx, sOpts...)
   226  		if err != nil {
   227  			log.Fatalf("storage.NewClient: %v", err)
   228  		}
   229  		policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx, ptmOpts...)
   230  		if err != nil {
   231  			log.Fatalf("datacatalog.NewPolicyTagManagerClient: %v", err)
   232  		}
   233  		connectionsClient, err = connection.NewClient(ctx, connOpts...)
   234  		if err != nil {
   235  			log.Fatalf("connection.NewService: %v", err)
   236  		}
   237  		c := initTestState(client, now)
   238  		return func() { c(); cleanup() }
   239  	}
   240  }
   241  
   242  func initTestState(client *Client, t time.Time) func() {
   243  	// BigQuery does not accept hyphens in dataset or table IDs, so we create IDs
   244  	// with underscores.
   245  	ctx := context.Background()
   246  	opts := &uid.Options{Sep: '_', Time: t}
   247  	datasetIDs = uid.NewSpace("dataset", opts)
   248  	tableIDs = uid.NewSpace("table", opts)
   249  	modelIDs = uid.NewSpace("model", opts)
   250  	routineIDs = uid.NewSpace("routine", opts)
   251  	testTableExpiration = t.Add(2 * time.Hour).Round(time.Second)
   252  	// For replayability, seed the random source with t.
   253  	Seed(t.UnixNano())
   254  
   255  	prefixes := []string{
   256  		"dataset_",                    // bigquery package tests
   257  		"managedwriter_test_dataset_", // managedwriter package tests
   258  	}
   259  	for _, prefix := range prefixes {
   260  		deleteDatasets(ctx, prefix)
   261  	}
   262  
   263  	dataset = client.Dataset(datasetIDs.New())
   264  	otherDataset = client.Dataset(datasetIDs.New())
   265  
   266  	if err := dataset.Create(ctx, nil); err != nil {
   267  		log.Fatalf("creating dataset %s: %v", dataset.DatasetID, err)
   268  	}
   269  	if err := otherDataset.Create(ctx, nil); err != nil {
   270  		log.Fatalf("creating other dataset %s: %v", dataset.DatasetID, err)
   271  	}
   272  
   273  	return func() {
   274  		if err := dataset.DeleteWithContents(ctx); err != nil {
   275  			log.Printf("could not delete %s", dataset.DatasetID)
   276  		}
   277  		if err := otherDataset.DeleteWithContents(ctx); err != nil {
   278  			log.Printf("could not delete %s", dataset.DatasetID)
   279  		}
   280  	}
   281  }
   282  
   283  // delete a resource if it is older than a day
   284  // that will prevent collisions with parallel CI test runs.
   285  func isResourceStale(t time.Time) bool {
   286  	return time.Since(t).Hours() >= 24
   287  }
   288  
   289  // delete old datasets
   290  func deleteDatasets(ctx context.Context, prefix string) {
   291  	it := client.Datasets(ctx)
   292  
   293  	for {
   294  		ds, err := it.Next()
   295  		if err == iterator.Done {
   296  			break
   297  		}
   298  		if err != nil {
   299  			fmt.Printf("failed to list project datasets: %v\n", err)
   300  			break
   301  		}
   302  		if !strings.HasPrefix(ds.DatasetID, prefix) {
   303  			continue
   304  		}
   305  
   306  		md, err := ds.Metadata(ctx)
   307  		if err != nil {
   308  			fmt.Printf("failed to get dataset `%s` metadata: %v\n", ds.DatasetID, err)
   309  			continue
   310  		}
   311  		if isResourceStale(md.CreationTime) {
   312  			fmt.Printf("found old dataset to delete: %s\n", ds.DatasetID)
   313  			err := ds.DeleteWithContents(ctx)
   314  			if err != nil {
   315  				fmt.Printf("failed to delete old dataset `%s`\n", ds.DatasetID)
   316  			}
   317  		}
   318  	}
   319  }
   320  
   321  func TestIntegration_DetectProjectID(t *testing.T) {
   322  	ctx := context.Background()
   323  	testCreds := testutil.Credentials(ctx)
   324  	if testCreds == nil {
   325  		t.Skip("test credentials not present, skipping")
   326  	}
   327  
   328  	if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
   329  		t.Errorf("test NewClient: %v", err)
   330  	}
   331  
   332  	badTS := testutil.ErroringTokenSource{}
   333  
   334  	if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
   335  		t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.Project())
   336  	}
   337  }
   338  
   339  func TestIntegration_JobFrom(t *testing.T) {
   340  	if client == nil {
   341  		t.Skip("Integration tests skipped")
   342  	}
   343  	ctx := context.Background()
   344  
   345  	// Create a job we can use for referencing.
   346  	q := client.Query("SELECT 123 as foo")
   347  	it, err := q.Read(ctx)
   348  	if err != nil {
   349  		t.Fatalf("failed to run test query: %v", err)
   350  	}
   351  	want := it.SourceJob()
   352  
   353  	// establish a new client that's pointed at an invalid project/location.
   354  	otherClient, err := NewClient(ctx, "bad-project-id")
   355  	if err != nil {
   356  		t.Fatalf("failed to create other client: %v", err)
   357  	}
   358  	otherClient.Location = "badloc"
   359  
   360  	for _, tc := range []struct {
   361  		description string
   362  		f           func(*Client) (*Job, error)
   363  		wantErr     bool
   364  	}{
   365  		{
   366  			description: "JobFromID",
   367  			f:           func(c *Client) (*Job, error) { return c.JobFromID(ctx, want.jobID) },
   368  			wantErr:     true,
   369  		},
   370  		{
   371  			description: "JobFromIDLocation",
   372  			f:           func(c *Client) (*Job, error) { return c.JobFromIDLocation(ctx, want.jobID, want.location) },
   373  			wantErr:     true,
   374  		},
   375  		{
   376  			description: "JobFromProject",
   377  			f:           func(c *Client) (*Job, error) { return c.JobFromProject(ctx, want.projectID, want.jobID, want.location) },
   378  		},
   379  	} {
   380  		got, err := tc.f(otherClient)
   381  		if err != nil {
   382  			if !tc.wantErr {
   383  				t.Errorf("case %q errored: %v", tc.description, err)
   384  			}
   385  			continue
   386  		}
   387  		if tc.wantErr {
   388  			t.Errorf("case %q got success, expected error", tc.description)
   389  		}
   390  		if got.projectID != want.projectID {
   391  			t.Errorf("case %q projectID mismatch, got %s want %s", tc.description, got.projectID, want.projectID)
   392  		}
   393  		if got.location != want.location {
   394  			t.Errorf("case %q location mismatch, got %s want %s", tc.description, got.location, want.location)
   395  		}
   396  		if got.jobID != want.jobID {
   397  			t.Errorf("case %q jobID mismatch, got %s want %s", tc.description, got.jobID, want.jobID)
   398  		}
   399  		if got.Email() == "" {
   400  			t.Errorf("case %q expected email to be populated, was empty", tc.description)
   401  		}
   402  	}
   403  
   404  }
   405  
   406  func TestIntegration_QueryContextTimeout(t *testing.T) {
   407  	if client == nil {
   408  		t.Skip("Integration tests skipped")
   409  	}
   410  	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
   411  	defer cancel()
   412  
   413  	q := client.Query("select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 1000)) as foo")
   414  	q.DisableQueryCache = true
   415  	before := time.Now()
   416  	_, err := q.Read(ctx)
   417  	if err != context.DeadlineExceeded {
   418  		t.Errorf("Read() error, wanted %v, got %v", context.DeadlineExceeded, err)
   419  	}
   420  	wantMaxDur := 500 * time.Millisecond
   421  	if d := time.Since(before); d > wantMaxDur {
   422  		t.Errorf("return duration too long, wanted max %v got %v", wantMaxDur, d)
   423  	}
   424  }
   425  
   426  func TestIntegration_SnapshotRestoreClone(t *testing.T) {
   427  
   428  	if client == nil {
   429  		t.Skip("Integration tests skipped")
   430  	}
   431  	ctx := context.Background()
   432  
   433  	// instantiate a base table via a CTAS
   434  	baseTableID := tableIDs.New()
   435  	qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
   436  	sql := fmt.Sprintf(`
   437  		CREATE TABLE %s
   438  		(
   439  			sample_value INT64,
   440  			groupid STRING,
   441  		)
   442  		AS
   443  		SELECT
   444  		CAST(RAND() * 100 AS INT64),
   445  		CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
   446  		FROM
   447  		UNNEST(GENERATE_ARRAY(0,999))
   448  		`, qualified)
   449  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
   450  		t.Fatalf("couldn't instantiate base table: %v", err)
   451  	}
   452  
   453  	// Create a snapshot.  We'll select our snapshot time explicitly to validate the snapshot time is the same.
   454  	targetTime := time.Now()
   455  	snapshotID := tableIDs.New()
   456  	copier := dataset.Table(snapshotID).CopierFrom(dataset.Table(fmt.Sprintf("%s@%d", baseTableID, targetTime.UnixNano()/1e6)))
   457  	copier.OperationType = SnapshotOperation
   458  	job, err := copier.Run(ctx)
   459  	if err != nil {
   460  		t.Fatalf("couldn't run snapshot: %v", err)
   461  	}
   462  	err = wait(ctx, job)
   463  	if err != nil {
   464  		t.Fatalf("snapshot failed: %v", err)
   465  	}
   466  
   467  	// verify metadata on the snapshot
   468  	meta, err := dataset.Table(snapshotID).Metadata(ctx)
   469  	if err != nil {
   470  		t.Fatalf("couldn't get metadata from snapshot: %v", err)
   471  	}
   472  	if meta.Type != Snapshot {
   473  		t.Errorf("expected snapshot table type, got %s", meta.Type)
   474  	}
   475  	want := &SnapshotDefinition{
   476  		BaseTableReference: dataset.Table(baseTableID),
   477  		SnapshotTime:       targetTime,
   478  	}
   479  	if diff := testutil.Diff(meta.SnapshotDefinition, want, cmp.AllowUnexported(Table{}), cmpopts.IgnoreUnexported(Client{}), cmpopts.EquateApproxTime(time.Millisecond)); diff != "" {
   480  		t.Fatalf("SnapshotDefinition differs.  got=-, want=+:\n%s", diff)
   481  	}
   482  
   483  	// execute a restore using the snapshot.
   484  	restoreID := tableIDs.New()
   485  	restorer := dataset.Table(restoreID).CopierFrom(dataset.Table(snapshotID))
   486  	restorer.OperationType = RestoreOperation
   487  	job, err = restorer.Run(ctx)
   488  	if err != nil {
   489  		t.Fatalf("couldn't run restore: %v", err)
   490  	}
   491  	err = wait(ctx, job)
   492  	if err != nil {
   493  		t.Fatalf("restore failed: %v", err)
   494  	}
   495  
   496  	restoreMeta, err := dataset.Table(restoreID).Metadata(ctx)
   497  	if err != nil {
   498  		t.Fatalf("couldn't get restored table metadata: %v", err)
   499  	}
   500  
   501  	if meta.NumBytes != restoreMeta.NumBytes {
   502  		t.Errorf("bytes mismatch.  snap had %d bytes, restore had %d bytes", meta.NumBytes, restoreMeta.NumBytes)
   503  	}
   504  	if meta.NumRows != restoreMeta.NumRows {
   505  		t.Errorf("row counts mismatch.  snap had %d rows, restore had %d rows", meta.NumRows, restoreMeta.NumRows)
   506  	}
   507  	if restoreMeta.Type != RegularTable {
   508  		t.Errorf("table type mismatch, got %s want %s", restoreMeta.Type, RegularTable)
   509  	}
   510  
   511  	// Create a clone of the snapshot.
   512  	cloneID := tableIDs.New()
   513  	cloner := dataset.Table(cloneID).CopierFrom(dataset.Table(snapshotID))
   514  	cloner.OperationType = CloneOperation
   515  
   516  	job, err = cloner.Run(ctx)
   517  	if err != nil {
   518  		t.Fatalf("couldn't run clone: %v", err)
   519  	}
   520  	err = wait(ctx, job)
   521  	if err != nil {
   522  		t.Fatalf("clone failed: %v", err)
   523  	}
   524  
   525  	cloneMeta, err := dataset.Table(cloneID).Metadata(ctx)
   526  	if err != nil {
   527  		t.Fatalf("couldn't get restored table metadata: %v", err)
   528  	}
   529  	if meta.NumBytes != cloneMeta.NumBytes {
   530  		t.Errorf("bytes mismatch.  snap had %d bytes, clone had %d bytes", meta.NumBytes, cloneMeta.NumBytes)
   531  	}
   532  	if meta.NumRows != cloneMeta.NumRows {
   533  		t.Errorf("row counts mismatch.  snap had %d rows, clone had %d rows", meta.NumRows, cloneMeta.NumRows)
   534  	}
   535  	if cloneMeta.Type != RegularTable {
   536  		t.Errorf("table type mismatch, got %s want %s", cloneMeta.Type, RegularTable)
   537  	}
   538  	if cloneMeta.CloneDefinition == nil {
   539  		t.Errorf("expected CloneDefinition in (%q), was nil", cloneMeta.FullID)
   540  	}
   541  	if cloneMeta.CloneDefinition.BaseTableReference == nil {
   542  		t.Errorf("expected CloneDefinition.BaseTableReference, was nil")
   543  	}
   544  	wantBase := dataset.Table(snapshotID)
   545  	if !testutil.Equal(cloneMeta.CloneDefinition.BaseTableReference, wantBase, cmpopts.IgnoreUnexported(Table{})) {
   546  		t.Errorf("mismatch in CloneDefinition.BaseTableReference.  Got %s, want %s", cloneMeta.CloneDefinition.BaseTableReference.FullyQualifiedName(), wantBase.FullyQualifiedName())
   547  	}
   548  }
   549  
   550  func TestIntegration_HourTimePartitioning(t *testing.T) {
   551  	if client == nil {
   552  		t.Skip("Integration tests skipped")
   553  	}
   554  	ctx := context.Background()
   555  	table := dataset.Table(tableIDs.New())
   556  
   557  	schema := Schema{
   558  		{Name: "name", Type: StringFieldType},
   559  		{Name: "somevalue", Type: IntegerFieldType},
   560  	}
   561  
   562  	// define hourly ingestion-based partitioning.
   563  	wantedTimePartitioning := &TimePartitioning{
   564  		Type: HourPartitioningType,
   565  	}
   566  
   567  	err := table.Create(context.Background(), &TableMetadata{
   568  		Schema:           schema,
   569  		TimePartitioning: wantedTimePartitioning,
   570  	})
   571  	if err != nil {
   572  		t.Fatal(err)
   573  	}
   574  	defer table.Delete(ctx)
   575  	md, err := table.Metadata(ctx)
   576  	if err != nil {
   577  		t.Fatal(err)
   578  	}
   579  
   580  	if md.TimePartitioning == nil {
   581  		t.Fatal("expected time partitioning, got nil")
   582  	}
   583  	if diff := testutil.Diff(md.TimePartitioning, wantedTimePartitioning); diff != "" {
   584  		t.Fatalf("got=-, want=+:\n%s", diff)
   585  	}
   586  	if md.TimePartitioning.Type != wantedTimePartitioning.Type {
   587  		t.Errorf("TimePartitioning interval mismatch: got %v, wanted %v", md.TimePartitioning.Type, wantedTimePartitioning.Type)
   588  	}
   589  }
   590  
   591  func TestIntegration_RangePartitioning(t *testing.T) {
   592  	if client == nil {
   593  		t.Skip("Integration tests skipped")
   594  	}
   595  	ctx := context.Background()
   596  	table := dataset.Table(tableIDs.New())
   597  
   598  	schema := Schema{
   599  		{Name: "name", Type: StringFieldType},
   600  		{Name: "somevalue", Type: IntegerFieldType},
   601  	}
   602  
   603  	wantedRange := &RangePartitioningRange{
   604  		Start:    0,
   605  		End:      135,
   606  		Interval: 25,
   607  	}
   608  
   609  	wantedPartitioning := &RangePartitioning{
   610  		Field: "somevalue",
   611  		Range: wantedRange,
   612  	}
   613  
   614  	err := table.Create(context.Background(), &TableMetadata{
   615  		Schema:            schema,
   616  		RangePartitioning: wantedPartitioning,
   617  	})
   618  	if err != nil {
   619  		t.Fatal(err)
   620  	}
   621  	defer table.Delete(ctx)
   622  	md, err := table.Metadata(ctx)
   623  	if err != nil {
   624  		t.Fatal(err)
   625  	}
   626  
   627  	if md.RangePartitioning == nil {
   628  		t.Fatal("expected range partitioning, got nil")
   629  	}
   630  	got := md.RangePartitioning.Field
   631  	if wantedPartitioning.Field != got {
   632  		t.Errorf("RangePartitioning Field: got %v, want %v", got, wantedPartitioning.Field)
   633  	}
   634  	if md.RangePartitioning.Range == nil {
   635  		t.Fatal("expected a range definition, got nil")
   636  	}
   637  	gotInt64 := md.RangePartitioning.Range.Start
   638  	if gotInt64 != wantedRange.Start {
   639  		t.Errorf("Range.Start: got %v, wanted %v", gotInt64, wantedRange.Start)
   640  	}
   641  	gotInt64 = md.RangePartitioning.Range.End
   642  	if gotInt64 != wantedRange.End {
   643  		t.Errorf("Range.End: got %v, wanted %v", gotInt64, wantedRange.End)
   644  	}
   645  	gotInt64 = md.RangePartitioning.Range.Interval
   646  	if gotInt64 != wantedRange.Interval {
   647  		t.Errorf("Range.Interval: got %v, wanted %v", gotInt64, wantedRange.Interval)
   648  	}
   649  }
   650  
   651  func TestIntegration_RemoveTimePartitioning(t *testing.T) {
   652  	if client == nil {
   653  		t.Skip("Integration tests skipped")
   654  	}
   655  	ctx := context.Background()
   656  	table := dataset.Table(tableIDs.New())
   657  	want := 24 * time.Hour
   658  	err := table.Create(ctx, &TableMetadata{
   659  		ExpirationTime: testTableExpiration,
   660  		TimePartitioning: &TimePartitioning{
   661  			Expiration: want,
   662  		},
   663  	})
   664  	if err != nil {
   665  		t.Fatal(err)
   666  	}
   667  	defer table.Delete(ctx)
   668  
   669  	md, err := table.Metadata(ctx)
   670  	if err != nil {
   671  		t.Fatal(err)
   672  	}
   673  	if got := md.TimePartitioning.Expiration; got != want {
   674  		t.Fatalf("TimePartitioning expiration want = %v, got = %v", want, got)
   675  	}
   676  
   677  	// Remove time partitioning expiration
   678  	md, err = table.Update(context.Background(), TableMetadataToUpdate{
   679  		TimePartitioning: &TimePartitioning{Expiration: 0},
   680  	}, md.ETag)
   681  	if err != nil {
   682  		t.Fatal(err)
   683  	}
   684  
   685  	want = time.Duration(0)
   686  	if got := md.TimePartitioning.Expiration; got != want {
   687  		t.Fatalf("TimeParitioning expiration want = %v, got = %v", want, got)
   688  	}
   689  }
   690  
   691  // setupPolicyTag is a helper for setting up policy tags in the datacatalog service.
   692  //
   693  // It returns a string for a policy tag identifier and a cleanup function, or an error.
   694  func setupPolicyTag(ctx context.Context) (string, func(), error) {
   695  	location := "us"
   696  	req := &datacatalogpb.CreateTaxonomyRequest{
   697  		Parent: fmt.Sprintf("projects/%s/locations/%s", testutil.ProjID(), location),
   698  		Taxonomy: &datacatalogpb.Taxonomy{
   699  			// DisplayName must be unique across org.
   700  			DisplayName: fmt.Sprintf("google-cloud-go bigquery testing taxonomy %d", time.Now().UnixNano()),
   701  			Description: "Taxonomy created for google-cloud-go integration tests",
   702  			ActivatedPolicyTypes: []datacatalogpb.Taxonomy_PolicyType{
   703  				datacatalogpb.Taxonomy_FINE_GRAINED_ACCESS_CONTROL,
   704  			},
   705  		},
   706  	}
   707  	resp, err := policyTagManagerClient.CreateTaxonomy(ctx, req)
   708  	if err != nil {
   709  		return "", nil, fmt.Errorf("datacatalog.CreateTaxonomy: %v", err)
   710  	}
   711  	taxonomyID := resp.GetName()
   712  	cleanupFunc := func() {
   713  		policyTagManagerClient.DeleteTaxonomy(ctx, &datacatalogpb.DeleteTaxonomyRequest{
   714  			Name: taxonomyID,
   715  		})
   716  	}
   717  
   718  	tagReq := &datacatalogpb.CreatePolicyTagRequest{
   719  		Parent: resp.GetName(),
   720  		PolicyTag: &datacatalogpb.PolicyTag{
   721  			DisplayName: "ExamplePolicyTag",
   722  		},
   723  	}
   724  	tagResp, err := policyTagManagerClient.CreatePolicyTag(ctx, tagReq)
   725  	if err != nil {
   726  		// we're failed to create tags, but we did create taxonomy. clean it up and signal error.
   727  		cleanupFunc()
   728  		return "", nil, fmt.Errorf("datacatalog.CreatePolicyTag: %v", err)
   729  	}
   730  	return tagResp.GetName(), cleanupFunc, nil
   731  }
   732  
   733  func TestIntegration_ColumnACLs(t *testing.T) {
   734  	if client == nil {
   735  		t.Skip("Integration tests skipped")
   736  	}
   737  	ctx := context.Background()
   738  	testSchema := Schema{
   739  		{Name: "name", Type: StringFieldType},
   740  		{Name: "ssn", Type: StringFieldType},
   741  		{Name: "acct_balance", Type: NumericFieldType},
   742  	}
   743  	table := newTable(t, testSchema)
   744  	defer table.Delete(ctx)
   745  
   746  	tagID, cleanupFunc, err := setupPolicyTag(ctx)
   747  	if err != nil {
   748  		t.Fatalf("failed to setup policy tag resources: %v", err)
   749  	}
   750  	defer cleanupFunc()
   751  	// amend the test schema to add a policy tag
   752  	testSchema[1].PolicyTags = &PolicyTagList{
   753  		Names: []string{tagID},
   754  	}
   755  
   756  	// Test: Amend an existing schema with a policy tag.
   757  	_, err = table.Update(ctx, TableMetadataToUpdate{
   758  		Schema: testSchema,
   759  	}, "")
   760  	if err != nil {
   761  		t.Errorf("update with policyTag failed: %v", err)
   762  	}
   763  
   764  	// Test: Create a new table with a policy tag defined.
   765  	newTable := dataset.Table(tableIDs.New())
   766  	if err = newTable.Create(ctx, &TableMetadata{
   767  		Schema:      schema,
   768  		Description: "foo",
   769  	}); err != nil {
   770  		t.Errorf("failed to create new table with policy tag: %v", err)
   771  	}
   772  }
   773  
   774  func TestIntegration_SimpleRowResults(t *testing.T) {
   775  	if client == nil {
   776  		t.Skip("Integration tests skipped")
   777  	}
   778  	beforePreview := client.enableQueryPreview
   779  	// ensure we restore the preview setting on test exit
   780  	defer func() {
   781  		client.enableQueryPreview = beforePreview
   782  	}()
   783  	ctx := context.Background()
   784  
   785  	testCases := []struct {
   786  		description string
   787  		query       string
   788  		want        [][]Value
   789  	}{
   790  		{
   791  			description: "literals",
   792  			query:       "select 17 as foo",
   793  			want:        [][]Value{{int64(17)}},
   794  		},
   795  		{
   796  			description: "empty results",
   797  			query:       "SELECT * FROM (select 17 as foo) where false",
   798  			want:        [][]Value{},
   799  		},
   800  		{
   801  			// Previously this would return rows due to the destination reference being present
   802  			// in the job config, but switching to relying on jobs.getQueryResults allows the
   803  			// service to decide the behavior.
   804  			description: "ctas ddl",
   805  			query:       fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s AS SELECT 17 as foo", dataset.DatasetID, tableIDs.New()),
   806  			want:        nil,
   807  		},
   808  		{
   809  			// This is a longer running query to ensure probing works as expected.
   810  			description: "long running",
   811  			query:       "select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 1000)) as foo",
   812  			want:        [][]Value{{int64(1000000000)}},
   813  		},
   814  		{
   815  			// Query doesn't yield a result.
   816  			description: "DML",
   817  			query:       fmt.Sprintf("CREATE OR REPLACE TABLE %s.%s (foo STRING, bar INT64)", dataset.DatasetID, tableIDs.New()),
   818  			want:        [][]Value{},
   819  		},
   820  	}
   821  
   822  	t.Run("nopreview_group", func(t *testing.T) {
   823  		client.enableQueryPreview = false
   824  		for _, tc := range testCases {
   825  			curCase := tc
   826  			t.Run(curCase.description, func(t *testing.T) {
   827  				t.Parallel()
   828  				q := client.Query(curCase.query)
   829  				it, err := q.Read(ctx)
   830  				if err != nil {
   831  					t.Fatalf("%s read error: %v", curCase.description, err)
   832  				}
   833  				checkReadAndTotalRows(t, curCase.description, it, curCase.want)
   834  			})
   835  		}
   836  	})
   837  	t.Run("preview_group", func(t *testing.T) {
   838  		client.enableQueryPreview = true
   839  		for _, tc := range testCases {
   840  			curCase := tc
   841  			t.Run(curCase.description, func(t *testing.T) {
   842  				t.Parallel()
   843  				q := client.Query(curCase.query)
   844  				it, err := q.Read(ctx)
   845  				if err != nil {
   846  					t.Fatalf("%s read error: %v", curCase.description, err)
   847  				}
   848  				checkReadAndTotalRows(t, curCase.description, it, curCase.want)
   849  			})
   850  		}
   851  	})
   852  
   853  }
   854  
   855  func TestIntegration_QueryIterationPager(t *testing.T) {
   856  	if client == nil {
   857  		t.Skip("Integration tests skipped")
   858  	}
   859  	ctx := context.Background()
   860  
   861  	sql := `
   862  	SELECT
   863  		num,
   864  		num * 2 as double
   865  	FROM
   866  		UNNEST(GENERATE_ARRAY(1,5)) as num`
   867  	q := client.Query(sql)
   868  	it, err := q.Read(ctx)
   869  	if err != nil {
   870  		t.Fatalf("Read: %v", err)
   871  	}
   872  	pager := iterator.NewPager(it, 2, "")
   873  	rowsFetched := 0
   874  	for {
   875  		var rows [][]Value
   876  		nextPageToken, err := pager.NextPage(&rows)
   877  		if err != nil {
   878  			t.Fatalf("NextPage: %v", err)
   879  		}
   880  		rowsFetched = rowsFetched + len(rows)
   881  
   882  		if nextPageToken == "" {
   883  			break
   884  		}
   885  	}
   886  
   887  	wantRows := 5
   888  	if rowsFetched != wantRows {
   889  		t.Errorf("Expected %d rows, got %d", wantRows, rowsFetched)
   890  	}
   891  }
   892  
   893  func TestIntegration_RoutineStoredProcedure(t *testing.T) {
   894  	// Verifies we're exhibiting documented behavior, where we're expected
   895  	// to return the last resultset in a script as the response from a script
   896  	// job.
   897  	// https://github.com/googleapis/google-cloud-go/issues/1974
   898  	if client == nil {
   899  		t.Skip("Integration tests skipped")
   900  	}
   901  	ctx := context.Background()
   902  
   903  	// Define a simple stored procedure via DDL.
   904  	routineID := routineIDs.New()
   905  	routine := dataset.Routine(routineID)
   906  	routineSQLID, _ := routine.Identifier(StandardSQLID)
   907  	sql := fmt.Sprintf(`
   908  		CREATE OR REPLACE PROCEDURE %s(val INT64)
   909  		BEGIN
   910  			SELECT CURRENT_TIMESTAMP() as ts;
   911  			SELECT val * 2 as f2;
   912  		END`,
   913  		routineSQLID)
   914  
   915  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
   916  		t.Fatal(err)
   917  	}
   918  	defer routine.Delete(ctx)
   919  
   920  	// Invoke the stored procedure.
   921  	sql = fmt.Sprintf(`
   922  	CALL %s(5)`,
   923  		routineSQLID)
   924  
   925  	q := client.Query(sql)
   926  	it, err := q.Read(ctx)
   927  	if err != nil {
   928  		t.Fatalf("query.Read: %v", err)
   929  	}
   930  
   931  	checkReadAndTotalRows(t,
   932  		"expect result set from procedure",
   933  		it, [][]Value{{int64(10)}})
   934  }
   935  
   936  func TestIntegration_RoutineUserTVF(t *testing.T) {
   937  	if client == nil {
   938  		t.Skip("Integration tests skipped")
   939  	}
   940  	ctx := context.Background()
   941  
   942  	routineID := routineIDs.New()
   943  	routine := dataset.Routine(routineID)
   944  	inMeta := &RoutineMetadata{
   945  		Type:     "TABLE_VALUED_FUNCTION",
   946  		Language: "SQL",
   947  		Arguments: []*RoutineArgument{
   948  			{Name: "filter",
   949  				DataType: &StandardSQLDataType{TypeKind: "INT64"},
   950  			}},
   951  		ReturnTableType: &StandardSQLTableType{
   952  			Columns: []*StandardSQLField{
   953  				{Name: "x", Type: &StandardSQLDataType{TypeKind: "INT64"}},
   954  			},
   955  		},
   956  		Body: "SELECT x FROM UNNEST([1,2,3]) x WHERE x = filter",
   957  	}
   958  	if err := routine.Create(ctx, inMeta); err != nil {
   959  		t.Fatalf("routine create: %v", err)
   960  	}
   961  	defer routine.Delete(ctx)
   962  
   963  	meta, err := routine.Metadata(ctx)
   964  	if err != nil {
   965  		t.Fatal(err)
   966  	}
   967  
   968  	// Now, compare the input meta to the output meta
   969  	if diff := testutil.Diff(inMeta, meta, cmpopts.IgnoreFields(RoutineMetadata{}, "CreationTime", "LastModifiedTime", "ETag")); diff != "" {
   970  		t.Errorf("routine metadata differs, got=-, want=+\n%s", diff)
   971  	}
   972  }
   973  
   974  func TestIntegration_InsertErrors(t *testing.T) {
   975  	// This test serves to verify streaming behavior in the face of oversized data.
   976  	// BigQuery will reject insertAll payloads that exceed a defined limit (10MB).
   977  	// Additionally, if a payload vastly exceeds this limit, the request is rejected
   978  	// by the intermediate architecture.
   979  	if client == nil {
   980  		t.Skip("Integration tests skipped")
   981  	}
   982  	ctx := context.Background()
   983  	table := newTable(t, schema)
   984  	defer table.Delete(ctx)
   985  
   986  	ins := table.Inserter()
   987  	var saverRows []*ValuesSaver
   988  
   989  	// badSaver represents an excessively sized (>10MB) row message for insertion.
   990  	badSaver := &ValuesSaver{
   991  		Schema:   schema,
   992  		InsertID: NoDedupeID,
   993  		Row:      []Value{strings.Repeat("X", 10485760), []Value{int64(1)}, []Value{true}},
   994  	}
   995  
   996  	saverRows = append(saverRows, badSaver)
   997  	err := ins.Put(ctx, saverRows)
   998  	if err == nil {
   999  		t.Errorf("Wanted row size error, got successful insert.")
  1000  	}
  1001  	var e1 *googleapi.Error
  1002  	ok := errors.As(err, &e1)
  1003  	if !ok {
  1004  		t.Errorf("Wanted googleapi.Error, got: %v", err)
  1005  	}
  1006  	if e1.Code != http.StatusRequestEntityTooLarge {
  1007  		want := "Request payload size exceeds the limit"
  1008  		if !strings.Contains(e1.Message, want) {
  1009  			t.Errorf("Error didn't contain expected message (%s): %#v", want, e1)
  1010  		}
  1011  	}
  1012  	// Case 2: Very Large Request
  1013  	// Request so large it gets rejected by intermediate infra (3x 10MB rows)
  1014  	saverRows = append(saverRows, badSaver)
  1015  	saverRows = append(saverRows, badSaver)
  1016  
  1017  	err = ins.Put(ctx, saverRows)
  1018  	if err == nil {
  1019  		t.Errorf("Wanted error, got successful insert.")
  1020  	}
  1021  	var e2 *googleapi.Error
  1022  	ok = errors.As(err, &e2)
  1023  	if !ok {
  1024  		t.Errorf("wanted googleapi.Error, got: %v", err)
  1025  	}
  1026  	if e2.Code != http.StatusBadRequest && e2.Code != http.StatusRequestEntityTooLarge {
  1027  		t.Errorf("Wanted HTTP 400 or 413, got %d", e2.Code)
  1028  	}
  1029  }
  1030  
  1031  func TestIntegration_InsertAndRead(t *testing.T) {
  1032  	if client == nil {
  1033  		t.Skip("Integration tests skipped")
  1034  	}
  1035  	ctx := context.Background()
  1036  	table := newTable(t, schema)
  1037  	defer table.Delete(ctx)
  1038  
  1039  	// Populate the table.
  1040  	ins := table.Inserter()
  1041  	var (
  1042  		wantRows  [][]Value
  1043  		saverRows []*ValuesSaver
  1044  	)
  1045  	for i, name := range []string{"a", "b", "c"} {
  1046  		row := []Value{name, []Value{int64(i)}, []Value{true}}
  1047  		wantRows = append(wantRows, row)
  1048  		saverRows = append(saverRows, &ValuesSaver{
  1049  			Schema:   schema,
  1050  			InsertID: name,
  1051  			Row:      row,
  1052  		})
  1053  	}
  1054  	if err := ins.Put(ctx, saverRows); err != nil {
  1055  		t.Fatal(putError(err))
  1056  	}
  1057  
  1058  	// Wait until the data has been uploaded. This can take a few seconds, according
  1059  	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
  1060  	if err := waitForRow(ctx, table); err != nil {
  1061  		t.Fatal(err)
  1062  	}
  1063  	// Read the table.
  1064  	checkRead(t, "upload", table.Read(ctx), wantRows)
  1065  
  1066  	// Query the table.
  1067  	q := client.Query(fmt.Sprintf("select name, nums, rec from %s", table.TableID))
  1068  	q.DefaultProjectID = dataset.ProjectID
  1069  	q.DefaultDatasetID = dataset.DatasetID
  1070  
  1071  	rit, err := q.Read(ctx)
  1072  	if err != nil {
  1073  		t.Fatal(err)
  1074  	}
  1075  	checkRead(t, "query", rit, wantRows)
  1076  
  1077  	// Query the long way.
  1078  	job1, err := q.Run(ctx)
  1079  	if err != nil {
  1080  		t.Fatal(err)
  1081  	}
  1082  	if job1.LastStatus() == nil {
  1083  		t.Error("no LastStatus")
  1084  	}
  1085  	job2, err := client.JobFromID(ctx, job1.ID())
  1086  	if err != nil {
  1087  		t.Fatal(err)
  1088  	}
  1089  	if job2.LastStatus() == nil {
  1090  		t.Error("no LastStatus")
  1091  	}
  1092  	rit, err = job2.Read(ctx)
  1093  	if err != nil {
  1094  		t.Fatal(err)
  1095  	}
  1096  	checkRead(t, "job.Read", rit, wantRows)
  1097  
  1098  	// Get statistics.
  1099  	jobStatus, err := job2.Status(ctx)
  1100  	if err != nil {
  1101  		t.Fatal(err)
  1102  	}
  1103  	if jobStatus.Statistics == nil {
  1104  		t.Fatal("jobStatus missing statistics")
  1105  	}
  1106  	if _, ok := jobStatus.Statistics.Details.(*QueryStatistics); !ok {
  1107  		t.Errorf("expected QueryStatistics, got %T", jobStatus.Statistics.Details)
  1108  	}
  1109  
  1110  	// Test reading directly into a []Value.
  1111  	valueLists, schema, _, err := readAll(table.Read(ctx))
  1112  	if err != nil {
  1113  		t.Fatal(err)
  1114  	}
  1115  	it := table.Read(ctx)
  1116  	for i, vl := range valueLists {
  1117  		var got []Value
  1118  		if err := it.Next(&got); err != nil {
  1119  			t.Fatal(err)
  1120  		}
  1121  		if !testutil.Equal(it.Schema, schema) {
  1122  			t.Fatalf("got schema %v, want %v", it.Schema, schema)
  1123  		}
  1124  		want := []Value(vl)
  1125  		if !testutil.Equal(got, want) {
  1126  			t.Errorf("%d: got %v, want %v", i, got, want)
  1127  		}
  1128  	}
  1129  
  1130  	// Test reading into a map.
  1131  	it = table.Read(ctx)
  1132  	for _, vl := range valueLists {
  1133  		var vm map[string]Value
  1134  		if err := it.Next(&vm); err != nil {
  1135  			t.Fatal(err)
  1136  		}
  1137  		if got, want := len(vm), len(vl); got != want {
  1138  			t.Fatalf("valueMap len: got %d, want %d", got, want)
  1139  		}
  1140  		// With maps, structs become nested maps.
  1141  		vl[2] = map[string]Value{"bool": vl[2].([]Value)[0]}
  1142  		for i, v := range vl {
  1143  			if got, want := vm[schema[i].Name], v; !testutil.Equal(got, want) {
  1144  				t.Errorf("%d, name=%s: got %#v, want %#v",
  1145  					i, schema[i].Name, got, want)
  1146  			}
  1147  		}
  1148  	}
  1149  
  1150  }
  1151  
// SubSubTestStruct is the innermost record type used by SubTestStruct; it
// holds a single integer field.
type SubSubTestStruct struct {
	Integer int64
}
  1155  
// SubTestStruct is the nested record type used by TestStruct. It combines a
// scalar field with a single nested record and a slice of nested records.
type SubTestStruct struct {
	String      string
	Record      SubSubTestStruct
	RecordArray []SubSubTestStruct
}
  1161  
// TestStruct is the row type used by TestIntegration_InsertAndReadStructs. It
// carries scalar fields, range fields, slice fields, and nested records.
//
// Field order matters: the test builds one value with an unkeyed composite
// literal, and it addresses the three range fields by their schema positions
// (indexes 11, 12 and 13), so fields must not be reordered or inserted ahead
// of them without updating that test.
type TestStruct struct {
	Name           string
	Bytes          []byte
	Integer        int64
	Float          float64
	Boolean        bool
	Timestamp      time.Time
	Date           civil.Date
	Time           civil.Time
	DateTime       civil.DateTime
	Numeric        *big.Rat
	Geography      string
	RangeDate      *RangeValue `bigquery:"rangedate"` //TODO: remove tags when field normalization works
	RangeDateTime  *RangeValue `bigquery:"rangedatetime"`
	RangeTimestamp *RangeValue `bigquery:"rangetimestamp"`
	StringArray    []string
	IntegerArray   []int64
	FloatArray     []float64
	BooleanArray   []bool
	TimestampArray []time.Time
	DateArray      []civil.Date
	TimeArray      []civil.Time
	DateTimeArray  []civil.DateTime
	NumericArray   []*big.Rat
	GeographyArray []string

	// Nested record fields.
	Record      SubTestStruct
	RecordArray []SubTestStruct
}
  1191  
  1192  // Round times to the microsecond for comparison purposes.
  1193  var roundToMicros = cmp.Transformer("RoundToMicros",
  1194  	func(t time.Time) time.Time { return t.Round(time.Microsecond) })
  1195  
  1196  func TestIntegration_InsertAndReadStructs(t *testing.T) {
  1197  	if client == nil {
  1198  		t.Skip("Integration tests skipped")
  1199  	}
  1200  	schema, err := InferSchema(TestStruct{})
  1201  	if err != nil {
  1202  		t.Fatal(err)
  1203  	}
  1204  
  1205  	// Finish declaring the ambigous range element types.
  1206  	for idx, typ := range map[int]FieldType{
  1207  		11: DateFieldType,
  1208  		12: DateTimeFieldType,
  1209  		13: TimestampFieldType,
  1210  	} {
  1211  		if schema[idx].Type != RangeFieldType {
  1212  			t.Fatalf("mismatch in expected RANGE element in schema field %d", idx)
  1213  		} else {
  1214  			schema[idx].RangeElementType = &RangeElementType{Type: typ}
  1215  		}
  1216  	}
  1217  
  1218  	ctx := context.Background()
  1219  	table := newTable(t, schema)
  1220  	defer table.Delete(ctx)
  1221  
  1222  	d := civil.Date{Year: 2016, Month: 3, Day: 20}
  1223  	tm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
  1224  	ts := time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC)
  1225  	dtm := civil.DateTime{Date: d, Time: tm}
  1226  	d2 := civil.Date{Year: 1994, Month: 5, Day: 15}
  1227  	tm2 := civil.Time{Hour: 1, Minute: 2, Second: 4, Nanosecond: 0}
  1228  	ts2 := time.Date(1994, 5, 15, 1, 2, 4, 0, time.UTC)
  1229  	dtm2 := civil.DateTime{Date: d2, Time: tm2}
  1230  	g := "POINT(-122.350220 47.649154)"
  1231  	g2 := "POINT(-122.0836791 37.421827)"
  1232  	rangedate := &RangeValue{Start: civil.Date{Year: 2024, Month: 04, Day: 11}}
  1233  	rangedatetime := &RangeValue{
  1234  		End: civil.DateTime{
  1235  			Date: civil.Date{Year: 2024, Month: 04, Day: 11},
  1236  			Time: civil.Time{Hour: 2, Minute: 4, Second: 6, Nanosecond: 0}},
  1237  	}
  1238  	rangetimestamp := &RangeValue{
  1239  		Start: time.Date(2016, 3, 20, 15, 4, 5, 6000, time.UTC),
  1240  	}
  1241  
  1242  	// Populate the table.
  1243  	ins := table.Inserter()
  1244  	want := []*TestStruct{
  1245  		{
  1246  			"a",
  1247  			[]byte("byte"),
  1248  			42,
  1249  			3.14,
  1250  			true,
  1251  			ts,
  1252  			d,
  1253  			tm,
  1254  			dtm,
  1255  			big.NewRat(57, 100),
  1256  			g,
  1257  			rangedate,
  1258  			rangedatetime,
  1259  			rangetimestamp,
  1260  			[]string{"a", "b"},
  1261  			[]int64{1, 2},
  1262  			[]float64{1, 1.41},
  1263  			[]bool{true, false},
  1264  			[]time.Time{ts, ts2},
  1265  			[]civil.Date{d, d2},
  1266  			[]civil.Time{tm, tm2},
  1267  			[]civil.DateTime{dtm, dtm2},
  1268  			[]*big.Rat{big.NewRat(1, 2), big.NewRat(3, 5)},
  1269  			[]string{g, g2},
  1270  			SubTestStruct{
  1271  				"string",
  1272  				SubSubTestStruct{24},
  1273  				[]SubSubTestStruct{{1}, {2}},
  1274  			},
  1275  			[]SubTestStruct{
  1276  				{String: "empty"},
  1277  				{
  1278  					"full",
  1279  					SubSubTestStruct{1},
  1280  					[]SubSubTestStruct{{1}, {2}},
  1281  				},
  1282  			},
  1283  		},
  1284  		{
  1285  			Name:           "b",
  1286  			Bytes:          []byte("byte2"),
  1287  			Integer:        24,
  1288  			Float:          4.13,
  1289  			Boolean:        false,
  1290  			Timestamp:      ts,
  1291  			Date:           d,
  1292  			Time:           tm,
  1293  			DateTime:       dtm,
  1294  			Numeric:        big.NewRat(4499, 10000),
  1295  			RangeDate:      rangedate,
  1296  			RangeDateTime:  rangedatetime,
  1297  			RangeTimestamp: rangetimestamp,
  1298  		},
  1299  	}
  1300  	var savers []*StructSaver
  1301  	for _, s := range want {
  1302  		savers = append(savers, &StructSaver{Schema: schema, Struct: s})
  1303  	}
  1304  	if err := ins.Put(ctx, savers); err != nil {
  1305  		t.Fatal(putError(err))
  1306  	}
  1307  
  1308  	// Wait until the data has been uploaded. This can take a few seconds, according
  1309  	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
  1310  	if err := waitForRow(ctx, table); err != nil {
  1311  		t.Fatal(err)
  1312  	}
  1313  
  1314  	// Test iteration with structs.
  1315  	it := table.Read(ctx)
  1316  	var got []*TestStruct
  1317  	for {
  1318  		var g TestStruct
  1319  		err := it.Next(&g)
  1320  		if err == iterator.Done {
  1321  			break
  1322  		}
  1323  		if err != nil {
  1324  			t.Fatal(err)
  1325  		}
  1326  		got = append(got, &g)
  1327  	}
  1328  	sort.Sort(byName(got))
  1329  
  1330  	// BigQuery does not elide nils. It reports an error for nil fields.
  1331  	for i, g := range got {
  1332  		if i >= len(want) {
  1333  			t.Errorf("%d: got %v, past end of want", i, pretty.Value(g))
  1334  		} else if diff := testutil.Diff(g, want[i], roundToMicros); diff != "" {
  1335  			t.Errorf("%d: got=-, want=+:\n%s", i, diff)
  1336  		}
  1337  	}
  1338  }
  1339  
  1340  type byName []*TestStruct
  1341  
  1342  func (b byName) Len() int           { return len(b) }
  1343  func (b byName) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
  1344  func (b byName) Less(i, j int) bool { return b[i].Name < b[j].Name }
  1345  
  1346  func TestIntegration_InsertAndReadNullable(t *testing.T) {
  1347  	if client == nil {
  1348  		t.Skip("Integration tests skipped")
  1349  	}
  1350  	ctm := civil.Time{Hour: 15, Minute: 4, Second: 5, Nanosecond: 6000}
  1351  	cdt := civil.DateTime{Date: testDate, Time: ctm}
  1352  	rat := big.NewRat(33, 100)
  1353  	rat2 := big.NewRat(66, 10e10)
  1354  	geo := "POINT(-122.198939 47.669865)"
  1355  
  1356  	// Nil fields in the struct.
  1357  	testInsertAndReadNullable(t, testStructNullable{}, make([]Value, len(testStructNullableSchema)))
  1358  
  1359  	// Explicitly invalidate the Null* types within the struct.
  1360  	testInsertAndReadNullable(t, testStructNullable{
  1361  		String:    NullString{Valid: false},
  1362  		Integer:   NullInt64{Valid: false},
  1363  		Float:     NullFloat64{Valid: false},
  1364  		Boolean:   NullBool{Valid: false},
  1365  		Timestamp: NullTimestamp{Valid: false},
  1366  		Date:      NullDate{Valid: false},
  1367  		Time:      NullTime{Valid: false},
  1368  		DateTime:  NullDateTime{Valid: false},
  1369  		Geography: NullGeography{Valid: false},
  1370  	},
  1371  		make([]Value, len(testStructNullableSchema)))
  1372  
  1373  	// Populate the struct with values.
  1374  	testInsertAndReadNullable(t, testStructNullable{
  1375  		String:     NullString{"x", true},
  1376  		Bytes:      []byte{1, 2, 3},
  1377  		Integer:    NullInt64{1, true},
  1378  		Float:      NullFloat64{2.3, true},
  1379  		Boolean:    NullBool{true, true},
  1380  		Timestamp:  NullTimestamp{testTimestamp, true},
  1381  		Date:       NullDate{testDate, true},
  1382  		Time:       NullTime{ctm, true},
  1383  		DateTime:   NullDateTime{cdt, true},
  1384  		Numeric:    rat,
  1385  		BigNumeric: rat2,
  1386  		Geography:  NullGeography{geo, true},
  1387  		Record:     &subNullable{X: NullInt64{4, true}},
  1388  	},
  1389  		[]Value{"x", []byte{1, 2, 3}, int64(1), 2.3, true, testTimestamp, testDate, ctm, cdt, rat, rat2, geo, []Value{int64(4)}})
  1390  }
  1391  
  1392  func testInsertAndReadNullable(t *testing.T, ts testStructNullable, wantRow []Value) {
  1393  	ctx := context.Background()
  1394  	table := newTable(t, testStructNullableSchema)
  1395  	defer table.Delete(ctx)
  1396  
  1397  	// Populate the table.
  1398  	ins := table.Inserter()
  1399  	if err := ins.Put(ctx, []*StructSaver{{Schema: testStructNullableSchema, Struct: ts}}); err != nil {
  1400  		t.Fatal(putError(err))
  1401  	}
  1402  	// Wait until the data has been uploaded. This can take a few seconds, according
  1403  	// to https://cloud.google.com/bigquery/streaming-data-into-bigquery.
  1404  	if err := waitForRow(ctx, table); err != nil {
  1405  		t.Fatal(err)
  1406  	}
  1407  
  1408  	// Read into a []Value.
  1409  	iter := table.Read(ctx)
  1410  	gotRows, _, _, err := readAll(iter)
  1411  	if err != nil {
  1412  		t.Fatal(err)
  1413  	}
  1414  	if len(gotRows) != 1 {
  1415  		t.Fatalf("got %d rows, want 1", len(gotRows))
  1416  	}
  1417  	if diff := testutil.Diff(gotRows[0], wantRow, roundToMicros); diff != "" {
  1418  		t.Error(diff)
  1419  	}
  1420  
  1421  	// Read into a struct.
  1422  	want := ts
  1423  	var sn testStructNullable
  1424  	it := table.Read(ctx)
  1425  	if err := it.Next(&sn); err != nil {
  1426  		t.Fatal(err)
  1427  	}
  1428  	if diff := testutil.Diff(sn, want, roundToMicros); diff != "" {
  1429  		t.Error(diff)
  1430  	}
  1431  }
  1432  
  1433  func TestIntegration_QueryStatistics(t *testing.T) {
  1434  	// Make a bunch of assertions on a simple query.
  1435  	if client == nil {
  1436  		t.Skip("Integration tests skipped")
  1437  	}
  1438  	ctx := context.Background()
  1439  
  1440  	q := client.Query("SELECT 17 as foo, 3.14 as bar")
  1441  	// disable cache to ensure we have query statistics
  1442  	q.DisableQueryCache = true
  1443  
  1444  	job, err := q.Run(ctx)
  1445  	if err != nil {
  1446  		t.Fatalf("job Run failure: %v", err)
  1447  	}
  1448  	status, err := job.Wait(ctx)
  1449  	if err != nil {
  1450  		t.Fatalf("job %q: Wait failure: %v", job.ID(), err)
  1451  	}
  1452  	if status.Statistics == nil {
  1453  		t.Fatal("expected job statistics, none found")
  1454  	}
  1455  
  1456  	if status.Statistics.NumChildJobs != 0 {
  1457  		t.Errorf("expected no children, %d reported", status.Statistics.NumChildJobs)
  1458  	}
  1459  
  1460  	if status.Statistics.ParentJobID != "" {
  1461  		t.Errorf("expected no parent, but parent present: %s", status.Statistics.ParentJobID)
  1462  	}
  1463  
  1464  	if status.Statistics.Details == nil {
  1465  		t.Fatal("expected job details, none present")
  1466  	}
  1467  
  1468  	qStats, ok := status.Statistics.Details.(*QueryStatistics)
  1469  	if !ok {
  1470  		t.Fatalf("expected query statistics not present")
  1471  	}
  1472  
  1473  	if qStats.CacheHit {
  1474  		t.Error("unexpected cache hit")
  1475  	}
  1476  
  1477  	if qStats.StatementType != "SELECT" {
  1478  		t.Errorf("expected SELECT statement type, got: %s", qStats.StatementType)
  1479  	}
  1480  
  1481  	if len(qStats.QueryPlan) == 0 {
  1482  		t.Error("expected query plan, none present")
  1483  	}
  1484  
  1485  	if len(qStats.Timeline) == 0 {
  1486  		t.Error("expected query timeline, none present")
  1487  	}
  1488  
  1489  	if qStats.BIEngineStatistics != nil {
  1490  		expectedMode := false
  1491  		for _, m := range []string{"FULL", "PARTIAL", "DISABLED"} {
  1492  			if qStats.BIEngineStatistics.BIEngineMode == m {
  1493  				expectedMode = true
  1494  			}
  1495  		}
  1496  		if !expectedMode {
  1497  			t.Errorf("unexpected BIEngineMode for BI Engine statistics, got %s", qStats.BIEngineStatistics.BIEngineMode)
  1498  		}
  1499  	}
  1500  }
  1501  
  1502  func TestIntegration_Load(t *testing.T) {
  1503  	if client == nil {
  1504  		t.Skip("Integration tests skipped")
  1505  	}
  1506  	ctx := context.Background()
  1507  	// CSV data can't be loaded into a repeated field, so we use a different schema.
  1508  	table := newTable(t, Schema{
  1509  		{Name: "name", Type: StringFieldType},
  1510  		{Name: "nums", Type: IntegerFieldType},
  1511  	})
  1512  	defer table.Delete(ctx)
  1513  
  1514  	// Load the table from a reader.
  1515  	r := strings.NewReader("a,0\nb,1\nc,2\n")
  1516  	wantRows := [][]Value{
  1517  		{"a", int64(0)},
  1518  		{"b", int64(1)},
  1519  		{"c", int64(2)},
  1520  	}
  1521  	rs := NewReaderSource(r)
  1522  	loader := table.LoaderFrom(rs)
  1523  	loader.WriteDisposition = WriteTruncate
  1524  	loader.Labels = map[string]string{"test": "go"}
  1525  	loader.MediaOptions = []googleapi.MediaOption{
  1526  		googleapi.ContentType("text/csv"),
  1527  		googleapi.ChunkSize(googleapi.MinUploadChunkSize),
  1528  	}
  1529  	job, err := loader.Run(ctx)
  1530  	if err != nil {
  1531  		t.Fatal(err)
  1532  	}
  1533  	if job.LastStatus() == nil {
  1534  		t.Error("no LastStatus")
  1535  	}
  1536  	conf, err := job.Config()
  1537  	if err != nil {
  1538  		t.Fatal(err)
  1539  	}
  1540  	config, ok := conf.(*LoadConfig)
  1541  	if !ok {
  1542  		t.Fatalf("got %T, want LoadConfig", conf)
  1543  	}
  1544  	diff := testutil.Diff(config, &loader.LoadConfig,
  1545  		cmp.AllowUnexported(Table{}),
  1546  		cmpopts.IgnoreUnexported(Client{}, ReaderSource{}),
  1547  		// returned schema is at top level, not in the config
  1548  		cmpopts.IgnoreFields(FileConfig{}, "Schema"),
  1549  		cmpopts.IgnoreFields(LoadConfig{}, "MediaOptions"))
  1550  	if diff != "" {
  1551  		t.Errorf("got=-, want=+:\n%s", diff)
  1552  	}
  1553  	if err := wait(ctx, job); err != nil {
  1554  		t.Fatal(err)
  1555  	}
  1556  	checkReadAndTotalRows(t, "reader load", table.Read(ctx), wantRows)
  1557  }
  1558  
  1559  func TestIntegration_LoadWithSessionSupport(t *testing.T) {
  1560  	if client == nil {
  1561  		t.Skip("Integration tests skipped")
  1562  	}
  1563  
  1564  	ctx := context.Background()
  1565  	sessionDataset := client.Dataset("_SESSION")
  1566  	sessionTable := sessionDataset.Table("test_temp_destination_table")
  1567  
  1568  	schema := Schema{
  1569  		{Name: "username", Type: StringFieldType, Required: false},
  1570  		{Name: "tweet", Type: StringFieldType, Required: false},
  1571  		{Name: "timestamp", Type: StringFieldType, Required: false},
  1572  		{Name: "likes", Type: IntegerFieldType, Required: false},
  1573  	}
  1574  	sourceURIs := []string{
  1575  		"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet",
  1576  	}
  1577  
  1578  	source := NewGCSReference(sourceURIs...)
  1579  	source.SourceFormat = Parquet
  1580  	source.Schema = schema
  1581  	loader := sessionTable.LoaderFrom(source)
  1582  	loader.CreateSession = true
  1583  	loader.CreateDisposition = CreateIfNeeded
  1584  
  1585  	job, err := loader.Run(ctx)
  1586  	if err != nil {
  1587  		t.Fatalf("loader.Run: %v", err)
  1588  	}
  1589  	err = wait(ctx, job)
  1590  	if err != nil {
  1591  		t.Fatalf("wait: %v", err)
  1592  	}
  1593  
  1594  	sessionInfo := job.lastStatus.Statistics.SessionInfo
  1595  	if sessionInfo == nil {
  1596  		t.Fatalf("empty job.lastStatus.Statistics.SessionInfo: %v", sessionInfo)
  1597  	}
  1598  
  1599  	sessionID := sessionInfo.SessionID
  1600  	loaderWithSession := sessionTable.LoaderFrom(source)
  1601  	loaderWithSession.CreateDisposition = CreateIfNeeded
  1602  	loaderWithSession.ConnectionProperties = []*ConnectionProperty{
  1603  		{
  1604  			Key:   "session_id",
  1605  			Value: sessionID,
  1606  		},
  1607  	}
  1608  	jobWithSession, err := loaderWithSession.Run(ctx)
  1609  	if err != nil {
  1610  		t.Fatalf("loaderWithSession.Run: %v", err)
  1611  	}
  1612  	err = wait(ctx, jobWithSession)
  1613  	if err != nil {
  1614  		t.Fatalf("wait: %v", err)
  1615  	}
  1616  
  1617  	sessionJobInfo := jobWithSession.lastStatus.Statistics.SessionInfo
  1618  	if sessionJobInfo == nil {
  1619  		t.Fatalf("empty jobWithSession.lastStatus.Statistics.SessionInfo: %v", sessionJobInfo)
  1620  	}
  1621  
  1622  	if sessionID != sessionJobInfo.SessionID {
  1623  		t.Fatalf("expected session ID %q, but found %q", sessionID, sessionJobInfo.SessionID)
  1624  	}
  1625  
  1626  	sql := "SELECT * FROM _SESSION.test_temp_destination_table;"
  1627  	q := client.Query(sql)
  1628  	q.ConnectionProperties = []*ConnectionProperty{
  1629  		{
  1630  			Key:   "session_id",
  1631  			Value: sessionID,
  1632  		},
  1633  	}
  1634  	sessionQueryJob, err := q.Run(ctx)
  1635  	err = wait(ctx, sessionQueryJob)
  1636  	if err != nil {
  1637  		t.Fatalf("wait: %v", err)
  1638  	}
  1639  }
  1640  
  1641  func TestIntegration_LoadWithReferenceSchemaFile(t *testing.T) {
  1642  	if client == nil {
  1643  		t.Skip("Integration tests skipped")
  1644  	}
  1645  
  1646  	formats := []DataFormat{Avro, Parquet}
  1647  	for _, format := range formats {
  1648  		ctx := context.Background()
  1649  		table := dataset.Table(tableIDs.New())
  1650  		defer table.Delete(ctx)
  1651  
  1652  		expectedSchema := Schema{
  1653  			{Name: "username", Type: StringFieldType, Required: false},
  1654  			{Name: "tweet", Type: StringFieldType, Required: false},
  1655  			{Name: "timestamp", Type: StringFieldType, Required: false},
  1656  			{Name: "likes", Type: IntegerFieldType, Required: false},
  1657  		}
  1658  		ext := strings.ToLower(string(format))
  1659  		sourceURIs := []string{
  1660  			"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext,
  1661  			"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter." + ext,
  1662  			"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter." + ext,
  1663  		}
  1664  		referenceURI := "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext
  1665  		source := NewGCSReference(sourceURIs...)
  1666  		source.SourceFormat = format
  1667  		loader := table.LoaderFrom(source)
  1668  		loader.ReferenceFileSchemaURI = referenceURI
  1669  		job, err := loader.Run(ctx)
  1670  		if err != nil {
  1671  			t.Fatalf("loader.Run: %v", err)
  1672  		}
  1673  		err = wait(ctx, job)
  1674  		if err != nil {
  1675  			t.Fatalf("wait: %v", err)
  1676  		}
  1677  		metadata, err := table.Metadata(ctx)
  1678  		if err != nil {
  1679  			t.Fatalf("table.Metadata: %v", err)
  1680  		}
  1681  		diff := testutil.Diff(expectedSchema, metadata.Schema)
  1682  		if diff != "" {
  1683  			t.Errorf("got=-, want=+:\n%s", diff)
  1684  		}
  1685  	}
  1686  }
  1687  
  1688  func TestIntegration_ExternalTableWithReferenceSchemaFile(t *testing.T) {
  1689  	if client == nil {
  1690  		t.Skip("Integration tests skipped")
  1691  	}
  1692  
  1693  	formats := []DataFormat{Avro, Parquet}
  1694  	for _, format := range formats {
  1695  		ctx := context.Background()
  1696  		externalTable := dataset.Table(tableIDs.New())
  1697  		defer externalTable.Delete(ctx)
  1698  
  1699  		expectedSchema := Schema{
  1700  			{Name: "username", Type: StringFieldType, Required: false},
  1701  			{Name: "tweet", Type: StringFieldType, Required: false},
  1702  			{Name: "timestamp", Type: StringFieldType, Required: false},
  1703  			{Name: "likes", Type: IntegerFieldType, Required: false},
  1704  		}
  1705  		ext := strings.ToLower(string(format))
  1706  		sourceURIs := []string{
  1707  			"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext,
  1708  			"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter." + ext,
  1709  			"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter." + ext,
  1710  		}
  1711  		referenceURI := "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter." + ext
  1712  
  1713  		err := externalTable.Create(ctx, &TableMetadata{
  1714  			ExternalDataConfig: &ExternalDataConfig{
  1715  				SourceFormat:           format,
  1716  				SourceURIs:             sourceURIs,
  1717  				ReferenceFileSchemaURI: referenceURI,
  1718  			},
  1719  		})
  1720  		if err != nil {
  1721  			t.Fatalf("table.Create: %v", err)
  1722  		}
  1723  
  1724  		metadata, err := externalTable.Metadata(ctx)
  1725  		if err != nil {
  1726  			t.Fatalf("table.Metadata: %v", err)
  1727  		}
  1728  		diff := testutil.Diff(expectedSchema, metadata.Schema)
  1729  		if diff != "" {
  1730  			t.Errorf("got=-, want=+:\n%s", diff)
  1731  		}
  1732  	}
  1733  }
  1734  
  1735  func TestIntegration_DML(t *testing.T) {
  1736  	if client == nil {
  1737  		t.Skip("Integration tests skipped")
  1738  	}
  1739  	ctx := context.Background()
  1740  	table := newTable(t, schema)
  1741  	defer table.Delete(ctx)
  1742  
  1743  	sql := fmt.Sprintf(`INSERT %s.%s (name, nums, rec)
  1744  						VALUES ('a', [0], STRUCT<BOOL>(TRUE)),
  1745  							   ('b', [1], STRUCT<BOOL>(FALSE)),
  1746  							   ('c', [2], STRUCT<BOOL>(TRUE))`,
  1747  		table.DatasetID, table.TableID)
  1748  	_, stats, err := runQuerySQL(ctx, sql)
  1749  	if err != nil {
  1750  		t.Fatal(err)
  1751  	}
  1752  	wantRows := [][]Value{
  1753  		{"a", []Value{int64(0)}, []Value{true}},
  1754  		{"b", []Value{int64(1)}, []Value{false}},
  1755  		{"c", []Value{int64(2)}, []Value{true}},
  1756  	}
  1757  	checkRead(t, "DML", table.Read(ctx), wantRows)
  1758  	if stats == nil {
  1759  		t.Fatalf("no query stats")
  1760  	}
  1761  	if stats.DMLStats == nil {
  1762  		t.Fatalf("no dml stats")
  1763  	}
  1764  	wantRowCount := int64(len(wantRows))
  1765  	if stats.DMLStats.InsertedRowCount != wantRowCount {
  1766  		t.Fatalf("dml stats mismatch.  got %d inserted rows, want %d", stats.DMLStats.InsertedRowCount, wantRowCount)
  1767  	}
  1768  }
  1769  
  1770  // runQuerySQL runs arbitrary SQL text.
  1771  func runQuerySQL(ctx context.Context, sql string) (*JobStatistics, *QueryStatistics, error) {
  1772  	return runQueryJob(ctx, client.Query(sql))
  1773  }
  1774  
  1775  // runQueryJob is useful for running queries where no row data is returned (DDL/DML).
  1776  func runQueryJob(ctx context.Context, q *Query) (*JobStatistics, *QueryStatistics, error) {
  1777  	var jobStats *JobStatistics
  1778  	var queryStats *QueryStatistics
  1779  	var err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
  1780  		job, err := q.Run(ctx)
  1781  		if err != nil {
  1782  			var e *googleapi.Error
  1783  			if ok := errors.As(err, &e); ok && e.Code < 500 {
  1784  				return true, err // fail on 4xx
  1785  			}
  1786  			return false, err
  1787  		}
  1788  		_, err = job.Wait(ctx)
  1789  		if err != nil {
  1790  			var e *googleapi.Error
  1791  			if ok := errors.As(err, &e); ok && e.Code < 500 {
  1792  				return true, err // fail on 4xx
  1793  			}
  1794  			return false, fmt.Errorf("%q: %v", job.ID(), err)
  1795  		}
  1796  		status := job.LastStatus()
  1797  		if status.Err() != nil {
  1798  			return false, fmt.Errorf("job %q terminated in err: %v", job.ID(), status.Err())
  1799  		}
  1800  		if status.Statistics != nil {
  1801  			jobStats = status.Statistics
  1802  			if qStats, ok := status.Statistics.Details.(*QueryStatistics); ok {
  1803  				queryStats = qStats
  1804  			}
  1805  		}
  1806  		return true, nil
  1807  	})
  1808  	return jobStats, queryStats, err
  1809  }
  1810  
  1811  func TestIntegration_TimeTypes(t *testing.T) {
  1812  	if client == nil {
  1813  		t.Skip("Integration tests skipped")
  1814  	}
  1815  	ctx := context.Background()
  1816  	dtSchema := Schema{
  1817  		{Name: "d", Type: DateFieldType},
  1818  		{Name: "t", Type: TimeFieldType},
  1819  		{Name: "dt", Type: DateTimeFieldType},
  1820  		{Name: "ts", Type: TimestampFieldType},
  1821  	}
  1822  	table := newTable(t, dtSchema)
  1823  	defer table.Delete(ctx)
  1824  
  1825  	d := civil.Date{Year: 2016, Month: 3, Day: 20}
  1826  	tm := civil.Time{Hour: 12, Minute: 30, Second: 0, Nanosecond: 6000}
  1827  	dtm := civil.DateTime{Date: d, Time: tm}
  1828  	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1829  	wantRows := [][]Value{
  1830  		{d, tm, dtm, ts},
  1831  	}
  1832  	ins := table.Inserter()
  1833  	if err := ins.Put(ctx, []*ValuesSaver{
  1834  		{Schema: dtSchema, Row: wantRows[0]},
  1835  	}); err != nil {
  1836  		t.Fatal(putError(err))
  1837  	}
  1838  	if err := waitForRow(ctx, table); err != nil {
  1839  		t.Fatal(err)
  1840  	}
  1841  
  1842  	// SQL wants DATETIMEs with a space between date and time, but the service
  1843  	// returns them in RFC3339 form, with a "T" between.
  1844  	query := fmt.Sprintf("INSERT %s.%s (d, t, dt, ts) "+
  1845  		"VALUES ('%s', '%s', '%s', '%s')",
  1846  		table.DatasetID, table.TableID,
  1847  		d, CivilTimeString(tm), CivilDateTimeString(dtm), ts.Format("2006-01-02 15:04:05"))
  1848  	if _, _, err := runQuerySQL(ctx, query); err != nil {
  1849  		t.Fatal(err)
  1850  	}
  1851  	wantRows = append(wantRows, wantRows[0])
  1852  	checkRead(t, "TimeTypes", table.Read(ctx), wantRows)
  1853  }
  1854  
  1855  func TestIntegration_StandardQuery(t *testing.T) {
  1856  	if client == nil {
  1857  		t.Skip("Integration tests skipped")
  1858  	}
  1859  	ctx := context.Background()
  1860  
  1861  	d := civil.Date{Year: 2016, Month: 3, Day: 20}
  1862  	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 0}
  1863  	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1864  	dtm := ts.Format("2006-01-02 15:04:05")
  1865  
  1866  	// Constructs Value slices made up of int64s.
  1867  	ints := func(args ...int) []Value {
  1868  		vals := make([]Value, len(args))
  1869  		for i, arg := range args {
  1870  			vals[i] = int64(arg)
  1871  		}
  1872  		return vals
  1873  	}
  1874  
  1875  	testCases := []struct {
  1876  		name    string
  1877  		query   string
  1878  		wantRow []Value
  1879  	}{
  1880  		{"Ints", "SELECT 1", ints(1)},
  1881  		{"Float", "SELECT 1.3", []Value{1.3}},
  1882  		{"NumericCast", "SELECT CAST(1.3  AS NUMERIC)", []Value{big.NewRat(13, 10)}},
  1883  		{"NumericLiteral", "SELECT NUMERIC '0.25'", []Value{big.NewRat(1, 4)}},
  1884  		{"Boolean", "SELECT TRUE", []Value{true}},
  1885  		{"String", "SELECT 'ABC'", []Value{"ABC"}},
  1886  		{"Bytes", "SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
  1887  		{"Timestamp", fmt.Sprintf("SELECT TIMESTAMP '%s'", dtm), []Value{ts}},
  1888  		{"TimestampArray", fmt.Sprintf("SELECT [TIMESTAMP '%s', TIMESTAMP '%s']", dtm, dtm), []Value{[]Value{ts, ts}}},
  1889  		{"AnonStruct", fmt.Sprintf("SELECT ('hello', TIMESTAMP '%s')", dtm), []Value{[]Value{"hello", ts}}},
  1890  		{"DatetimeCast", fmt.Sprintf("SELECT DATETIME(TIMESTAMP '%s')", dtm), []Value{civil.DateTime{Date: d, Time: tm}}},
  1891  		{"DateCast", fmt.Sprintf("SELECT DATE(TIMESTAMP '%s')", dtm), []Value{d}},
  1892  		{"TimeCast", fmt.Sprintf("SELECT TIME(TIMESTAMP '%s')", dtm), []Value{tm}},
  1893  		{"StructOfInts", "SELECT (1, 2)", []Value{ints(1, 2)}},
  1894  		{"IntArray", "SELECT [1, 2, 3]", []Value{ints(1, 2, 3)}},
  1895  		{"StructOfArrays", "SELECT ([1, 2], 3, [4, 5])", []Value{[]Value{ints(1, 2), int64(3), ints(4, 5)}}},
  1896  		{"ArrayOfStructs", "SELECT [(1, 2, 3), (4, 5, 6)]", []Value{[]Value{ints(1, 2, 3), ints(4, 5, 6)}}},
  1897  		{"ComplexNested", "SELECT [([1, 2, 3], 4), ([5, 6], 7)]", []Value{[]Value{[]Value{ints(1, 2, 3), int64(4)}, []Value{ints(5, 6), int64(7)}}}},
  1898  		{"SubSelectArray", "SELECT ARRAY(SELECT STRUCT([1, 2]))", []Value{[]Value{[]Value{ints(1, 2)}}}},
  1899  		{"RangeOofDateLiteral",
  1900  			"SELECT RANGE(DATE '2023-03-01', DATE '2024-04-16')",
  1901  			[]Value{&RangeValue{Start: civil.Date{Year: 2023, Month: 03, Day: 01}, End: civil.Date{Year: 2024, Month: 04, Day: 16}}},
  1902  		},
  1903  	}
  1904  	for _, tc := range testCases {
  1905  		t.Run(tc.name, func(t *testing.T) {
  1906  			q := client.Query(tc.query)
  1907  			it, err := q.Read(ctx)
  1908  			if err != nil {
  1909  				t.Fatal(err)
  1910  			}
  1911  			checkRead(t, "StandardQuery", it, [][]Value{tc.wantRow})
  1912  		})
  1913  	}
  1914  }
  1915  
  1916  func TestIntegration_LegacyQuery(t *testing.T) {
  1917  	if client == nil {
  1918  		t.Skip("Integration tests skipped")
  1919  	}
  1920  	ctx := context.Background()
  1921  
  1922  	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
  1923  	dtm := ts.Format("2006-01-02 15:04:05")
  1924  
  1925  	testCases := []struct {
  1926  		name    string
  1927  		query   string
  1928  		wantRow []Value
  1929  	}{
  1930  		{"Int", "SELECT 1", []Value{int64(1)}},
  1931  		{"Float", "SELECT 1.3", []Value{1.3}},
  1932  		{"Boolean", "SELECT TRUE", []Value{true}},
  1933  		{"String", "SELECT 'ABC'", []Value{"ABC"}},
  1934  		{"Bytes", "SELECT CAST('foo' AS BYTES)", []Value{[]byte("foo")}},
  1935  		{"Timestamp", fmt.Sprintf("SELECT TIMESTAMP('%s')", dtm), []Value{ts}},
  1936  		{"Date", fmt.Sprintf("SELECT DATE(TIMESTAMP('%s'))", dtm), []Value{"2016-03-20"}},
  1937  		{"Time", fmt.Sprintf("SELECT TIME(TIMESTAMP('%s'))", dtm), []Value{"15:04:05"}},
  1938  	}
  1939  	for _, tc := range testCases {
  1940  		q := client.Query(tc.query)
  1941  		q.UseLegacySQL = true
  1942  		it, err := q.Read(ctx)
  1943  		if err != nil {
  1944  			t.Fatal(err)
  1945  		}
  1946  		checkRead(t, "LegacyQuery", it, [][]Value{tc.wantRow})
  1947  	}
  1948  }
  1949  
  1950  func TestIntegration_IteratorSource(t *testing.T) {
  1951  	if client == nil {
  1952  		t.Skip("Integration tests skipped")
  1953  	}
  1954  	ctx := context.Background()
  1955  	q := client.Query("SELECT 17 as foo")
  1956  	it, err := q.Read(ctx)
  1957  	if err != nil {
  1958  		t.Errorf("Read: %v", err)
  1959  	}
  1960  	src := it.SourceJob()
  1961  	if src == nil {
  1962  		t.Errorf("wanted source job, got nil")
  1963  	}
  1964  	status, err := src.Status(ctx)
  1965  	if err != nil {
  1966  		t.Errorf("Status: %v", err)
  1967  	}
  1968  	if status == nil {
  1969  		t.Errorf("got nil status")
  1970  	}
  1971  }
  1972  
  1973  func TestIntegration_ExternalAutodetect(t *testing.T) {
  1974  	if client == nil {
  1975  		t.Skip("Integration tests skipped")
  1976  	}
  1977  	ctx := context.Background()
  1978  
  1979  	testTable := dataset.Table(tableIDs.New())
  1980  
  1981  	origExtCfg := &ExternalDataConfig{
  1982  		SourceFormat: Avro,
  1983  		SourceURIs:   []string{"gs://cloud-samples-data/bigquery/autodetect-samples/original*.avro"},
  1984  	}
  1985  
  1986  	err := testTable.Create(ctx, &TableMetadata{
  1987  		ExternalDataConfig: origExtCfg,
  1988  	})
  1989  	if err != nil {
  1990  		t.Fatalf("Table.Create(%q): %v", testTable.FullyQualifiedName(), err)
  1991  	}
  1992  
  1993  	origMeta, err := testTable.Metadata(ctx)
  1994  	if err != nil {
  1995  		t.Fatalf("Table.Metadata(%q): %v", testTable.FullyQualifiedName(), err)
  1996  	}
  1997  
  1998  	wantSchema := Schema{
  1999  		{Name: "stringfield", Type: "STRING"},
  2000  		{Name: "int64field", Type: "INTEGER"},
  2001  	}
  2002  	if diff := testutil.Diff(origMeta.Schema, wantSchema); diff != "" {
  2003  		t.Fatalf("orig schema, got=-, want=+\n%s", diff)
  2004  	}
  2005  
  2006  	// Now, point at the new files, but don't signal autodetect.
  2007  	newExtCfg := &ExternalDataConfig{
  2008  		SourceFormat: Avro,
  2009  		SourceURIs:   []string{"gs://cloud-samples-data/bigquery/autodetect-samples/widened*.avro"},
  2010  	}
  2011  
  2012  	newMeta, err := testTable.Update(ctx, TableMetadataToUpdate{
  2013  		ExternalDataConfig: newExtCfg,
  2014  	}, origMeta.ETag)
  2015  	if err != nil {
  2016  		t.Fatalf("Table.Update(%q): %v", testTable.FullyQualifiedName(), err)
  2017  	}
  2018  	if diff := testutil.Diff(newMeta.Schema, wantSchema); diff != "" {
  2019  		t.Fatalf("new schema, got=-, want=+\n%s", diff)
  2020  	}
  2021  
  2022  	// Now, signal autodetect in another update.
  2023  	// This should yield a new schema.
  2024  	newMeta2, err := testTable.Update(ctx, TableMetadataToUpdate{}, newMeta.ETag, WithAutoDetectSchema(true))
  2025  	if err != nil {
  2026  		t.Fatalf("Table.Update(%q) with autodetect: %v", testTable.FullyQualifiedName(), err)
  2027  	}
  2028  
  2029  	wantSchema2 := Schema{
  2030  		{Name: "stringfield", Type: "STRING"},
  2031  		{Name: "int64field", Type: "INTEGER"},
  2032  		{Name: "otherfield", Type: "INTEGER"},
  2033  	}
  2034  	if diff := testutil.Diff(newMeta2.Schema, wantSchema2); diff != "" {
  2035  		t.Errorf("new schema after autodetect, got=-, want=+\n%s", diff)
  2036  	}
  2037  
  2038  	id, _ := testTable.Identifier(StandardSQLID)
  2039  	q := client.Query(fmt.Sprintf("SELECT * FROM %s", id))
  2040  	it, err := q.Read(ctx)
  2041  	if err != nil {
  2042  		t.Fatalf("query read: %v", err)
  2043  	}
  2044  	wantRows := [][]Value{
  2045  		{"bar", int64(32), int64(314)},
  2046  	}
  2047  	checkReadAndTotalRows(t, "row check", it, wantRows)
  2048  }
  2049  
  2050  func TestIntegration_QueryExternalHivePartitioning(t *testing.T) {
  2051  	if client == nil {
  2052  		t.Skip("Integration tests skipped")
  2053  	}
  2054  	ctx := context.Background()
  2055  
  2056  	autoTable := dataset.Table(tableIDs.New())
  2057  	customTable := dataset.Table(tableIDs.New())
  2058  
  2059  	err := autoTable.Create(ctx, &TableMetadata{
  2060  		ExternalDataConfig: &ExternalDataConfig{
  2061  			SourceFormat:       Parquet,
  2062  			SourceURIs:         []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/*"},
  2063  			AutoDetect:         true,
  2064  			DecimalTargetTypes: []DecimalTargetType{StringTargetType},
  2065  			HivePartitioningOptions: &HivePartitioningOptions{
  2066  				Mode:                   AutoHivePartitioningMode,
  2067  				SourceURIPrefix:        "gs://cloud-samples-data/bigquery/hive-partitioning-samples/autolayout/",
  2068  				RequirePartitionFilter: true,
  2069  			},
  2070  		},
  2071  	})
  2072  	if err != nil {
  2073  		t.Fatalf("table.Create(auto): %v", err)
  2074  	}
  2075  	defer autoTable.Delete(ctx)
  2076  
  2077  	err = customTable.Create(ctx, &TableMetadata{
  2078  		ExternalDataConfig: &ExternalDataConfig{
  2079  			SourceFormat:       Parquet,
  2080  			SourceURIs:         []string{"gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/*"},
  2081  			AutoDetect:         true,
  2082  			DecimalTargetTypes: []DecimalTargetType{NumericTargetType, StringTargetType},
  2083  			HivePartitioningOptions: &HivePartitioningOptions{
  2084  				Mode:                   CustomHivePartitioningMode,
  2085  				SourceURIPrefix:        "gs://cloud-samples-data/bigquery/hive-partitioning-samples/customlayout/{pkey:STRING}/",
  2086  				RequirePartitionFilter: true,
  2087  			},
  2088  		},
  2089  	})
  2090  	if err != nil {
  2091  		t.Fatalf("table.Create(custom): %v", err)
  2092  	}
  2093  	defer customTable.Delete(ctx)
  2094  
  2095  	customTableSQLID, _ := customTable.Identifier(StandardSQLID)
  2096  
  2097  	// Issue a test query that prunes based on the custom hive partitioning key, and verify the result is as expected.
  2098  	sql := fmt.Sprintf("SELECT COUNT(*) as ct FROM %s WHERE pkey=\"foo\"", customTableSQLID)
  2099  	q := client.Query(sql)
  2100  	it, err := q.Read(ctx)
  2101  	if err != nil {
  2102  		t.Fatalf("Error querying: %v", err)
  2103  	}
  2104  	checkReadAndTotalRows(t, "HiveQuery", it, [][]Value{{int64(50)}})
  2105  }
  2106  
  2107  func TestIntegration_QuerySessionSupport(t *testing.T) {
  2108  	if client == nil {
  2109  		t.Skip("Integration tests skipped")
  2110  	}
  2111  	ctx := context.Background()
  2112  
  2113  	q := client.Query("CREATE TEMPORARY TABLE temptable AS SELECT 17 as foo")
  2114  	q.CreateSession = true
  2115  	jobStats, _, err := runQueryJob(ctx, q)
  2116  	if err != nil {
  2117  		t.Fatalf("error running CREATE TEMPORARY TABLE: %v", err)
  2118  	}
  2119  	if jobStats.SessionInfo == nil {
  2120  		t.Fatalf("expected session info, was nil")
  2121  	}
  2122  	sessionID := jobStats.SessionInfo.SessionID
  2123  	if len(sessionID) == 0 {
  2124  		t.Errorf("expected non-empty sessionID")
  2125  	}
  2126  
  2127  	q2 := client.Query("SELECT * FROM temptable")
  2128  	q2.ConnectionProperties = []*ConnectionProperty{
  2129  		{Key: "session_id", Value: sessionID},
  2130  	}
  2131  	jobStats, _, err = runQueryJob(ctx, q2)
  2132  	if err != nil {
  2133  		t.Errorf("error running SELECT: %v", err)
  2134  	}
  2135  	if jobStats.SessionInfo == nil {
  2136  		t.Fatalf("expected sessionInfo in second query, was nil")
  2137  	}
  2138  	got := jobStats.SessionInfo.SessionID
  2139  	if got != sessionID {
  2140  		t.Errorf("second query mismatched session ID, got %s want %s", got, sessionID)
  2141  	}
  2142  
  2143  }
  2144  
// queryParameterTestCase describes one parameterized-query scenario: the SQL
// to run, the parameters to bind, and what is expected back.
type queryParameterTestCase struct {
	// name is used as the subtest name.
	name string
	// query is the SQL text passed to client.Query.
	query string
	// parameters is assigned to the query's Parameters field.
	parameters []QueryParameter
	// wantRow is the single row the query is expected to produce.
	wantRow []Value
	// wantConfig is the expected Value of the first parameter as read back
	// from the job's QueryConfig.
	wantConfig interface{}
}
  2152  
  2153  var (
  2154  	queryParameterTestCases = []queryParameterTestCase{}
  2155  )
  2156  
// initQueryParameterTestCases (re)builds the package-level
// queryParameterTestCases slice.
//
// Every entry is a positional queryParameterTestCase literal: name, query,
// parameters, wantRow, wantConfig. The cases cover parameters whose type is
// taken from the Go value (ints, floats, *big.Rat, bools, strings, bytes,
// time and civil values, slices, structs) as well as parameters wrapped in a
// *QueryParameterValue that states its StandardSQLDataType explicitly (JSON,
// RANGE, BIGNUMERIC, arrays, structs).
func initQueryParameterTestCases() {
	d := civil.Date{Year: 2016, Month: 3, Day: 20}
	tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
	// rtm is tm as it is expected to come back; it is used on the "want" side
	// of the TIME and DATETIME cases.
	rtm := tm
	rtm.Nanosecond = 3000 // round to microseconds
	dtm := civil.DateTime{Date: d, Time: tm}
	ts := time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC)
	rat := big.NewRat(13, 10)
	// The constant 10e10 equals 1e11, so bigRat is 12345/100000000000.
	bigRat := big.NewRat(12345, 10e10)
	// rangeTimestamp1 sets only Start and rangeTimestamp2 sets only End.
	// NOTE(review): rangeTimestamp1 is used by the case named
	// "RangeUnboundedStart" and rangeTimestamp2 by "RangeUnboundedEnd", which
	// looks swapped relative to which bound is left unset; confirm the
	// intended naming.
	rangeTimestamp1 := &RangeValue{
		Start: time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC),
	}
	rangeTimestamp2 := &RangeValue{
		End: time.Date(2016, 3, 20, 15, 04, 05, 0, time.UTC),
	}

	// ss and s are local struct types passed as struct-valued parameters; the
	// expected config maps below are keyed by their field names.
	type ss struct {
		String string
	}

	type s struct {
		Timestamp      time.Time
		StringArray    []string
		SubStruct      ss
		SubStructArray []ss
	}

	queryParameterTestCases = []queryParameterTestCase{
		{
			"Int64Param",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: 1}},
			[]Value{int64(1)},
			int64(1),
		},
		{
			"FloatParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: 1.3}},
			[]Value{1.3},
			1.3,
		},
		{
			"BigRatParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: rat}},
			[]Value{rat},
			rat,
		},
		{
			"BoolParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: true}},
			[]Value{true},
			true,
		},
		{
			"StringParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: "ABC"}},
			[]Value{"ABC"},
			"ABC",
		},
		{
			"ByteParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: []byte("foo")}},
			[]Value{[]byte("foo")},
			[]byte("foo"),
		},
		{
			"TimestampParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: ts}},
			[]Value{ts},
			ts,
		},
		{
			"TimestampArrayParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: []time.Time{ts, ts}}},
			[]Value{[]Value{ts, ts}},
			[]interface{}{ts, ts},
		},
		{
			// Sent with tm's full nanoseconds; expected back as rtm.
			"DatetimeParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: dtm}},
			[]Value{civil.DateTime{Date: d, Time: rtm}},
			civil.DateTime{Date: d, Time: rtm},
		},
		{
			"DateParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: d}},
			[]Value{d},
			d,
		},
		{
			// Sent as tm; expected back as rtm.
			"TimeParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: tm}},
			[]Value{rtm},
			rtm,
		},
		{
			// JSON is passed as a string with an explicit JSON type kind, and
			// is expected back as the same string.
			"JsonParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "JSON",
						},
						Value: "{\"alpha\":\"beta\"}",
					},
				},
			},
			[]Value{"{\"alpha\":\"beta\"}"},
			"{\"alpha\":\"beta\"}",
		},
		{
			"RangeUnboundedStart",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "RANGE",
							RangeElementType: &StandardSQLDataType{
								TypeKind: "TIMESTAMP",
							},
						},
						Value: rangeTimestamp1,
					},
				},
			},
			[]Value{rangeTimestamp1},
			rangeTimestamp1,
		},
		{
			"RangeUnboundedEnd",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "RANGE",
							RangeElementType: &StandardSQLDataType{
								TypeKind: "TIMESTAMP",
							},
						},
						Value: rangeTimestamp2,
					},
				},
			},
			[]Value{rangeTimestamp2},
			rangeTimestamp2,
		},
		{
			// A Go struct value: the row is expected as nested Value slices,
			// the config as a map keyed by field name.
			"NestedStructParam",
			"SELECT @val",
			[]QueryParameter{{Name: "val", Value: s{ts, []string{"a", "b"}, ss{"c"}, []ss{{"d"}, {"e"}}}}},
			[]Value{[]Value{ts, []Value{"a", "b"}, []Value{"c"}, []Value{[]Value{"d"}, []Value{"e"}}}},
			map[string]interface{}{
				"Timestamp":   ts,
				"StringArray": []interface{}{"a", "b"},
				"SubStruct":   map[string]interface{}{"String": "c"},
				"SubStructArray": []interface{}{
					map[string]interface{}{"String": "d"},
					map[string]interface{}{"String": "e"},
				},
			},
		},
		{
			// Selects individual fields of the struct parameter; the slice
			// fields are left unset and expected as nil in the config.
			"StructFieldParam",
			"SELECT @val.Timestamp, @val.SubStruct.String",
			[]QueryParameter{{Name: "val", Value: s{Timestamp: ts, SubStruct: ss{"a"}}}},
			[]Value{ts, "a"},
			map[string]interface{}{
				"Timestamp":      ts,
				"SubStruct":      map[string]interface{}{"String": "a"},
				"StringArray":    nil,
				"SubStructArray": nil,
			},
		},
		{
			// The value is sent as the BigNumericString rendering of bigRat
			// with an explicit BIGNUMERIC type kind.
			"BigNumericExplicitParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						Type: StandardSQLDataType{
							TypeKind: "BIGNUMERIC",
						},
						Value: BigNumericString(bigRat),
					},
				},
			},
			[]Value{bigRat},
			bigRat,
		},
		{
			"StringArrayExplicitParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						ArrayValue: []QueryParameterValue{
							{Value: "a"},
							{Value: "b"},
						},
						Type: StandardSQLDataType{
							ArrayElementType: &StandardSQLDataType{
								TypeKind: "STRING",
							},
						},
					},
				},
			},
			[]Value{[]Value{"a", "b"}},
			[]interface{}{"a", "b"},
		},
		{
			// A fully explicit struct: StructValue supplies the field values
			// and Type.StructType lists the fields with their types. The
			// expected row follows the order of the Fields list.
			"StructExplicitParam",
			"SELECT @val",
			[]QueryParameter{
				{
					Name: "val",
					Value: &QueryParameterValue{
						StructValue: map[string]QueryParameterValue{
							"Timestamp": {
								Value: ts,
							},
							"BigNumericArray": {
								ArrayValue: []QueryParameterValue{
									{Value: BigNumericString(bigRat)},
									{Value: BigNumericString(rat)},
								},
							},
							"ArraySingleValueStruct": {
								ArrayValue: []QueryParameterValue{
									{StructValue: map[string]QueryParameterValue{
										"Number": {
											Value: int64(42),
										},
									}},
									{StructValue: map[string]QueryParameterValue{
										"Number": {
											Value: int64(43),
										},
									}},
								},
							},
							"SubStruct": {
								StructValue: map[string]QueryParameterValue{
									"String": {
										Value: "c",
									},
								},
							},
						},
						Type: StandardSQLDataType{
							StructType: &StandardSQLStructType{
								Fields: []*StandardSQLField{
									{
										Name: "Timestamp",
										Type: &StandardSQLDataType{
											TypeKind: "TIMESTAMP",
										},
									},
									{
										Name: "BigNumericArray",
										Type: &StandardSQLDataType{
											ArrayElementType: &StandardSQLDataType{
												TypeKind: "BIGNUMERIC",
											},
										},
									},
									{
										Name: "ArraySingleValueStruct",
										Type: &StandardSQLDataType{
											ArrayElementType: &StandardSQLDataType{
												StructType: &StandardSQLStructType{
													Fields: []*StandardSQLField{
														{
															Name: "Number",
															Type: &StandardSQLDataType{
																TypeKind: "INT64",
															},
														},
													},
												},
											},
										},
									},
									{
										Name: "SubStruct",
										Type: &StandardSQLDataType{
											StructType: &StandardSQLStructType{
												Fields: []*StandardSQLField{
													{
														Name: "String",
														Type: &StandardSQLDataType{
															TypeKind: "STRING",
														},
													},
												},
											},
										},
									},
								},
							},
						},
					},
				},
			},
			[]Value{[]Value{ts, []Value{bigRat, rat}, []Value{[]Value{int64(42)}, []Value{int64(43)}}, []Value{"c"}}},
			map[string]interface{}{
				"Timestamp":       ts,
				"BigNumericArray": []interface{}{bigRat, rat},
				"ArraySingleValueStruct": []interface{}{
					map[string]interface{}{"Number": int64(42)},
					map[string]interface{}{"Number": int64(43)},
				},
				"SubStruct": map[string]interface{}{"String": "c"},
			},
		},
	}
}
  2492  
  2493  func TestIntegration_QueryParameters(t *testing.T) {
  2494  	if client == nil {
  2495  		t.Skip("Integration tests skipped")
  2496  	}
  2497  	ctx := context.Background()
  2498  
  2499  	initQueryParameterTestCases()
  2500  
  2501  	for _, tc := range queryParameterTestCases {
  2502  		t.Run(tc.name, func(t *testing.T) {
  2503  			q := client.Query(tc.query)
  2504  			q.Parameters = tc.parameters
  2505  			job, err := q.Run(ctx)
  2506  			if err != nil {
  2507  				t.Fatal(err)
  2508  			}
  2509  			if job.LastStatus() == nil {
  2510  				t.Error("no LastStatus")
  2511  			}
  2512  			it, err := job.Read(ctx)
  2513  			if err != nil {
  2514  				t.Fatal(err)
  2515  			}
  2516  			checkRead(t, "QueryParameters", it, [][]Value{tc.wantRow})
  2517  			config, err := job.Config()
  2518  			if err != nil {
  2519  				t.Fatal(err)
  2520  			}
  2521  			got := config.(*QueryConfig).Parameters[0].Value
  2522  			if !testutil.Equal(got, tc.wantConfig) {
  2523  				t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
  2524  					tc.parameters[0].Value, got, tc.wantConfig)
  2525  			}
  2526  		})
  2527  	}
  2528  }
  2529  
  2530  // This test can be merged with the TestIntegration_QueryParameters as soon as support for explicit typed query parameter lands.
  2531  // To test timestamps with different formats, we need to be able to specify the type explicitly.
  2532  func TestIntegration_TimestampFormat(t *testing.T) {
  2533  	if client == nil {
  2534  		t.Skip("Integration tests skipped")
  2535  	}
  2536  	ctx := context.Background()
  2537  	ts := time.Date(2020, 10, 15, 15, 04, 05, 0, time.UTC)
  2538  
  2539  	testCases := []struct {
  2540  		name       string
  2541  		query      string
  2542  		parameters []*bq.QueryParameter
  2543  		wantRow    []Value
  2544  		wantConfig interface{}
  2545  	}{
  2546  		{
  2547  			"Literal",
  2548  			"SELECT @val",
  2549  			[]*bq.QueryParameter{
  2550  				{
  2551  					Name: "val",
  2552  					ParameterType: &bq.QueryParameterType{
  2553  						Type: "TIMESTAMP",
  2554  					},
  2555  					ParameterValue: &bq.QueryParameterValue{
  2556  						Value: ts.Format(timestampFormat),
  2557  					},
  2558  				},
  2559  			},
  2560  			[]Value{ts},
  2561  			ts,
  2562  		},
  2563  		{
  2564  			"RFC3339Nano",
  2565  			"SELECT @val",
  2566  			[]*bq.QueryParameter{
  2567  				{
  2568  					Name: "val",
  2569  					ParameterType: &bq.QueryParameterType{
  2570  						Type: "TIMESTAMP",
  2571  					},
  2572  					ParameterValue: &bq.QueryParameterValue{
  2573  						Value: ts.Format(time.RFC3339Nano),
  2574  					},
  2575  				},
  2576  			},
  2577  			[]Value{ts},
  2578  			ts,
  2579  		},
  2580  		{
  2581  			"DatetimeFormat",
  2582  			"SELECT @val",
  2583  			[]*bq.QueryParameter{
  2584  				{
  2585  					Name: "val",
  2586  					ParameterType: &bq.QueryParameterType{
  2587  						Type: "TIMESTAMP",
  2588  					},
  2589  					ParameterValue: &bq.QueryParameterValue{
  2590  						Value: ts.Format(dateTimeFormat),
  2591  					},
  2592  				},
  2593  			},
  2594  			[]Value{ts},
  2595  			ts,
  2596  		},
  2597  		{
  2598  			"RFC3339",
  2599  			"SELECT @val",
  2600  			[]*bq.QueryParameter{
  2601  				{
  2602  					Name: "val",
  2603  					ParameterType: &bq.QueryParameterType{
  2604  						Type: "TIMESTAMP",
  2605  					},
  2606  					ParameterValue: &bq.QueryParameterValue{
  2607  						Value: ts.Format(time.RFC3339),
  2608  					},
  2609  				},
  2610  			},
  2611  			[]Value{ts},
  2612  			ts,
  2613  		},
  2614  	}
  2615  	for _, tc := range testCases {
  2616  		t.Run(tc.name, func(t *testing.T) {
  2617  			q := client.Query(tc.query)
  2618  			bqJob, err := q.newJob()
  2619  			if err != nil {
  2620  				t.Fatal(err)
  2621  			}
  2622  			bqJob.Configuration.Query.QueryParameters = tc.parameters
  2623  
  2624  			job, err := q.client.insertJob(ctx, bqJob, nil)
  2625  			if err != nil {
  2626  				t.Fatal(err)
  2627  			}
  2628  			if job.LastStatus() == nil {
  2629  				t.Error("no LastStatus")
  2630  			}
  2631  			it, err := job.Read(ctx)
  2632  			if err != nil {
  2633  				t.Fatal(err)
  2634  			}
  2635  			checkRead(t, "QueryParameters", it, [][]Value{tc.wantRow})
  2636  			config, err := job.Config()
  2637  			if err != nil {
  2638  				t.Fatal(err)
  2639  			}
  2640  			got := config.(*QueryConfig).Parameters[0].Value
  2641  			if !testutil.Equal(got, tc.wantConfig) {
  2642  				t.Errorf("param %[1]v (%[1]T): config:\ngot %[2]v (%[2]T)\nwant %[3]v (%[3]T)",
  2643  					tc.parameters[0].ParameterValue.Value, got, tc.wantConfig)
  2644  			}
  2645  		})
  2646  	}
  2647  }
  2648  
  2649  func TestIntegration_QueryDryRun(t *testing.T) {
  2650  	if client == nil {
  2651  		t.Skip("Integration tests skipped")
  2652  	}
  2653  	ctx := context.Background()
  2654  	q := client.Query("SELECT word from " + stdName + " LIMIT 10")
  2655  	q.DryRun = true
  2656  	job, err := q.Run(ctx)
  2657  	if err != nil {
  2658  		t.Fatal(err)
  2659  	}
  2660  
  2661  	s := job.LastStatus()
  2662  	if s.State != Done {
  2663  		t.Errorf("state is %v, expected Done", s.State)
  2664  	}
  2665  	if s.Statistics == nil {
  2666  		t.Fatal("no statistics")
  2667  	}
  2668  	if s.Statistics.Details.(*QueryStatistics).Schema == nil {
  2669  		t.Fatal("no schema")
  2670  	}
  2671  	if s.Statistics.Details.(*QueryStatistics).TotalBytesProcessedAccuracy == "" {
  2672  		t.Fatal("no cost accuracy")
  2673  	}
  2674  }
  2675  
  2676  func TestIntegration_Scripting(t *testing.T) {
  2677  	if client == nil {
  2678  		t.Skip("Integration tests skipped")
  2679  	}
  2680  	ctx := context.Background()
  2681  	sql := `
  2682  	-- Declare a variable to hold names as an array.
  2683  	DECLARE top_names ARRAY<STRING>;
  2684  	BEGIN TRANSACTION;
  2685  	-- Build an array of the top 100 names from the year 2017.
  2686  	SET top_names = (
  2687  	  SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
  2688  	  FROM ` + "`bigquery-public-data`" + `.usa_names.usa_1910_current
  2689  	  WHERE year = 2017
  2690  	);
  2691  	-- Which names appear as words in Shakespeare's plays?
  2692  	SELECT
  2693  	  name AS shakespeare_name
  2694  	FROM UNNEST(top_names) AS name
  2695  	WHERE name IN (
  2696  	  SELECT word
  2697  	  FROM ` + "`bigquery-public-data`" + `.samples.shakespeare
  2698  	);
  2699  	COMMIT TRANSACTION;
  2700  	`
  2701  	q := client.Query(sql)
  2702  	job, err := q.Run(ctx)
  2703  	if err != nil {
  2704  		t.Fatalf("failed to run parent job: %v", err)
  2705  	}
  2706  	status, err := job.Wait(ctx)
  2707  	if err != nil {
  2708  		t.Fatalf("job %q failed to wait for completion: %v", job.ID(), err)
  2709  	}
  2710  	if status.Err() != nil {
  2711  		t.Fatalf("job %q terminated with error: %v", job.ID(), err)
  2712  	}
  2713  
  2714  	queryStats, ok := status.Statistics.Details.(*QueryStatistics)
  2715  	if !ok {
  2716  		t.Fatalf("failed to fetch query statistics")
  2717  	}
  2718  
  2719  	want := "SCRIPT"
  2720  	if queryStats.StatementType != want {
  2721  		t.Errorf("statement type mismatch. got %s want %s", queryStats.StatementType, want)
  2722  	}
  2723  
  2724  	if status.Statistics.NumChildJobs <= 0 {
  2725  		t.Errorf("expected script to indicate nonzero child jobs, got %d", status.Statistics.NumChildJobs)
  2726  	}
  2727  
  2728  	// Ensure child jobs are present.
  2729  	var childJobs []*Job
  2730  
  2731  	it := job.Children(ctx)
  2732  	for {
  2733  		job, err := it.Next()
  2734  		if err == iterator.Done {
  2735  			break
  2736  		}
  2737  		if err != nil {
  2738  			t.Fatal(err)
  2739  		}
  2740  		childJobs = append(childJobs, job)
  2741  	}
  2742  	if len(childJobs) == 0 {
  2743  		t.Fatal("Script had no child jobs.")
  2744  	}
  2745  
  2746  	for _, cj := range childJobs {
  2747  		cStatus := cj.LastStatus()
  2748  		if cStatus.Statistics.ParentJobID != job.ID() {
  2749  			t.Errorf("child job %q doesn't indicate parent.  got %q, want %q", cj.ID(), cStatus.Statistics.ParentJobID, job.ID())
  2750  		}
  2751  		if cStatus.Statistics.ScriptStatistics == nil {
  2752  			t.Errorf("child job %q doesn't have script statistics present", cj.ID())
  2753  		}
  2754  		if cStatus.Statistics.ScriptStatistics.EvaluationKind == "" {
  2755  			t.Errorf("child job %q didn't indicate evaluation kind", cj.ID())
  2756  		}
  2757  		if cStatus.Statistics.TransactionInfo == nil {
  2758  			t.Errorf("child job %q didn't have transaction info present", cj.ID())
  2759  		}
  2760  		if cStatus.Statistics.TransactionInfo.TransactionID == "" {
  2761  			t.Errorf("child job %q didn't have transactionID present", cj.ID())
  2762  		}
  2763  	}
  2764  
  2765  }
  2766  
  2767  func TestIntegration_ExtractExternal(t *testing.T) {
  2768  	// Create a table, extract it to GCS, then query it externally.
  2769  	if client == nil {
  2770  		t.Skip("Integration tests skipped")
  2771  	}
  2772  	ctx := context.Background()
  2773  	schema := Schema{
  2774  		{Name: "name", Type: StringFieldType},
  2775  		{Name: "num", Type: IntegerFieldType},
  2776  	}
  2777  	table := newTable(t, schema)
  2778  	defer table.Delete(ctx)
  2779  
  2780  	// Insert table data.
  2781  	sql := fmt.Sprintf(`INSERT %s.%s (name, num)
  2782  		                VALUES ('a', 1), ('b', 2), ('c', 3)`,
  2783  		table.DatasetID, table.TableID)
  2784  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
  2785  		t.Fatal(err)
  2786  	}
  2787  	// Extract to a GCS object as CSV.
  2788  	bucketName := testutil.ProjID()
  2789  	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
  2790  	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
  2791  	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
  2792  	gr := NewGCSReference(uri)
  2793  	gr.DestinationFormat = CSV
  2794  	e := table.ExtractorTo(gr)
  2795  	job, err := e.Run(ctx)
  2796  	if err != nil {
  2797  		t.Fatal(err)
  2798  	}
  2799  	conf, err := job.Config()
  2800  	if err != nil {
  2801  		t.Fatal(err)
  2802  	}
  2803  	config, ok := conf.(*ExtractConfig)
  2804  	if !ok {
  2805  		t.Fatalf("got %T, want ExtractConfig", conf)
  2806  	}
  2807  	diff := testutil.Diff(config, &e.ExtractConfig,
  2808  		cmp.AllowUnexported(Table{}),
  2809  		cmpopts.IgnoreUnexported(Client{}))
  2810  	if diff != "" {
  2811  		t.Errorf("got=-, want=+:\n%s", diff)
  2812  	}
  2813  	if err := wait(ctx, job); err != nil {
  2814  		t.Fatal(err)
  2815  	}
  2816  
  2817  	edc := &ExternalDataConfig{
  2818  		SourceFormat: CSV,
  2819  		SourceURIs:   []string{uri},
  2820  		Schema:       schema,
  2821  		Options: &CSVOptions{
  2822  			SkipLeadingRows: 1,
  2823  			// This is the default. Since we use edc as an expectation later on,
  2824  			// let's just be explicit.
  2825  			FieldDelimiter: ",",
  2826  		},
  2827  	}
  2828  	// Query that CSV file directly.
  2829  	q := client.Query("SELECT * FROM csv")
  2830  	q.TableDefinitions = map[string]ExternalData{"csv": edc}
  2831  	wantRows := [][]Value{
  2832  		{"a", int64(1)},
  2833  		{"b", int64(2)},
  2834  		{"c", int64(3)},
  2835  	}
  2836  	iter, err := q.Read(ctx)
  2837  	if err != nil {
  2838  		t.Fatal(err)
  2839  	}
  2840  	checkReadAndTotalRows(t, "external query", iter, wantRows)
  2841  
  2842  	// Make a table pointing to the file, and query it.
  2843  	// BigQuery does not allow a Table.Read on an external table.
  2844  	table = dataset.Table(tableIDs.New())
  2845  	err = table.Create(context.Background(), &TableMetadata{
  2846  		Schema:             schema,
  2847  		ExpirationTime:     testTableExpiration,
  2848  		ExternalDataConfig: edc,
  2849  	})
  2850  	if err != nil {
  2851  		t.Fatal(err)
  2852  	}
  2853  	q = client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
  2854  	iter, err = q.Read(ctx)
  2855  	if err != nil {
  2856  		t.Fatal(err)
  2857  	}
  2858  	checkReadAndTotalRows(t, "external table", iter, wantRows)
  2859  
  2860  	// While we're here, check that the table metadata is correct.
  2861  	md, err := table.Metadata(ctx)
  2862  	if err != nil {
  2863  		t.Fatal(err)
  2864  	}
  2865  	// One difference: since BigQuery returns the schema as part of the ordinary
  2866  	// table metadata, it does not populate ExternalDataConfig.Schema.
  2867  	md.ExternalDataConfig.Schema = md.Schema
  2868  	if diff := testutil.Diff(md.ExternalDataConfig, edc); diff != "" {
  2869  		t.Errorf("got=-, want=+\n%s", diff)
  2870  	}
  2871  }
  2872  
  2873  func TestIntegration_ExportDataStatistics(t *testing.T) {
  2874  	// Create a table, extract it to GCS using EXPORT DATA statement.
  2875  	if client == nil {
  2876  		t.Skip("Integration tests skipped")
  2877  	}
  2878  	ctx := context.Background()
  2879  	schema := Schema{
  2880  		{Name: "name", Type: StringFieldType},
  2881  		{Name: "num", Type: IntegerFieldType},
  2882  	}
  2883  	table := newTable(t, schema)
  2884  	defer table.Delete(ctx)
  2885  
  2886  	// Extract to a GCS object as CSV.
  2887  	bucketName := testutil.ProjID()
  2888  	uri := fmt.Sprintf("gs://%s/bq-export-test-*.csv", bucketName)
  2889  	defer func() {
  2890  		it := storageClient.Bucket(bucketName).Objects(ctx, &storage.Query{
  2891  			MatchGlob: "bq-export-test-*.csv",
  2892  		})
  2893  		for {
  2894  			obj, err := it.Next()
  2895  			if err == iterator.Done {
  2896  				break
  2897  			}
  2898  			if err != nil {
  2899  				t.Logf("failed to iterate through bucket %q: %v", bucketName, err)
  2900  				continue
  2901  			}
  2902  			err = storageClient.Bucket(bucketName).Object(obj.Name).Delete(ctx)
  2903  		}
  2904  	}()
  2905  
  2906  	// EXPORT DATA to GCS object.
  2907  	sql := fmt.Sprintf(`EXPORT DATA
  2908  		OPTIONS (
  2909  			uri = '%s',
  2910  			format = 'CSV',
  2911  			overwrite = true,
  2912  			header = true,
  2913  			field_delimiter = ';'
  2914  		)
  2915  		AS (
  2916  			SELECT 'a' as name, 1 as num
  2917  			UNION ALL
  2918  			SELECT 'b' as name, 2 as num
  2919  			UNION ALL
  2920  			SELECT 'c' as name, 3 as num
  2921  		);`,
  2922  		uri)
  2923  	stats, _, err := runQuerySQL(ctx, sql)
  2924  	if err != nil {
  2925  		t.Fatal(err)
  2926  	}
  2927  
  2928  	qStats, ok := stats.Details.(*QueryStatistics)
  2929  	if !ok {
  2930  		t.Fatalf("expected query statistics not present")
  2931  	}
  2932  
  2933  	if qStats.ExportDataStatistics == nil {
  2934  		t.Fatal("jobStatus missing ExportDataStatistics")
  2935  	}
  2936  	if qStats.ExportDataStatistics.FileCount != 1 {
  2937  		t.Fatalf("expected ExportDataStatistics to have 1 file, but got %d files", qStats.ExportDataStatistics.FileCount)
  2938  	}
  2939  	if qStats.ExportDataStatistics.RowCount != 3 {
  2940  		t.Fatalf("expected ExportDataStatistics to have 3 rows, got %d rows", qStats.ExportDataStatistics.RowCount)
  2941  	}
  2942  }
  2943  
  2944  func TestIntegration_ReadNullIntoStruct(t *testing.T) {
  2945  	// Reading a null into a struct field should return an error (not panic).
  2946  	if client == nil {
  2947  		t.Skip("Integration tests skipped")
  2948  	}
  2949  	ctx := context.Background()
  2950  	table := newTable(t, schema)
  2951  	defer table.Delete(ctx)
  2952  
  2953  	ins := table.Inserter()
  2954  	row := &ValuesSaver{
  2955  		Schema: schema,
  2956  		Row:    []Value{nil, []Value{}, []Value{nil}},
  2957  	}
  2958  	if err := ins.Put(ctx, []*ValuesSaver{row}); err != nil {
  2959  		t.Fatal(putError(err))
  2960  	}
  2961  	if err := waitForRow(ctx, table); err != nil {
  2962  		t.Fatal(err)
  2963  	}
  2964  
  2965  	q := client.Query(fmt.Sprintf("select name from %s", table.TableID))
  2966  	q.DefaultProjectID = dataset.ProjectID
  2967  	q.DefaultDatasetID = dataset.DatasetID
  2968  	it, err := q.Read(ctx)
  2969  	if err != nil {
  2970  		t.Fatal(err)
  2971  	}
  2972  	type S struct{ Name string }
  2973  	var s S
  2974  	if err := it.Next(&s); err == nil {
  2975  		t.Fatal("got nil, want error")
  2976  	}
  2977  }
  2978  
// Fully-qualified names of the same public Shakespeare sample table, written
// once in backtick-quoted, dot-separated form (stdName) and once in
// bracketed, colon-separated form (legacyName).
const (
	stdName    = "`bigquery-public-data.samples.shakespeare`"
	legacyName = "[bigquery-public-data:samples.shakespeare]"
)
  2983  
// useLegacySQLTests enumerates every combination of table-name syntax and
// UseStandardSQL/UseLegacySQL setting, along with whether the resulting query
// is expected to fail.
//
// These tests exploit the fact that the two SQL versions have different syntaxes for
// fully-qualified table names.
var useLegacySQLTests = []struct {
	t           string // name of table
	std, legacy bool   // use standard/legacy SQL
	err         bool   // do we expect an error?
}{
	{t: legacyName, std: false, legacy: true, err: false},
	{t: legacyName, std: true, legacy: false, err: true},
	{t: legacyName, std: false, legacy: false, err: true}, // standard SQL is default
	{t: legacyName, std: true, legacy: true, err: true},
	{t: stdName, std: false, legacy: true, err: true},
	{t: stdName, std: true, legacy: false, err: false},
	{t: stdName, std: false, legacy: false, err: false}, // standard SQL is default
	{t: stdName, std: true, legacy: true, err: true},
}
  3000  
  3001  func TestIntegration_QueryUseLegacySQL(t *testing.T) {
  3002  	// Test the UseLegacySQL and UseStandardSQL options for queries.
  3003  	if client == nil {
  3004  		t.Skip("Integration tests skipped")
  3005  	}
  3006  	ctx := context.Background()
  3007  	for _, test := range useLegacySQLTests {
  3008  		q := client.Query(fmt.Sprintf("select word from %s limit 1", test.t))
  3009  		q.UseStandardSQL = test.std
  3010  		q.UseLegacySQL = test.legacy
  3011  		_, err := q.Read(ctx)
  3012  		gotErr := err != nil
  3013  		if gotErr && !test.err {
  3014  			t.Errorf("%+v:\nunexpected error: %v", test, err)
  3015  		} else if !gotErr && test.err {
  3016  			t.Errorf("%+v:\nsucceeded, but want error", test)
  3017  		}
  3018  	}
  3019  }
  3020  
  3021  func TestIntegration_ListJobs(t *testing.T) {
  3022  	// It's difficult to test the list of jobs, because we can't easily
  3023  	// control what's in it. Also, there are many jobs in the test project,
  3024  	// and it takes considerable time to list them all.
  3025  	if client == nil {
  3026  		t.Skip("Integration tests skipped")
  3027  	}
  3028  	ctx := context.Background()
  3029  
  3030  	// About all we can do is list a few jobs.
  3031  	const max = 20
  3032  	var jobs []*Job
  3033  	it := client.Jobs(ctx)
  3034  	for {
  3035  		job, err := it.Next()
  3036  		if err == iterator.Done {
  3037  			break
  3038  		}
  3039  		if err != nil {
  3040  			t.Fatal(err)
  3041  		}
  3042  		jobs = append(jobs, job)
  3043  		if len(jobs) >= max {
  3044  			break
  3045  		}
  3046  	}
  3047  	// We expect that there is at least one job in the last few months.
  3048  	if len(jobs) == 0 {
  3049  		t.Fatal("did not get any jobs")
  3050  	}
  3051  }
  3052  
  3053  func TestIntegration_DeleteJob(t *testing.T) {
  3054  	if client == nil {
  3055  		t.Skip("Integration tests skipped")
  3056  	}
  3057  	ctx := context.Background()
  3058  
  3059  	q := client.Query("SELECT 17 as foo")
  3060  	q.Location = "us-east1"
  3061  
  3062  	job, err := q.Run(ctx)
  3063  	if err != nil {
  3064  		t.Fatalf("job Run failure: %v", err)
  3065  	}
  3066  	err = wait(ctx, job)
  3067  	if err != nil {
  3068  		t.Fatalf("job %q completion failure: %v", job.ID(), err)
  3069  	}
  3070  
  3071  	if err := job.Delete(ctx); err != nil {
  3072  		t.Fatalf("job.Delete failed: %v", err)
  3073  	}
  3074  }
  3075  
// tokyo is the location identifier that the location tests expect datasets,
// tables and jobs to report.
const tokyo = "asia-northeast1"
  3077  
  3078  func TestIntegration_Location(t *testing.T) {
  3079  	if client == nil {
  3080  		t.Skip("Integration tests skipped")
  3081  	}
  3082  	client.Location = ""
  3083  	testLocation(t, tokyo)
  3084  	client.Location = tokyo
  3085  	defer func() {
  3086  		client.Location = ""
  3087  	}()
  3088  	testLocation(t, "")
  3089  }
  3090  
  3091  func testLocation(t *testing.T, loc string) {
  3092  	ctx := context.Background()
  3093  	tokyoDataset := client.Dataset("tokyo")
  3094  	err := tokyoDataset.Create(ctx, &DatasetMetadata{Location: loc})
  3095  	if err != nil && !hasStatusCode(err, 409) { // 409 = already exists
  3096  		t.Fatal(err)
  3097  	}
  3098  	md, err := tokyoDataset.Metadata(ctx)
  3099  	if err != nil {
  3100  		t.Fatal(err)
  3101  	}
  3102  	if md.Location != tokyo {
  3103  		t.Fatalf("dataset location: got %s, want %s", md.Location, tokyo)
  3104  	}
  3105  	table := tokyoDataset.Table(tableIDs.New())
  3106  	err = table.Create(context.Background(), &TableMetadata{
  3107  		Schema: Schema{
  3108  			{Name: "name", Type: StringFieldType},
  3109  			{Name: "nums", Type: IntegerFieldType},
  3110  		},
  3111  		ExpirationTime: testTableExpiration,
  3112  	})
  3113  	if err != nil {
  3114  		t.Fatal(err)
  3115  	}
  3116  
  3117  	tableMetadata, err := table.Metadata(ctx)
  3118  	if err != nil {
  3119  		t.Fatalf("failed to get table metadata: %v", err)
  3120  	}
  3121  	wantLoc := loc
  3122  	if loc == "" && client.Location != "" {
  3123  		wantLoc = client.Location
  3124  	}
  3125  	if tableMetadata.Location != wantLoc {
  3126  		t.Errorf("Location on table doesn't match.  Got %s want %s", tableMetadata.Location, wantLoc)
  3127  	}
  3128  	defer table.Delete(ctx)
  3129  	loader := table.LoaderFrom(NewReaderSource(strings.NewReader("a,0\nb,1\nc,2\n")))
  3130  	loader.Location = loc
  3131  	job, err := loader.Run(ctx)
  3132  	if err != nil {
  3133  		t.Fatal("loader.Run", err)
  3134  	}
  3135  	if job.Location() != tokyo {
  3136  		t.Fatalf("job location: got %s, want %s", job.Location(), tokyo)
  3137  	}
  3138  	_, err = client.JobFromID(ctx, job.ID())
  3139  	if client.Location == "" && err == nil {
  3140  		t.Error("JobFromID with Tokyo job, no client location: want error, got nil")
  3141  	}
  3142  	if client.Location != "" && err != nil {
  3143  		t.Errorf("JobFromID with Tokyo job, with client location: want nil, got %v", err)
  3144  	}
  3145  	_, err = client.JobFromIDLocation(ctx, job.ID(), "US")
  3146  	if err == nil {
  3147  		t.Error("JobFromIDLocation with US: want error, got nil")
  3148  	}
  3149  	job2, err := client.JobFromIDLocation(ctx, job.ID(), loc)
  3150  	if loc == tokyo && err != nil {
  3151  		t.Errorf("loc=tokyo: %v", err)
  3152  	}
  3153  	if loc == "" && err == nil {
  3154  		t.Error("loc empty: got nil, want error")
  3155  	}
  3156  	if job2 != nil && (job2.ID() != job.ID() || job2.Location() != tokyo) {
  3157  		t.Errorf("got id %s loc %s, want id%s loc %s", job2.ID(), job2.Location(), job.ID(), tokyo)
  3158  	}
  3159  	if err := wait(ctx, job); err != nil {
  3160  		t.Fatal(err)
  3161  	}
  3162  	// Cancel should succeed even if the job is done.
  3163  	if err := job.Cancel(ctx); err != nil {
  3164  		t.Fatal(err)
  3165  	}
  3166  
  3167  	q := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", table.DatasetID, table.TableID))
  3168  	q.Location = loc
  3169  	iter, err := q.Read(ctx)
  3170  	if err != nil {
  3171  		t.Fatal(err)
  3172  	}
  3173  	wantRows := [][]Value{
  3174  		{"a", int64(0)},
  3175  		{"b", int64(1)},
  3176  		{"c", int64(2)},
  3177  	}
  3178  	checkRead(t, "location", iter, wantRows)
  3179  
  3180  	table2 := tokyoDataset.Table(tableIDs.New())
  3181  	copier := table2.CopierFrom(table)
  3182  	copier.Location = loc
  3183  	if _, err := copier.Run(ctx); err != nil {
  3184  		t.Fatal(err)
  3185  	}
  3186  	bucketName := testutil.ProjID()
  3187  	objectName := fmt.Sprintf("bq-test-%s.csv", table.TableID)
  3188  	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
  3189  	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
  3190  	gr := NewGCSReference(uri)
  3191  	gr.DestinationFormat = CSV
  3192  	e := table.ExtractorTo(gr)
  3193  	e.Location = loc
  3194  	if _, err := e.Run(ctx); err != nil {
  3195  		t.Fatal(err)
  3196  	}
  3197  }
  3198  
  3199  func TestIntegration_NumericErrors(t *testing.T) {
  3200  	// Verify that the service returns an error for a big.Rat that's too large.
  3201  	if client == nil {
  3202  		t.Skip("Integration tests skipped")
  3203  	}
  3204  	ctx := context.Background()
  3205  	schema := Schema{{Name: "n", Type: NumericFieldType}}
  3206  	table := newTable(t, schema)
  3207  	defer table.Delete(ctx)
  3208  	tooBigRat := &big.Rat{}
  3209  	if _, ok := tooBigRat.SetString("1e40"); !ok {
  3210  		t.Fatal("big.Rat.SetString failed")
  3211  	}
  3212  	ins := table.Inserter()
  3213  	err := ins.Put(ctx, []*ValuesSaver{{Schema: schema, Row: []Value{tooBigRat}}})
  3214  	if err == nil {
  3215  		t.Fatal("got nil, want error")
  3216  	}
  3217  }
  3218  
  3219  func TestIntegration_QueryErrors(t *testing.T) {
  3220  	// Verify that a bad query returns an appropriate error.
  3221  	if client == nil {
  3222  		t.Skip("Integration tests skipped")
  3223  	}
  3224  	ctx := context.Background()
  3225  	q := client.Query("blah blah broken")
  3226  	_, err := q.Read(ctx)
  3227  	const want = "invalidQuery"
  3228  	if !strings.Contains(err.Error(), want) {
  3229  		t.Fatalf("got %q, want substring %q", err, want)
  3230  	}
  3231  }
  3232  
  3233  func TestIntegration_MaterializedViewLifecycle(t *testing.T) {
  3234  	if client == nil {
  3235  		t.Skip("Integration tests skipped")
  3236  	}
  3237  	ctx := context.Background()
  3238  
  3239  	// instantiate a base table via a CTAS
  3240  	baseTableID := tableIDs.New()
  3241  	qualified := fmt.Sprintf("`%s`.%s.%s", testutil.ProjID(), dataset.DatasetID, baseTableID)
  3242  	sql := fmt.Sprintf(`
  3243  	CREATE TABLE %s
  3244  	(
  3245  		sample_value INT64,
  3246  		groupid STRING,
  3247  	)
  3248  	AS
  3249  	SELECT
  3250  	  CAST(RAND() * 100 AS INT64),
  3251  	  CONCAT("group", CAST(CAST(RAND()*10 AS INT64) AS STRING))
  3252  	FROM
  3253  	  UNNEST(GENERATE_ARRAY(0,999))
  3254  	`, qualified)
  3255  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
  3256  		t.Fatalf("couldn't instantiate base table: %v", err)
  3257  	}
  3258  
  3259  	// Define the SELECT aggregation to become a mat view
  3260  	sql = fmt.Sprintf(`
  3261  	SELECT
  3262  	  SUM(sample_value) as total,
  3263  	  groupid
  3264  	FROM
  3265  	  %s
  3266  	GROUP BY groupid
  3267  	`, qualified)
  3268  
  3269  	// Create materialized view
  3270  
  3271  	wantRefresh := 6 * time.Hour
  3272  	matViewID := tableIDs.New()
  3273  	view := dataset.Table(matViewID)
  3274  	if err := view.Create(ctx, &TableMetadata{
  3275  		MaterializedView: &MaterializedViewDefinition{
  3276  			Query:           sql,
  3277  			RefreshInterval: wantRefresh,
  3278  		}}); err != nil {
  3279  		t.Fatal(err)
  3280  	}
  3281  
  3282  	// Get metadata
  3283  	curMeta, err := view.Metadata(ctx)
  3284  	if err != nil {
  3285  		t.Fatal(err)
  3286  	}
  3287  
  3288  	if curMeta.MaterializedView == nil {
  3289  		t.Fatal("expected materialized view definition, was null")
  3290  	}
  3291  
  3292  	if curMeta.MaterializedView.Query != sql {
  3293  		t.Errorf("mismatch on view sql.  Got %s want %s", curMeta.MaterializedView.Query, sql)
  3294  	}
  3295  
  3296  	if curMeta.MaterializedView.RefreshInterval != wantRefresh {
  3297  		t.Errorf("mismatch on refresh time: got %d usec want %d usec", 1000*curMeta.MaterializedView.RefreshInterval.Nanoseconds(), 1000*wantRefresh.Nanoseconds())
  3298  	}
  3299  
  3300  	// MaterializedView is a TableType constant
  3301  	want := MaterializedView
  3302  	if curMeta.Type != want {
  3303  		t.Errorf("mismatch on table type.  got %s want %s", curMeta.Type, want)
  3304  	}
  3305  
  3306  	// Update metadata
  3307  	wantRefresh = time.Hour // 6hr -> 1hr
  3308  	upd := TableMetadataToUpdate{
  3309  		MaterializedView: &MaterializedViewDefinition{
  3310  			Query:           sql,
  3311  			RefreshInterval: wantRefresh,
  3312  		},
  3313  	}
  3314  
  3315  	newMeta, err := view.Update(ctx, upd, curMeta.ETag)
  3316  	if err != nil {
  3317  		t.Fatalf("failed to update view definition: %v", err)
  3318  	}
  3319  
  3320  	if newMeta.MaterializedView == nil {
  3321  		t.Error("MaterializeView missing in updated metadata")
  3322  	}
  3323  
  3324  	if newMeta.MaterializedView.RefreshInterval != wantRefresh {
  3325  		t.Errorf("mismatch on updated refresh time: got %d usec want %d usec", 1000*curMeta.MaterializedView.RefreshInterval.Nanoseconds(), 1000*wantRefresh.Nanoseconds())
  3326  	}
  3327  
  3328  	// verify implicit setting of false due to partial population of update.
  3329  	if newMeta.MaterializedView.EnableRefresh {
  3330  		t.Error("expected EnableRefresh to be false, is true")
  3331  	}
  3332  
  3333  	// Verify list
  3334  
  3335  	it := dataset.Tables(ctx)
  3336  	seen := false
  3337  	for {
  3338  		tbl, err := it.Next()
  3339  		if err == iterator.Done {
  3340  			break
  3341  		}
  3342  		if err != nil {
  3343  			t.Fatal(err)
  3344  		}
  3345  		if tbl.TableID == matViewID {
  3346  			seen = true
  3347  		}
  3348  	}
  3349  	if !seen {
  3350  		t.Error("materialized view not listed in dataset")
  3351  	}
  3352  
  3353  	// Verify deletion
  3354  	if err := view.Delete(ctx); err != nil {
  3355  		t.Errorf("failed to delete materialized view: %v", err)
  3356  	}
  3357  
  3358  }
  3359  
  3360  func TestIntegration_ModelLifecycle(t *testing.T) {
  3361  	if client == nil {
  3362  		t.Skip("Integration tests skipped")
  3363  	}
  3364  	ctx := context.Background()
  3365  
  3366  	// Create a model via a CREATE MODEL query
  3367  	modelID := modelIDs.New()
  3368  	model := dataset.Model(modelID)
  3369  	modelSQLID, _ := model.Identifier(StandardSQLID)
  3370  
  3371  	sql := fmt.Sprintf(`
  3372  		CREATE MODEL %s
  3373  		OPTIONS (
  3374  			model_type='linear_reg',
  3375  			max_iteration=1,
  3376  			learn_rate=0.4,
  3377  			learn_rate_strategy='constant'
  3378  		) AS (
  3379  			SELECT 'a' AS f1, 2.0 AS label
  3380  			UNION ALL
  3381  			SELECT 'b' AS f1, 3.8 AS label
  3382  		)`, modelSQLID)
  3383  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
  3384  		t.Fatal(err)
  3385  	}
  3386  	defer model.Delete(ctx)
  3387  
  3388  	// Get the model metadata.
  3389  	curMeta, err := model.Metadata(ctx)
  3390  	if err != nil {
  3391  		t.Fatalf("couldn't get metadata: %v", err)
  3392  	}
  3393  
  3394  	want := "LINEAR_REGRESSION"
  3395  	if curMeta.Type != want {
  3396  		t.Errorf("Model type mismatch.  Want %s got %s", curMeta.Type, want)
  3397  	}
  3398  
  3399  	// Ensure training metadata is available.
  3400  	runs := curMeta.RawTrainingRuns()
  3401  	if runs == nil {
  3402  		t.Errorf("training runs unpopulated.")
  3403  	}
  3404  	labelCols, err := curMeta.RawLabelColumns()
  3405  	if err != nil {
  3406  		t.Fatalf("failed to get label cols: %v", err)
  3407  	}
  3408  	if labelCols == nil {
  3409  		t.Errorf("label column information unpopulated.")
  3410  	}
  3411  	featureCols, err := curMeta.RawFeatureColumns()
  3412  	if err != nil {
  3413  		t.Fatalf("failed to get feature cols: %v", err)
  3414  	}
  3415  	if featureCols == nil {
  3416  		t.Errorf("feature column information unpopulated.")
  3417  	}
  3418  
  3419  	// Update mutable fields via API.
  3420  	expiry := time.Now().Add(24 * time.Hour).Truncate(time.Millisecond)
  3421  
  3422  	upd := ModelMetadataToUpdate{
  3423  		Description:    "new",
  3424  		Name:           "friendly",
  3425  		ExpirationTime: expiry,
  3426  	}
  3427  
  3428  	newMeta, err := model.Update(ctx, upd, curMeta.ETag)
  3429  	if err != nil {
  3430  		t.Fatalf("failed to update: %v", err)
  3431  	}
  3432  
  3433  	want = "new"
  3434  	if newMeta.Description != want {
  3435  		t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
  3436  	}
  3437  	want = "friendly"
  3438  	if newMeta.Name != want {
  3439  		t.Fatalf("Description not updated. got %s want %s", newMeta.Description, want)
  3440  	}
  3441  	if newMeta.ExpirationTime != expiry {
  3442  		t.Fatalf("ExpirationTime not updated.  got %v want %v", newMeta.ExpirationTime, expiry)
  3443  	}
  3444  
  3445  	// Ensure presence when enumerating the model list.
  3446  	it := dataset.Models(ctx)
  3447  	seen := false
  3448  	for {
  3449  		mdl, err := it.Next()
  3450  		if err == iterator.Done {
  3451  			break
  3452  		}
  3453  		if err != nil {
  3454  			t.Fatal(err)
  3455  		}
  3456  		if mdl.ModelID == modelID {
  3457  			seen = true
  3458  		}
  3459  	}
  3460  	if !seen {
  3461  		t.Fatal("model not listed in dataset")
  3462  	}
  3463  
  3464  	// Extract the model to GCS.
  3465  	bucketName := testutil.ProjID()
  3466  	objectName := fmt.Sprintf("bq-model-extract-%s", modelID)
  3467  	uri := fmt.Sprintf("gs://%s/%s", bucketName, objectName)
  3468  	defer storageClient.Bucket(bucketName).Object(objectName).Delete(ctx)
  3469  	gr := NewGCSReference(uri)
  3470  	gr.DestinationFormat = TFSavedModel
  3471  	extractor := model.ExtractorTo(gr)
  3472  	job, err := extractor.Run(ctx)
  3473  	if err != nil {
  3474  		t.Fatalf("failed to extract model to GCS: %v", err)
  3475  	}
  3476  	if err = wait(ctx, job); err != nil {
  3477  		t.Errorf("extract failed: %v", err)
  3478  	}
  3479  
  3480  	// Delete the model.
  3481  	if err := model.Delete(ctx); err != nil {
  3482  		t.Fatalf("failed to delete model: %v", err)
  3483  	}
  3484  }
  3485  
  3486  // Creates a new, temporary table with a unique name and the given schema.
  3487  func newTable(t *testing.T, s Schema) *Table {
  3488  	table := dataset.Table(tableIDs.New())
  3489  	err := table.Create(context.Background(), &TableMetadata{
  3490  		Schema:         s,
  3491  		ExpirationTime: testTableExpiration,
  3492  	})
  3493  	if err != nil {
  3494  		t.Fatal(err)
  3495  	}
  3496  	return table
  3497  }
  3498  
  3499  func checkRead(t *testing.T, msg string, it *RowIterator, want [][]Value) {
  3500  	if msg2, ok := compareRead(it, want, false); !ok {
  3501  		t.Errorf("%s: %s", msg, msg2)
  3502  	}
  3503  }
  3504  
  3505  func checkReadAndTotalRows(t *testing.T, msg string, it *RowIterator, want [][]Value) {
  3506  	if msg2, ok := compareRead(it, want, true); !ok {
  3507  		t.Errorf("%s: %s", msg, msg2)
  3508  	}
  3509  }
  3510  
  3511  func compareRead(it *RowIterator, want [][]Value, compareTotalRows bool) (msg string, ok bool) {
  3512  	got, _, totalRows, err := readAll(it)
  3513  	jobStr := ""
  3514  	if it.SourceJob() != nil {
  3515  		jobStr = it.SourceJob().jobID
  3516  	}
  3517  	if jobStr != "" {
  3518  		jobStr = fmt.Sprintf("(Job: %s)", jobStr)
  3519  	}
  3520  	if err != nil {
  3521  		return err.Error(), false
  3522  	}
  3523  	if len(got) != len(want) {
  3524  		return fmt.Sprintf("%s got %d rows, want %d", jobStr, len(got), len(want)), false
  3525  	}
  3526  	if compareTotalRows && len(got) != int(totalRows) {
  3527  		return fmt.Sprintf("%s got %d rows, but totalRows = %d", jobStr, len(got), totalRows), false
  3528  	}
  3529  	sort.Sort(byCol0(got))
  3530  	for i, r := range got {
  3531  		gotRow := []Value(r)
  3532  		wantRow := want[i]
  3533  		if !testutil.Equal(gotRow, wantRow) {
  3534  			return fmt.Sprintf("%s #%d: got %#v, want %#v", jobStr, i, gotRow, wantRow), false
  3535  		}
  3536  	}
  3537  	return "", true
  3538  }
  3539  
  3540  func readAll(it *RowIterator) ([][]Value, Schema, uint64, error) {
  3541  	var (
  3542  		rows      [][]Value
  3543  		schema    Schema
  3544  		totalRows uint64
  3545  	)
  3546  	for {
  3547  		var vals []Value
  3548  		err := it.Next(&vals)
  3549  		if err == iterator.Done {
  3550  			return rows, schema, totalRows, nil
  3551  		}
  3552  		if err != nil {
  3553  			return nil, nil, 0, err
  3554  		}
  3555  		rows = append(rows, vals)
  3556  		schema = it.Schema
  3557  		totalRows = it.TotalRows
  3558  	}
  3559  }
  3560  
  3561  type byCol0 [][]Value
  3562  
  3563  func (b byCol0) Len() int      { return len(b) }
  3564  func (b byCol0) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  3565  func (b byCol0) Less(i, j int) bool {
  3566  	switch a := b[i][0].(type) {
  3567  	case string:
  3568  		return a < b[j][0].(string)
  3569  	case civil.Date:
  3570  		return a.Before(b[j][0].(civil.Date))
  3571  	default:
  3572  		panic("unknown type")
  3573  	}
  3574  }
  3575  
  3576  func hasStatusCode(err error, code int) bool {
  3577  	var e *googleapi.Error
  3578  	if ok := errors.As(err, &e); ok && e.Code == code {
  3579  		return true
  3580  	}
  3581  	return false
  3582  }
  3583  
  3584  // wait polls the job until it is complete or an error is returned.
  3585  func wait(ctx context.Context, job *Job) error {
  3586  	status, err := job.Wait(ctx)
  3587  	if err != nil {
  3588  		return fmt.Errorf("job %q error: %v", job.ID(), err)
  3589  	}
  3590  	if status.Err() != nil {
  3591  		return fmt.Errorf("job %q status error: %#v", job.ID(), status.Err())
  3592  	}
  3593  	if status.Statistics == nil {
  3594  		return fmt.Errorf("job %q nil Statistics", job.ID())
  3595  	}
  3596  	if status.Statistics.EndTime.IsZero() {
  3597  		return fmt.Errorf("job %q EndTime is zero", job.ID())
  3598  	}
  3599  	return nil
  3600  }
  3601  
  3602  // waitForRow polls the table until it contains a row.
  3603  // TODO(jba): use internal.Retry.
  3604  func waitForRow(ctx context.Context, table *Table) error {
  3605  	for {
  3606  		it := table.Read(ctx)
  3607  		var v []Value
  3608  		err := it.Next(&v)
  3609  		if err == nil {
  3610  			return nil
  3611  		}
  3612  		if err != iterator.Done {
  3613  			return err
  3614  		}
  3615  		time.Sleep(1 * time.Second)
  3616  	}
  3617  }
  3618  
  3619  func putError(err error) string {
  3620  	pme, ok := err.(PutMultiError)
  3621  	if !ok {
  3622  		return err.Error()
  3623  	}
  3624  	var msgs []string
  3625  	for _, err := range pme {
  3626  		msgs = append(msgs, err.Error())
  3627  	}
  3628  	return strings.Join(msgs, "\n")
  3629  }
  3630  

View as plain text