...

Source file src/cloud.google.com/go/bigquery/iterator_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"testing"
    22  
    23  	"cloud.google.com/go/internal/testutil"
    24  	"github.com/google/go-cmp/cmp"
    25  	bq "google.golang.org/api/bigquery/v2"
    26  	"google.golang.org/api/iterator"
    27  )
    28  
// fetchResponse is one canned outcome that pageFetcherStub.fetchPage hands
// back to its caller: a page result, an error, or both.
type fetchResponse struct {
	result *fetchPageResult // The result to return.
	err    error            // The error to return.
}
    33  
// pageFetcherStub services fetch requests by returning canned responses held
// in an in-memory map.
type pageFetcherStub struct {
	// fetchResponses maps the page token of an incoming request to the
	// response fetchPage returns for it.
	fetchResponses map[string]fetchResponse
	// err is set by fetchPage when it is asked for a page token that has no
	// entry in fetchResponses.
	err error
}
    39  
    40  func (pf *pageFetcherStub) fetchPage(ctx context.Context, _ *rowSource, _ Schema, _ uint64, _ int64, pageToken string) (*fetchPageResult, error) {
    41  	call, ok := pf.fetchResponses[pageToken]
    42  	if !ok {
    43  		pf.err = fmt.Errorf("Unexpected page token: %q", pageToken)
    44  	}
    45  	return call.result, call.err
    46  }
    47  
    48  func TestRowIteratorCacheBehavior(t *testing.T) {
    49  
    50  	testSchema := &bq.TableSchema{
    51  		Fields: []*bq.TableFieldSchema{
    52  			{Type: "INTEGER", Name: "field1"},
    53  			{Type: "STRING", Name: "field2"},
    54  		},
    55  	}
    56  	testRows := []*bq.TableRow{
    57  		{F: []*bq.TableCell{
    58  			{V: "1"},
    59  			{V: "foo"},
    60  		},
    61  		},
    62  	}
    63  	convertedSchema := bqToSchema(testSchema)
    64  
    65  	convertedRows, _ := convertRows(testRows, convertedSchema)
    66  
    67  	testCases := []struct {
    68  		inSource     *rowSource
    69  		inSchema     Schema
    70  		inStartIndex uint64
    71  		inPageSize   int64
    72  		inPageToken  string
    73  		wantErr      error
    74  		wantResult   *fetchPageResult
    75  	}{
    76  		{
    77  			inSource: &rowSource{},
    78  			wantErr:  errNoCacheData,
    79  		},
    80  		{
    81  			// primary success case: schema in cache
    82  			inSource: &rowSource{
    83  				cachedSchema: testSchema,
    84  				cachedRows:   testRows,
    85  			},
    86  			wantResult: &fetchPageResult{
    87  				totalRows: uint64(len(convertedRows)),
    88  				schema:    convertedSchema,
    89  				rows:      convertedRows,
    90  			},
    91  		},
    92  		{
    93  			// secondary success case: schema provided
    94  			inSource: &rowSource{
    95  				cachedRows:      testRows,
    96  				cachedNextToken: "foo",
    97  			},
    98  			inSchema: convertedSchema,
    99  			wantResult: &fetchPageResult{
   100  				totalRows: uint64(len(convertedRows)),
   101  				schema:    convertedSchema,
   102  				rows:      convertedRows,
   103  				pageToken: "foo",
   104  			},
   105  		},
   106  		{
   107  			// misaligned page size.
   108  			inSource: &rowSource{
   109  				cachedSchema: testSchema,
   110  				cachedRows:   testRows,
   111  			},
   112  			inPageSize: 99,
   113  			wantErr:    errNoCacheData,
   114  		},
   115  		{
   116  			// nonzero start.
   117  			inSource: &rowSource{
   118  				cachedSchema: testSchema,
   119  				cachedRows:   testRows,
   120  			},
   121  			inStartIndex: 1,
   122  			wantErr:      errNoCacheData,
   123  		},
   124  		{
   125  			// data without schema
   126  			inSource: &rowSource{
   127  				cachedSchema: testSchema,
   128  				cachedRows:   testRows,
   129  			},
   130  			inStartIndex: 1,
   131  			wantErr:      errNoCacheData,
   132  		},
   133  		{
   134  			// data without schema
   135  			inSource: &rowSource{
   136  				cachedSchema: testSchema,
   137  				cachedRows:   testRows,
   138  			},
   139  			inStartIndex: 1,
   140  			wantErr:      errNoCacheData,
   141  		},
   142  	}
   143  	for _, tc := range testCases {
   144  		gotResp, gotErr := fetchCachedPage(context.Background(), tc.inSource, tc.inSchema, tc.inStartIndex, tc.inPageSize, tc.inPageToken)
   145  		if gotErr != tc.wantErr {
   146  			t.Errorf("err mismatch.  got %v, want %v", gotErr, tc.wantErr)
   147  		} else {
   148  			if diff := testutil.Diff(gotResp, tc.wantResult,
   149  				cmp.AllowUnexported(fetchPageResult{}, rowSource{}, Job{}, Client{}, Table{})); diff != "" {
   150  				t.Errorf("response diff (got=-, want=+):\n%s", diff)
   151  			}
   152  		}
   153  	}
   154  
   155  }
   156  
   157  func TestIterator(t *testing.T) {
   158  	var (
   159  		iiSchema = Schema{
   160  			{Type: IntegerFieldType},
   161  			{Type: IntegerFieldType},
   162  		}
   163  		siSchema = Schema{
   164  			{Type: StringFieldType},
   165  			{Type: IntegerFieldType},
   166  		}
   167  	)
   168  	fetchFailure := errors.New("fetch failure")
   169  
   170  	testCases := []struct {
   171  		desc           string
   172  		pageToken      string
   173  		fetchResponses map[string]fetchResponse
   174  		want           [][]Value
   175  		wantErr        error
   176  		wantSchema     Schema
   177  		wantTotalRows  uint64
   178  	}{
   179  		{
   180  			desc: "Iteration over single empty page",
   181  			fetchResponses: map[string]fetchResponse{
   182  				"": {
   183  					result: &fetchPageResult{
   184  						pageToken: "",
   185  						rows:      [][]Value{},
   186  						schema:    Schema{},
   187  					},
   188  				},
   189  			},
   190  			want:       [][]Value{},
   191  			wantSchema: Schema{},
   192  		},
   193  		{
   194  			desc: "Iteration over single page",
   195  			fetchResponses: map[string]fetchResponse{
   196  				"": {
   197  					result: &fetchPageResult{
   198  						pageToken: "",
   199  						rows:      [][]Value{{1, 2}, {11, 12}},
   200  						schema:    iiSchema,
   201  						totalRows: 4,
   202  					},
   203  				},
   204  			},
   205  			want:          [][]Value{{1, 2}, {11, 12}},
   206  			wantSchema:    iiSchema,
   207  			wantTotalRows: 4,
   208  		},
   209  		{
   210  			desc: "Iteration over single page with different schema",
   211  			fetchResponses: map[string]fetchResponse{
   212  				"": {
   213  					result: &fetchPageResult{
   214  						pageToken: "",
   215  						rows:      [][]Value{{"1", 2}, {"11", 12}},
   216  						schema:    siSchema,
   217  					},
   218  				},
   219  			},
   220  			want:       [][]Value{{"1", 2}, {"11", 12}},
   221  			wantSchema: siSchema,
   222  		},
   223  		{
   224  			desc: "Iteration over two pages",
   225  			fetchResponses: map[string]fetchResponse{
   226  				"": {
   227  					result: &fetchPageResult{
   228  						pageToken: "a",
   229  						rows:      [][]Value{{1, 2}, {11, 12}},
   230  						schema:    iiSchema,
   231  						totalRows: 4,
   232  					},
   233  				},
   234  				"a": {
   235  					result: &fetchPageResult{
   236  						pageToken: "",
   237  						rows:      [][]Value{{101, 102}, {111, 112}},
   238  						schema:    iiSchema,
   239  						totalRows: 4,
   240  					},
   241  				},
   242  			},
   243  			want:          [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
   244  			wantSchema:    iiSchema,
   245  			wantTotalRows: 4,
   246  		},
   247  		{
   248  			desc: "Server response includes empty page",
   249  			fetchResponses: map[string]fetchResponse{
   250  				"": {
   251  					result: &fetchPageResult{
   252  						pageToken: "a",
   253  						rows:      [][]Value{{1, 2}, {11, 12}},
   254  						schema:    iiSchema,
   255  					},
   256  				},
   257  				"a": {
   258  					result: &fetchPageResult{
   259  						pageToken: "b",
   260  						rows:      [][]Value{},
   261  						schema:    iiSchema,
   262  					},
   263  				},
   264  				"b": {
   265  					result: &fetchPageResult{
   266  						pageToken: "",
   267  						rows:      [][]Value{{101, 102}, {111, 112}},
   268  						schema:    iiSchema,
   269  					},
   270  				},
   271  			},
   272  			want:       [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
   273  			wantSchema: iiSchema,
   274  		},
   275  		{
   276  			desc: "Fetch error",
   277  			fetchResponses: map[string]fetchResponse{
   278  				"": {
   279  					result: &fetchPageResult{
   280  						pageToken: "a",
   281  						rows:      [][]Value{{1, 2}, {11, 12}},
   282  						schema:    iiSchema,
   283  					},
   284  				},
   285  				"a": {
   286  					// We returns some data from this fetch, but also an error.
   287  					// So the end result should include only data from the previous fetch.
   288  					err: fetchFailure,
   289  					result: &fetchPageResult{
   290  						pageToken: "b",
   291  						rows:      [][]Value{{101, 102}, {111, 112}},
   292  						schema:    iiSchema,
   293  					},
   294  				},
   295  			},
   296  			want:       [][]Value{{1, 2}, {11, 12}},
   297  			wantErr:    fetchFailure,
   298  			wantSchema: iiSchema,
   299  		},
   300  
   301  		{
   302  			desc:      "Skip over an entire page",
   303  			pageToken: "a",
   304  			fetchResponses: map[string]fetchResponse{
   305  				"": {
   306  					result: &fetchPageResult{
   307  						pageToken: "a",
   308  						rows:      [][]Value{{1, 2}, {11, 12}},
   309  						schema:    iiSchema,
   310  					},
   311  				},
   312  				"a": {
   313  					result: &fetchPageResult{
   314  						pageToken: "",
   315  						rows:      [][]Value{{101, 102}, {111, 112}},
   316  						schema:    iiSchema,
   317  					},
   318  				},
   319  			},
   320  			want:       [][]Value{{101, 102}, {111, 112}},
   321  			wantSchema: iiSchema,
   322  		},
   323  
   324  		{
   325  			desc:      "Skip beyond all data",
   326  			pageToken: "b",
   327  			fetchResponses: map[string]fetchResponse{
   328  				"": {
   329  					result: &fetchPageResult{
   330  						pageToken: "a",
   331  						rows:      [][]Value{{1, 2}, {11, 12}},
   332  						schema:    iiSchema,
   333  					},
   334  				},
   335  				"a": {
   336  					result: &fetchPageResult{
   337  						pageToken: "b",
   338  						rows:      [][]Value{{101, 102}, {111, 112}},
   339  						schema:    iiSchema,
   340  					},
   341  				},
   342  				"b": {
   343  					result: &fetchPageResult{},
   344  				},
   345  			},
   346  			// In this test case, Next will return false on its first call,
   347  			// so we won't even attempt to call Get.
   348  			want:       [][]Value{},
   349  			wantSchema: Schema{},
   350  		},
   351  	}
   352  
   353  	for _, tc := range testCases {
   354  		pf := &pageFetcherStub{
   355  			fetchResponses: tc.fetchResponses,
   356  		}
   357  		it := newRowIterator(context.Background(), nil, pf.fetchPage)
   358  		it.PageInfo().Token = tc.pageToken
   359  		values, schema, totalRows, err := consumeRowIterator(it)
   360  		if err != tc.wantErr {
   361  			t.Fatalf("%s: got %v, want %v", tc.desc, err, tc.wantErr)
   362  		}
   363  		if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) {
   364  			t.Errorf("%s: values:\ngot: %v\nwant:%v", tc.desc, values, tc.want)
   365  		}
   366  		if (len(schema) != 0 || len(tc.wantSchema) != 0) && !testutil.Equal(schema, tc.wantSchema) {
   367  			t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema)
   368  		}
   369  		if totalRows != tc.wantTotalRows {
   370  			t.Errorf("%s: totalRows: got %d, want %d", tc.desc, totalRows, tc.wantTotalRows)
   371  		}
   372  	}
   373  }
   374  
   375  // consumeRowIterator reads the schema and all values from a RowIterator and returns them.
   376  func consumeRowIterator(it *RowIterator) ([][]Value, Schema, uint64, error) {
   377  	var (
   378  		got       [][]Value
   379  		schema    Schema
   380  		totalRows uint64
   381  	)
   382  	for {
   383  		var vls []Value
   384  		err := it.Next(&vls)
   385  		if err == iterator.Done {
   386  			return got, schema, totalRows, nil
   387  		}
   388  		if err != nil {
   389  			return got, schema, totalRows, err
   390  		}
   391  		got = append(got, vls)
   392  		schema = it.Schema
   393  		totalRows = it.TotalRows
   394  	}
   395  }
   396  
   397  func TestNextDuringErrorState(t *testing.T) {
   398  	pf := &pageFetcherStub{
   399  		fetchResponses: map[string]fetchResponse{
   400  			"": {err: errors.New("bang")},
   401  		},
   402  	}
   403  	it := newRowIterator(context.Background(), nil, pf.fetchPage)
   404  	var vals []Value
   405  	if err := it.Next(&vals); err == nil {
   406  		t.Errorf("Expected error after calling Next")
   407  	}
   408  	if err := it.Next(&vals); err == nil {
   409  		t.Errorf("Expected error calling Next again when iterator has a non-nil error.")
   410  	}
   411  }
   412  
   413  func TestNextAfterFinished(t *testing.T) {
   414  	testCases := []struct {
   415  		fetchResponses map[string]fetchResponse
   416  		want           [][]Value
   417  	}{
   418  		{
   419  			fetchResponses: map[string]fetchResponse{
   420  				"": {
   421  					result: &fetchPageResult{
   422  						pageToken: "",
   423  						rows:      [][]Value{{1, 2}, {11, 12}},
   424  					},
   425  				},
   426  			},
   427  			want: [][]Value{{1, 2}, {11, 12}},
   428  		},
   429  		{
   430  			fetchResponses: map[string]fetchResponse{
   431  				"": {
   432  					result: &fetchPageResult{
   433  						pageToken: "",
   434  						rows:      [][]Value{},
   435  					},
   436  				},
   437  			},
   438  			want: [][]Value{},
   439  		},
   440  	}
   441  
   442  	for _, tc := range testCases {
   443  		pf := &pageFetcherStub{
   444  			fetchResponses: tc.fetchResponses,
   445  		}
   446  		it := newRowIterator(context.Background(), nil, pf.fetchPage)
   447  
   448  		values, _, _, err := consumeRowIterator(it)
   449  		if err != nil {
   450  			t.Fatal(err)
   451  		}
   452  		if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) {
   453  			t.Errorf("values: got:\n%v\nwant:\n%v", values, tc.want)
   454  		}
   455  		// Try calling Get again.
   456  		var vals []Value
   457  		if err := it.Next(&vals); err != iterator.Done {
   458  			t.Errorf("Expected Done calling Next when there are no more values")
   459  		}
   460  	}
   461  }
   462  
   463  func TestIteratorNextTypes(t *testing.T) {
   464  	it := newRowIterator(context.Background(), nil, nil)
   465  	for _, v := range []interface{}{3, "s", []int{}, &[]int{},
   466  		map[string]Value{}, &map[string]interface{}{},
   467  		struct{}{},
   468  	} {
   469  		if err := it.Next(v); err == nil {
   470  			t.Errorf("%v: want error, got nil", v)
   471  		}
   472  	}
   473  }
   474  
   475  func TestIteratorSourceJob(t *testing.T) {
   476  	testcases := []struct {
   477  		description string
   478  		src         *rowSource
   479  		wantJob     *Job
   480  	}{
   481  		{
   482  			description: "nil source",
   483  			src:         nil,
   484  			wantJob:     nil,
   485  		},
   486  		{
   487  			description: "empty source",
   488  			src:         &rowSource{},
   489  			wantJob:     nil,
   490  		},
   491  		{
   492  			description: "table source",
   493  			src:         &rowSource{t: &Table{ProjectID: "p", DatasetID: "d", TableID: "t"}},
   494  			wantJob:     nil,
   495  		},
   496  		{
   497  			description: "job source",
   498  			src:         &rowSource{j: &Job{projectID: "p", location: "l", jobID: "j"}},
   499  			wantJob:     &Job{projectID: "p", location: "l", jobID: "j"},
   500  		},
   501  	}
   502  
   503  	for _, tc := range testcases {
   504  		// Don't pass a page func, we're not reading from the iterator.
   505  		it := newRowIterator(context.Background(), tc.src, nil)
   506  		got := it.SourceJob()
   507  		// AllowUnexported because of the embedded client reference, which we're ignoring.
   508  		if !cmp.Equal(got, tc.wantJob, cmp.AllowUnexported(Job{})) {
   509  			t.Errorf("%s: mismatch on SourceJob, got %v want %v", tc.description, got, tc.wantJob)
   510  		}
   511  	}
   512  }
   513  
   514  func TestIteratorQueryID(t *testing.T) {
   515  	testcases := []struct {
   516  		description string
   517  		src         *rowSource
   518  		want        string
   519  	}{
   520  		{
   521  			description: "nil source",
   522  			src:         nil,
   523  			want:        "",
   524  		},
   525  		{
   526  			description: "empty source",
   527  			src:         &rowSource{},
   528  			want:        "",
   529  		},
   530  		{
   531  			description: "populated id",
   532  			src:         &rowSource{queryID: "foo"},
   533  			want:        "foo",
   534  		},
   535  	}
   536  
   537  	for _, tc := range testcases {
   538  		// Don't pass a page func, we're not reading from the iterator.
   539  		it := newRowIterator(context.Background(), tc.src, nil)
   540  		got := it.QueryID()
   541  		if got != tc.want {
   542  			t.Errorf("%s: mismatch queryid, got %q want %q", tc.description, got, tc.want)
   543  		}
   544  	}
   545  }
   546  

View as plain text