1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "reflect"
22
23 bq "google.golang.org/api/bigquery/v2"
24 "google.golang.org/api/googleapi"
25 "google.golang.org/api/iterator"
26 )
27
28
29 func newRowIterator(ctx context.Context, src *rowSource, pf pageFetcher) *RowIterator {
30 it := &RowIterator{
31 ctx: ctx,
32 src: src,
33 pf: pf,
34 }
35 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
36 it.fetch,
37 func() int { return len(it.rows) },
38 func() interface{} { r := it.rows; it.rows = nil; return r })
39 return it
40 }
41
42
43 type RowIterator struct {
44 ctx context.Context
45 src *rowSource
46
47 arrowIterator ArrowIterator
48 arrowDecoder *arrowDecoder
49
50 pageInfo *iterator.PageInfo
51 nextFunc func() error
52 pf pageFetcher
53
54
55
56
57
58 StartIndex uint64
59
60
61
62
63
64 Schema Schema
65
66
67
68
69
70
71 TotalRows uint64
72
73 rows [][]Value
74 structLoader structLoader
75 }
76
77
78
79 func (ri *RowIterator) SourceJob() *Job {
80 if ri.src == nil {
81 return nil
82 }
83 if ri.src.j == nil {
84 return nil
85 }
86 return &Job{
87 c: ri.src.j.c,
88 projectID: ri.src.j.projectID,
89 location: ri.src.j.location,
90 jobID: ri.src.j.jobID,
91 }
92 }
93
94
95 func (ri *RowIterator) QueryID() string {
96 if ri.src == nil {
97 return ""
98 }
99 return ri.src.queryID
100 }
101
102
103
104
105 type pageFetcher func(ctx context.Context, _ *rowSource, _ Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error)
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 func (it *RowIterator) Next(dst interface{}) error {
153 var vl ValueLoader
154 switch dst := dst.(type) {
155 case ValueLoader:
156 vl = dst
157 case *[]Value:
158 vl = (*valueList)(dst)
159 case *map[string]Value:
160 vl = (*valueMap)(dst)
161 default:
162 if !isStructPtr(dst) {
163 return fmt.Errorf("bigquery: cannot convert %T to ValueLoader (need pointer to []Value, map[string]Value, or struct)", dst)
164 }
165 }
166 if err := it.nextFunc(); err != nil {
167 return err
168 }
169 row := it.rows[0]
170 it.rows = it.rows[1:]
171
172 if vl == nil {
173
174
175 if err := it.structLoader.set(dst, it.Schema); err != nil {
176 return err
177 }
178 vl = &it.structLoader
179 }
180 return vl.Load(row, it.Schema)
181 }
182
183 func isStructPtr(x interface{}) bool {
184 t := reflect.TypeOf(x)
185 return t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct
186 }
187
188
189
190
191 func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
192
193 func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
194 res, err := it.pf(it.ctx, it.src, it.Schema, it.StartIndex, int64(pageSize), pageToken)
195 if err != nil {
196 return "", err
197 }
198 it.rows = append(it.rows, res.rows...)
199 if it.Schema == nil {
200 it.Schema = res.schema
201 }
202 it.TotalRows = res.totalRows
203 return res.pageToken, nil
204 }
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220 type rowSource struct {
221 j *Job
222 t *Table
223 queryID string
224
225 cachedRows []*bq.TableRow
226 cachedSchema *bq.TableSchema
227 cachedNextToken string
228 }
229
230
231 type fetchPageResult struct {
232 pageToken string
233 rows [][]Value
234 totalRows uint64
235 schema Schema
236 }
237
238
239
240
241 func fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
242 result, err := fetchCachedPage(ctx, src, schema, startIndex, pageSize, pageToken)
243 if err != nil {
244 if err != errNoCacheData {
245
246 return nil, err
247 }
248
249 if src.j != nil {
250 return fetchJobResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
251 }
252 if src.t != nil {
253 return fetchTableResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
254 }
255
256 return &fetchPageResult{}, nil
257 }
258 return result, nil
259 }
260
261 func fetchTableResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
262
263 errc := make(chan error, 1)
264 if schema != nil {
265 errc <- nil
266 } else {
267 go func() {
268 var bqt *bq.Table
269 err := runWithRetry(ctx, func() (err error) {
270 bqt, err = src.t.c.bqs.Tables.Get(src.t.ProjectID, src.t.DatasetID, src.t.TableID).
271 Fields("schema").
272 Context(ctx).
273 Do()
274 return err
275 })
276 if err == nil && bqt.Schema != nil {
277 schema = bqToSchema(bqt.Schema)
278 }
279 errc <- err
280 }()
281 }
282 call := src.t.c.bqs.Tabledata.List(src.t.ProjectID, src.t.DatasetID, src.t.TableID)
283 call = call.FormatOptionsUseInt64Timestamp(true)
284 setClientHeader(call.Header())
285 if pageToken != "" {
286 call.PageToken(pageToken)
287 } else {
288 call.StartIndex(startIndex)
289 }
290 if pageSize > 0 {
291 call.MaxResults(pageSize)
292 }
293 var res *bq.TableDataList
294 err := runWithRetry(ctx, func() (err error) {
295 res, err = call.Context(ctx).Do()
296 return err
297 })
298 if err != nil {
299 return nil, err
300 }
301 err = <-errc
302 if err != nil {
303 return nil, err
304 }
305 rows, err := convertRows(res.Rows, schema)
306 if err != nil {
307 return nil, err
308 }
309 return &fetchPageResult{
310 pageToken: res.PageToken,
311 rows: rows,
312 totalRows: uint64(res.TotalRows),
313 schema: schema,
314 }, nil
315 }
316
317 func fetchJobResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
318
319 projectedFields := []googleapi.Field{"rows", "pageToken", "totalRows"}
320 call := src.j.c.bqs.Jobs.GetQueryResults(src.j.projectID, src.j.jobID).Location(src.j.location).Context(ctx)
321 call = call.FormatOptionsUseInt64Timestamp(true)
322 if schema == nil {
323
324 projectedFields = append(projectedFields, "schema")
325 }
326 call = call.Fields(projectedFields...)
327 setClientHeader(call.Header())
328 if pageToken != "" {
329 call.PageToken(pageToken)
330 } else {
331 call.StartIndex(startIndex)
332 }
333 if pageSize > 0 {
334 call.MaxResults(pageSize)
335 }
336 var res *bq.GetQueryResultsResponse
337 err := runWithRetry(ctx, func() (err error) {
338 res, err = call.Do()
339 return err
340 })
341 if err != nil {
342 return nil, err
343 }
344
345 if schema == nil {
346 schema = bqToSchema(res.Schema)
347 }
348 rows, err := convertRows(res.Rows, schema)
349 if err != nil {
350 return nil, err
351 }
352 return &fetchPageResult{
353 pageToken: res.PageToken,
354 rows: rows,
355 totalRows: uint64(res.TotalRows),
356 schema: schema,
357 }, nil
358 }
359
360 var errNoCacheData = errors.New("no rows in rowSource cache")
361
362
363
364
365 func fetchCachedPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
366
367 if src.cachedRows == nil {
368 return nil, errNoCacheData
369 }
370
371 if schema == nil {
372 if src.cachedSchema == nil {
373
374 src.cachedRows = nil
375 src.cachedNextToken = ""
376 return nil, errNoCacheData
377 }
378 schema = bqToSchema(src.cachedSchema)
379 }
380
381
382
383
384
385 if pageToken == "" &&
386 startIndex == 0 &&
387 (pageSize == 0 || pageSize == int64(len(src.cachedRows))) {
388 converted, err := convertRows(src.cachedRows, schema)
389 if err != nil {
390
391 src.cachedRows = nil
392 src.cachedSchema = nil
393 src.cachedNextToken = ""
394 return nil, err
395 }
396 result := &fetchPageResult{
397 pageToken: src.cachedNextToken,
398 rows: converted,
399 schema: schema,
400 totalRows: uint64(len(converted)),
401 }
402
403 src.cachedRows = nil
404 src.cachedSchema = nil
405 src.cachedNextToken = ""
406 return result, nil
407 }
408
409 src.cachedRows = nil
410 src.cachedSchema = nil
411 src.cachedNextToken = ""
412 return nil, errNoCacheData
413 }
414
View as plain text