...

Source file src/cloud.google.com/go/bigquery/read_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"testing"
    21  
    22  	"cloud.google.com/go/internal/testutil"
    23  	"github.com/google/go-cmp/cmp"
    24  	bq "google.golang.org/api/bigquery/v2"
    25  	"google.golang.org/api/iterator"
    26  )
    27  
    28  type pageFetcherArgs struct {
    29  	src        *rowSource
    30  	schema     Schema
    31  	startIndex uint64
    32  	pageSize   int64
    33  	pageToken  string
    34  }
    35  
    36  // pageFetcherReadStub services read requests by returning data from an in-memory list of values.
    37  type pageFetcherReadStub struct {
    38  	// values and pageTokens are used as sources of data to return in response to calls to readTabledata or readQuery.
    39  	values     [][][]Value       // contains pages / rows / columns.
    40  	pageTokens map[string]string // maps incoming page token to returned page token.
    41  
    42  	// arguments are recorded for later inspection.
    43  	calls []pageFetcherArgs
    44  }
    45  
    46  func (s *pageFetcherReadStub) fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
    47  	s.calls = append(s.calls,
    48  		pageFetcherArgs{src, schema, startIndex, pageSize, pageToken})
    49  	result := &fetchPageResult{
    50  		pageToken: s.pageTokens[pageToken],
    51  		rows:      s.values[0],
    52  	}
    53  	s.values = s.values[1:]
    54  	return result, nil
    55  }
    56  
    57  func waitForQueryStub(context.Context, string) (Schema, uint64, error) {
    58  	return nil, 1, nil
    59  }
    60  
    61  func TestRead(t *testing.T) {
    62  	// The data for the service stub to return is populated for each test case in the testCases for loop.
    63  	ctx := context.Background()
    64  	c := &Client{projectID: "project-id"}
    65  	pf := &pageFetcherReadStub{}
    66  	queryJob := &Job{
    67  		projectID: "project-id",
    68  		jobID:     "job-id",
    69  		c:         c,
    70  		config: &bq.JobConfiguration{
    71  			Query: &bq.JobConfigurationQuery{
    72  				DestinationTable: &bq.TableReference{
    73  					ProjectId: "project-id",
    74  					DatasetId: "dataset-id",
    75  					TableId:   "table-id",
    76  				},
    77  			},
    78  		},
    79  	}
    80  
    81  	for _, readFunc := range []func() *RowIterator{
    82  		func() *RowIterator {
    83  			return c.Dataset("dataset-id").Table("table-id").read(ctx, pf.fetchPage)
    84  		},
    85  		func() *RowIterator {
    86  			it, err := queryJob.read(ctx, waitForQueryStub, pf.fetchPage)
    87  			if err != nil {
    88  				t.Fatal(err)
    89  			}
    90  			return it
    91  		},
    92  	} {
    93  		testCases := []struct {
    94  			data       [][][]Value
    95  			pageTokens map[string]string
    96  			want       [][]Value
    97  		}{
    98  			{
    99  				data:       [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}},
   100  				pageTokens: map[string]string{"": "a", "a": ""},
   101  				want:       [][]Value{{1, 2}, {11, 12}, {30, 40}, {31, 41}},
   102  			},
   103  			{
   104  				data:       [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}},
   105  				pageTokens: map[string]string{"": ""}, // no more pages after first one.
   106  				want:       [][]Value{{1, 2}, {11, 12}},
   107  			},
   108  		}
   109  		for _, tc := range testCases {
   110  			pf.values = tc.data
   111  			pf.pageTokens = tc.pageTokens
   112  			if got, ok := collectValues(t, readFunc()); ok {
   113  				if !testutil.Equal(got, tc.want) {
   114  					t.Errorf("reading: got:\n%v\nwant:\n%v", got, tc.want)
   115  				}
   116  			}
   117  		}
   118  	}
   119  }
   120  
   121  func collectValues(t *testing.T, it *RowIterator) ([][]Value, bool) {
   122  	var got [][]Value
   123  	for {
   124  		var vals []Value
   125  		err := it.Next(&vals)
   126  		if err == iterator.Done {
   127  			break
   128  		}
   129  		if err != nil {
   130  			t.Errorf("err calling Next: %v", err)
   131  			return nil, false
   132  		}
   133  		got = append(got, vals)
   134  	}
   135  	return got, true
   136  }
   137  
   138  func TestNoMoreValues(t *testing.T) {
   139  	c := &Client{projectID: "project-id"}
   140  	pf := &pageFetcherReadStub{
   141  		values: [][][]Value{{{1, 2}, {11, 12}}},
   142  	}
   143  	it := c.Dataset("dataset-id").Table("table-id").read(context.Background(), pf.fetchPage)
   144  	var vals []Value
   145  	// We expect to retrieve two values and then fail on the next attempt.
   146  	if err := it.Next(&vals); err != nil {
   147  		t.Fatalf("Next: got: %v: want: nil", err)
   148  	}
   149  	if err := it.Next(&vals); err != nil {
   150  		t.Fatalf("Next: got: %v: want: nil", err)
   151  	}
   152  	if err := it.Next(&vals); err != iterator.Done {
   153  		t.Fatalf("Next: got: %v: want: iterator.Done", err)
   154  	}
   155  }
   156  
   157  var errBang = errors.New("bang")
   158  
   159  func errorFetchPage(context.Context, *rowSource, Schema, uint64, int64, string) (*fetchPageResult, error) {
   160  	return nil, errBang
   161  }
   162  
   163  func TestReadError(t *testing.T) {
   164  	// test that service read errors are propagated back to the caller.
   165  	c := &Client{projectID: "project-id"}
   166  	it := c.Dataset("dataset-id").Table("table-id").read(context.Background(), errorFetchPage)
   167  	var vals []Value
   168  	if err := it.Next(&vals); err != errBang {
   169  		t.Fatalf("Get: got: %v: want: %v", err, errBang)
   170  	}
   171  }
   172  
   173  func TestReadTabledataOptions(t *testing.T) {
   174  	// test that read options are propagated.
   175  	s := &pageFetcherReadStub{
   176  		values: [][][]Value{{{1, 2}}},
   177  	}
   178  	c := &Client{projectID: "project-id"}
   179  	tr := c.Dataset("dataset-id").Table("table-id")
   180  	it := tr.read(context.Background(), s.fetchPage)
   181  	it.PageInfo().MaxSize = 5
   182  	var vals []Value
   183  	if err := it.Next(&vals); err != nil {
   184  		t.Fatal(err)
   185  	}
   186  	want := []pageFetcherArgs{{
   187  		src: &rowSource{
   188  			t: tr,
   189  		},
   190  		pageSize:  5,
   191  		pageToken: "",
   192  	}}
   193  	if diff := testutil.Diff(s.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, pageFetcherReadStub{}, rowSource{}, Table{}, Client{})); diff != "" {
   194  		t.Errorf("reading (got=-, want=+):\n%s", diff)
   195  	}
   196  }
   197  
   198  func TestReadQueryOptions(t *testing.T) {
   199  	// test that read options are propagated.
   200  	c := &Client{projectID: "project-id"}
   201  	pf := &pageFetcherReadStub{
   202  		values: [][][]Value{{{1, 2}}},
   203  	}
   204  	tr := &bq.TableReference{
   205  		ProjectId: "project-id",
   206  		DatasetId: "dataset-id",
   207  		TableId:   "table-id",
   208  	}
   209  	queryJob := &Job{
   210  		projectID: "project-id",
   211  		jobID:     "job-id",
   212  		c:         c,
   213  		config: &bq.JobConfiguration{
   214  			Query: &bq.JobConfigurationQuery{DestinationTable: tr},
   215  		},
   216  	}
   217  	it, err := queryJob.read(context.Background(), waitForQueryStub, pf.fetchPage)
   218  	if err != nil {
   219  		t.Fatalf("err calling Read: %v", err)
   220  	}
   221  	it.PageInfo().MaxSize = 5
   222  	var vals []Value
   223  	if err := it.Next(&vals); err != nil {
   224  		t.Fatalf("Next: got: %v: want: nil", err)
   225  	}
   226  
   227  	want := []pageFetcherArgs{{
   228  		src: &rowSource{
   229  			j: &Job{
   230  				c:         queryJob.c,
   231  				jobID:     queryJob.jobID,
   232  				projectID: queryJob.projectID,
   233  				location:  queryJob.location,
   234  			},
   235  		},
   236  		pageSize:  5,
   237  		pageToken: "",
   238  	}}
   239  	if !testutil.Equal(pf.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, rowSource{}, Job{}, Client{})) {
   240  		t.Errorf("reading: got:\n%v\nwant:\n%v", pf.calls, want)
   241  	}
   242  }
   243  

View as plain text