...

Source file src/cloud.google.com/go/bigquery/iterator.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"reflect"
    22  
    23  	bq "google.golang.org/api/bigquery/v2"
    24  	"google.golang.org/api/googleapi"
    25  	"google.golang.org/api/iterator"
    26  )
    27  
    28  // Construct a RowIterator.
    29  func newRowIterator(ctx context.Context, src *rowSource, pf pageFetcher) *RowIterator {
    30  	it := &RowIterator{
    31  		ctx: ctx,
    32  		src: src,
    33  		pf:  pf,
    34  	}
    35  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
    36  		it.fetch,
    37  		func() int { return len(it.rows) },
    38  		func() interface{} { r := it.rows; it.rows = nil; return r })
    39  	return it
    40  }
    41  
    42  // A RowIterator provides access to the result of a BigQuery lookup.
    43  type RowIterator struct {
    44  	ctx context.Context
    45  	src *rowSource
    46  
    47  	arrowIterator ArrowIterator
    48  	arrowDecoder  *arrowDecoder
    49  
    50  	pageInfo *iterator.PageInfo
    51  	nextFunc func() error
    52  	pf       pageFetcher
    53  
    54  	// StartIndex can be set before the first call to Next. If PageInfo().Token
    55  	// is also set, StartIndex is ignored. If Storage API is enabled,
    56  	// StartIndex is also ignored because is not supported. IsAccelerated()
    57  	// method can be called to check if Storage API is enabled for the RowIterator.
    58  	StartIndex uint64
    59  
    60  	// The schema of the table.
    61  	// In some scenarios it will only be available after the first
    62  	// call to Next(), like when a call to Query.Read uses
    63  	// the jobs.query API for an optimized query path.
    64  	Schema Schema
    65  
    66  	// The total number of rows in the result.
    67  	// In some scenarios it will only be available after the first
    68  	// call to Next(), like when a call to Query.Read uses
    69  	// the jobs.query API for an optimized query path.
    70  	// May be zero just after rows were inserted.
    71  	TotalRows uint64
    72  
    73  	rows         [][]Value
    74  	structLoader structLoader // used to populate a pointer to a struct
    75  }
    76  
    77  // SourceJob returns an instance of a Job if the RowIterator is backed by a query,
    78  // or a nil.
    79  func (ri *RowIterator) SourceJob() *Job {
    80  	if ri.src == nil {
    81  		return nil
    82  	}
    83  	if ri.src.j == nil {
    84  		return nil
    85  	}
    86  	return &Job{
    87  		c:         ri.src.j.c,
    88  		projectID: ri.src.j.projectID,
    89  		location:  ri.src.j.location,
    90  		jobID:     ri.src.j.jobID,
    91  	}
    92  }
    93  
    94  // QueryID returns a query ID if available, or an empty string.
    95  func (ri *RowIterator) QueryID() string {
    96  	if ri.src == nil {
    97  		return ""
    98  	}
    99  	return ri.src.queryID
   100  }
   101  
   102  // We declare a function signature for fetching results.  The primary reason
   103  // for this is to enable us to swap out the fetch function with alternate
   104  // implementations (e.g. to enable testing).
   105  type pageFetcher func(ctx context.Context, _ *rowSource, _ Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error)
   106  
   107  // Next loads the next row into dst. Its return value is iterator.Done if there
   108  // are no more results. Once Next returns iterator.Done, all subsequent calls
   109  // will return iterator.Done.
   110  //
   111  // dst may implement ValueLoader, or may be a *[]Value, *map[string]Value, or struct pointer.
   112  //
   113  // If dst is a *[]Value, it will be set to new []Value whose i'th element
   114  // will be populated with the i'th column of the row.
   115  //
   116  // If dst is a *map[string]Value, a new map will be created if dst is nil. Then
   117  // for each schema column name, the map key of that name will be set to the column's
   118  // value. STRUCT types (RECORD types or nested schemas) become nested maps.
   119  //
   120  // If dst is pointer to a struct, each column in the schema will be matched
   121  // with an exported field of the struct that has the same name, ignoring case.
   122  // Unmatched schema columns and struct fields will be ignored.
   123  //
   124  // Each BigQuery column type corresponds to one or more Go types; a matching struct
   125  // field must be of the correct type. The correspondences are:
   126  //
   127  //	STRING      string
   128  //	BOOL        bool
   129  //	INTEGER     int, int8, int16, int32, int64, uint8, uint16, uint32
   130  //	FLOAT       float32, float64
   131  //	BYTES       []byte
   132  //	TIMESTAMP   time.Time
   133  //	DATE        civil.Date
   134  //	TIME        civil.Time
   135  //	DATETIME    civil.DateTime
   136  //	NUMERIC     *big.Rat
   137  //	BIGNUMERIC  *big.Rat
   138  //
   139  // The big.Rat type supports numbers of arbitrary size and precision.
   140  // See https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type
   141  // for more on NUMERIC.
   142  //
   143  // A repeated field corresponds to a slice or array of the element type. A STRUCT
   144  // type (RECORD or nested schema) corresponds to a nested struct or struct pointer.
   145  // All calls to Next on the same iterator must use the same struct type.
   146  //
   147  // It is an error to attempt to read a BigQuery NULL value into a struct field,
   148  // unless the field is of type []byte or is one of the special Null types: NullInt64,
   149  // NullFloat64, NullBool, NullString, NullTimestamp, NullDate, NullTime or
   150  // NullDateTime. You can also use a *[]Value or *map[string]Value to read from a
   151  // table with NULLs.
   152  func (it *RowIterator) Next(dst interface{}) error {
   153  	var vl ValueLoader
   154  	switch dst := dst.(type) {
   155  	case ValueLoader:
   156  		vl = dst
   157  	case *[]Value:
   158  		vl = (*valueList)(dst)
   159  	case *map[string]Value:
   160  		vl = (*valueMap)(dst)
   161  	default:
   162  		if !isStructPtr(dst) {
   163  			return fmt.Errorf("bigquery: cannot convert %T to ValueLoader (need pointer to []Value, map[string]Value, or struct)", dst)
   164  		}
   165  	}
   166  	if err := it.nextFunc(); err != nil {
   167  		return err
   168  	}
   169  	row := it.rows[0]
   170  	it.rows = it.rows[1:]
   171  
   172  	if vl == nil {
   173  		// This can only happen if dst is a pointer to a struct. We couldn't
   174  		// set vl above because we need the schema.
   175  		if err := it.structLoader.set(dst, it.Schema); err != nil {
   176  			return err
   177  		}
   178  		vl = &it.structLoader
   179  	}
   180  	return vl.Load(row, it.Schema)
   181  }
   182  
   183  func isStructPtr(x interface{}) bool {
   184  	t := reflect.TypeOf(x)
   185  	return t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct
   186  }
   187  
   188  // PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
   189  // Currently pagination is not supported when the Storage API is enabled. IsAccelerated()
   190  // method can be called to check if Storage API is enabled for the RowIterator.
   191  func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
   192  
   193  func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
   194  	res, err := it.pf(it.ctx, it.src, it.Schema, it.StartIndex, int64(pageSize), pageToken)
   195  	if err != nil {
   196  		return "", err
   197  	}
   198  	it.rows = append(it.rows, res.rows...)
   199  	if it.Schema == nil {
   200  		it.Schema = res.schema
   201  	}
   202  	it.TotalRows = res.totalRows
   203  	return res.pageToken, nil
   204  }
   205  
   206  // rowSource represents one of the multiple sources of data for a row iterator.
   207  // Rows can be read directly from a BigQuery table or from a job reference.
   208  // If a job is present, that's treated as the authoritative source.
   209  //
   210  // rowSource can also cache results for special situations, primarily for the
   211  // fast execution query path which can return status, rows, and schema all at
   212  // once.  Our cache data expectations are as follows:
   213  //
   214  //   - We can only cache data from the start of a source.
   215  //   - We need to cache schema, rows, and next page token to effective service
   216  //     a request from cache.
   217  //   - cache references are destroyed as soon as they're interrogated.  We don't
   218  //     want to retain the data unnecessarily, and we expect that the backend
   219  //     can always provide them if needed.
   220  type rowSource struct {
   221  	j       *Job
   222  	t       *Table
   223  	queryID string
   224  
   225  	cachedRows      []*bq.TableRow
   226  	cachedSchema    *bq.TableSchema
   227  	cachedNextToken string
   228  }
   229  
   230  // fetchPageResult represents a page of rows returned from the backend.
   231  type fetchPageResult struct {
   232  	pageToken string
   233  	rows      [][]Value
   234  	totalRows uint64
   235  	schema    Schema
   236  }
   237  
   238  // fetchPage is our generalized fetch mechanism.  It interrogates from cache, and
   239  // then dispatches to either the appropriate job or table-based backend mechanism
   240  // as needed.
   241  func fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
   242  	result, err := fetchCachedPage(ctx, src, schema, startIndex, pageSize, pageToken)
   243  	if err != nil {
   244  		if err != errNoCacheData {
   245  			// This likely means something more severe, like a problem with schema.
   246  			return nil, err
   247  		}
   248  		// If we failed to fetch data from cache, invoke the appropriate service method.
   249  		if src.j != nil {
   250  			return fetchJobResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
   251  		}
   252  		if src.t != nil {
   253  			return fetchTableResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
   254  		}
   255  		// No rows, but no table or job reference.  Return an empty result set.
   256  		return &fetchPageResult{}, nil
   257  	}
   258  	return result, nil
   259  }
   260  
   261  func fetchTableResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
   262  	// Fetch the table schema in the background, if necessary.
   263  	errc := make(chan error, 1)
   264  	if schema != nil {
   265  		errc <- nil
   266  	} else {
   267  		go func() {
   268  			var bqt *bq.Table
   269  			err := runWithRetry(ctx, func() (err error) {
   270  				bqt, err = src.t.c.bqs.Tables.Get(src.t.ProjectID, src.t.DatasetID, src.t.TableID).
   271  					Fields("schema").
   272  					Context(ctx).
   273  					Do()
   274  				return err
   275  			})
   276  			if err == nil && bqt.Schema != nil {
   277  				schema = bqToSchema(bqt.Schema)
   278  			}
   279  			errc <- err
   280  		}()
   281  	}
   282  	call := src.t.c.bqs.Tabledata.List(src.t.ProjectID, src.t.DatasetID, src.t.TableID)
   283  	call = call.FormatOptionsUseInt64Timestamp(true)
   284  	setClientHeader(call.Header())
   285  	if pageToken != "" {
   286  		call.PageToken(pageToken)
   287  	} else {
   288  		call.StartIndex(startIndex)
   289  	}
   290  	if pageSize > 0 {
   291  		call.MaxResults(pageSize)
   292  	}
   293  	var res *bq.TableDataList
   294  	err := runWithRetry(ctx, func() (err error) {
   295  		res, err = call.Context(ctx).Do()
   296  		return err
   297  	})
   298  	if err != nil {
   299  		return nil, err
   300  	}
   301  	err = <-errc
   302  	if err != nil {
   303  		return nil, err
   304  	}
   305  	rows, err := convertRows(res.Rows, schema)
   306  	if err != nil {
   307  		return nil, err
   308  	}
   309  	return &fetchPageResult{
   310  		pageToken: res.PageToken,
   311  		rows:      rows,
   312  		totalRows: uint64(res.TotalRows),
   313  		schema:    schema,
   314  	}, nil
   315  }
   316  
   317  func fetchJobResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
   318  	// reduce data transfered by leveraging api projections
   319  	projectedFields := []googleapi.Field{"rows", "pageToken", "totalRows"}
   320  	call := src.j.c.bqs.Jobs.GetQueryResults(src.j.projectID, src.j.jobID).Location(src.j.location).Context(ctx)
   321  	call = call.FormatOptionsUseInt64Timestamp(true)
   322  	if schema == nil {
   323  		// only project schema if we weren't supplied one.
   324  		projectedFields = append(projectedFields, "schema")
   325  	}
   326  	call = call.Fields(projectedFields...)
   327  	setClientHeader(call.Header())
   328  	if pageToken != "" {
   329  		call.PageToken(pageToken)
   330  	} else {
   331  		call.StartIndex(startIndex)
   332  	}
   333  	if pageSize > 0 {
   334  		call.MaxResults(pageSize)
   335  	}
   336  	var res *bq.GetQueryResultsResponse
   337  	err := runWithRetry(ctx, func() (err error) {
   338  		res, err = call.Do()
   339  		return err
   340  	})
   341  	if err != nil {
   342  		return nil, err
   343  	}
   344  	// Populate schema in the rowsource if it's missing
   345  	if schema == nil {
   346  		schema = bqToSchema(res.Schema)
   347  	}
   348  	rows, err := convertRows(res.Rows, schema)
   349  	if err != nil {
   350  		return nil, err
   351  	}
   352  	return &fetchPageResult{
   353  		pageToken: res.PageToken,
   354  		rows:      rows,
   355  		totalRows: uint64(res.TotalRows),
   356  		schema:    schema,
   357  	}, nil
   358  }
   359  
   360  var errNoCacheData = errors.New("no rows in rowSource cache")
   361  
   362  // fetchCachedPage attempts to service the first page of results.  For the jobs path specifically, we have an
   363  // opportunity to fetch rows before the iterator is constructed, and thus serve that data as the first request
   364  // without an unnecessary network round trip.
   365  func fetchCachedPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
   366  	// we have no cached data
   367  	if src.cachedRows == nil {
   368  		return nil, errNoCacheData
   369  	}
   370  	// we have no schema for decoding.  convert from the cached representation if available.
   371  	if schema == nil {
   372  		if src.cachedSchema == nil {
   373  			// We can't progress with no schema, destroy references and return a miss.
   374  			src.cachedRows = nil
   375  			src.cachedNextToken = ""
   376  			return nil, errNoCacheData
   377  		}
   378  		schema = bqToSchema(src.cachedSchema)
   379  	}
   380  	// Only serve from cache where we're confident we know someone's asking for the first page
   381  	// without having to align data.
   382  	//
   383  	// Future consideration: we could service pagesizes smaller than the cache if we're willing to handle generation
   384  	// of pageTokens for the cache.
   385  	if pageToken == "" &&
   386  		startIndex == 0 &&
   387  		(pageSize == 0 || pageSize == int64(len(src.cachedRows))) {
   388  		converted, err := convertRows(src.cachedRows, schema)
   389  		if err != nil {
   390  			// destroy cache references and return error
   391  			src.cachedRows = nil
   392  			src.cachedSchema = nil
   393  			src.cachedNextToken = ""
   394  			return nil, err
   395  		}
   396  		result := &fetchPageResult{
   397  			pageToken: src.cachedNextToken,
   398  			rows:      converted,
   399  			schema:    schema,
   400  			totalRows: uint64(len(converted)),
   401  		}
   402  		// clear cache references and return response.
   403  		src.cachedRows = nil
   404  		src.cachedSchema = nil
   405  		src.cachedNextToken = ""
   406  		return result, nil
   407  	}
   408  	// All other cases are invalid.  Destroy any cache references on the way out the door.
   409  	src.cachedRows = nil
   410  	src.cachedSchema = nil
   411  	src.cachedNextToken = ""
   412  	return nil, errNoCacheData
   413  }
   414  

View as plain text