1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "testing"
22
23 "cloud.google.com/go/internal/testutil"
24 "github.com/google/go-cmp/cmp"
25 bq "google.golang.org/api/bigquery/v2"
26 "google.golang.org/api/iterator"
27 )
28
29 type fetchResponse struct {
30 result *fetchPageResult
31 err error
32 }
33
34
35 type pageFetcherStub struct {
36 fetchResponses map[string]fetchResponse
37 err error
38 }
39
40 func (pf *pageFetcherStub) fetchPage(ctx context.Context, _ *rowSource, _ Schema, _ uint64, _ int64, pageToken string) (*fetchPageResult, error) {
41 call, ok := pf.fetchResponses[pageToken]
42 if !ok {
43 pf.err = fmt.Errorf("Unexpected page token: %q", pageToken)
44 }
45 return call.result, call.err
46 }
47
48 func TestRowIteratorCacheBehavior(t *testing.T) {
49
50 testSchema := &bq.TableSchema{
51 Fields: []*bq.TableFieldSchema{
52 {Type: "INTEGER", Name: "field1"},
53 {Type: "STRING", Name: "field2"},
54 },
55 }
56 testRows := []*bq.TableRow{
57 {F: []*bq.TableCell{
58 {V: "1"},
59 {V: "foo"},
60 },
61 },
62 }
63 convertedSchema := bqToSchema(testSchema)
64
65 convertedRows, _ := convertRows(testRows, convertedSchema)
66
67 testCases := []struct {
68 inSource *rowSource
69 inSchema Schema
70 inStartIndex uint64
71 inPageSize int64
72 inPageToken string
73 wantErr error
74 wantResult *fetchPageResult
75 }{
76 {
77 inSource: &rowSource{},
78 wantErr: errNoCacheData,
79 },
80 {
81
82 inSource: &rowSource{
83 cachedSchema: testSchema,
84 cachedRows: testRows,
85 },
86 wantResult: &fetchPageResult{
87 totalRows: uint64(len(convertedRows)),
88 schema: convertedSchema,
89 rows: convertedRows,
90 },
91 },
92 {
93
94 inSource: &rowSource{
95 cachedRows: testRows,
96 cachedNextToken: "foo",
97 },
98 inSchema: convertedSchema,
99 wantResult: &fetchPageResult{
100 totalRows: uint64(len(convertedRows)),
101 schema: convertedSchema,
102 rows: convertedRows,
103 pageToken: "foo",
104 },
105 },
106 {
107
108 inSource: &rowSource{
109 cachedSchema: testSchema,
110 cachedRows: testRows,
111 },
112 inPageSize: 99,
113 wantErr: errNoCacheData,
114 },
115 {
116
117 inSource: &rowSource{
118 cachedSchema: testSchema,
119 cachedRows: testRows,
120 },
121 inStartIndex: 1,
122 wantErr: errNoCacheData,
123 },
124 {
125
126 inSource: &rowSource{
127 cachedSchema: testSchema,
128 cachedRows: testRows,
129 },
130 inStartIndex: 1,
131 wantErr: errNoCacheData,
132 },
133 {
134
135 inSource: &rowSource{
136 cachedSchema: testSchema,
137 cachedRows: testRows,
138 },
139 inStartIndex: 1,
140 wantErr: errNoCacheData,
141 },
142 }
143 for _, tc := range testCases {
144 gotResp, gotErr := fetchCachedPage(context.Background(), tc.inSource, tc.inSchema, tc.inStartIndex, tc.inPageSize, tc.inPageToken)
145 if gotErr != tc.wantErr {
146 t.Errorf("err mismatch. got %v, want %v", gotErr, tc.wantErr)
147 } else {
148 if diff := testutil.Diff(gotResp, tc.wantResult,
149 cmp.AllowUnexported(fetchPageResult{}, rowSource{}, Job{}, Client{}, Table{})); diff != "" {
150 t.Errorf("response diff (got=-, want=+):\n%s", diff)
151 }
152 }
153 }
154
155 }
156
157 func TestIterator(t *testing.T) {
158 var (
159 iiSchema = Schema{
160 {Type: IntegerFieldType},
161 {Type: IntegerFieldType},
162 }
163 siSchema = Schema{
164 {Type: StringFieldType},
165 {Type: IntegerFieldType},
166 }
167 )
168 fetchFailure := errors.New("fetch failure")
169
170 testCases := []struct {
171 desc string
172 pageToken string
173 fetchResponses map[string]fetchResponse
174 want [][]Value
175 wantErr error
176 wantSchema Schema
177 wantTotalRows uint64
178 }{
179 {
180 desc: "Iteration over single empty page",
181 fetchResponses: map[string]fetchResponse{
182 "": {
183 result: &fetchPageResult{
184 pageToken: "",
185 rows: [][]Value{},
186 schema: Schema{},
187 },
188 },
189 },
190 want: [][]Value{},
191 wantSchema: Schema{},
192 },
193 {
194 desc: "Iteration over single page",
195 fetchResponses: map[string]fetchResponse{
196 "": {
197 result: &fetchPageResult{
198 pageToken: "",
199 rows: [][]Value{{1, 2}, {11, 12}},
200 schema: iiSchema,
201 totalRows: 4,
202 },
203 },
204 },
205 want: [][]Value{{1, 2}, {11, 12}},
206 wantSchema: iiSchema,
207 wantTotalRows: 4,
208 },
209 {
210 desc: "Iteration over single page with different schema",
211 fetchResponses: map[string]fetchResponse{
212 "": {
213 result: &fetchPageResult{
214 pageToken: "",
215 rows: [][]Value{{"1", 2}, {"11", 12}},
216 schema: siSchema,
217 },
218 },
219 },
220 want: [][]Value{{"1", 2}, {"11", 12}},
221 wantSchema: siSchema,
222 },
223 {
224 desc: "Iteration over two pages",
225 fetchResponses: map[string]fetchResponse{
226 "": {
227 result: &fetchPageResult{
228 pageToken: "a",
229 rows: [][]Value{{1, 2}, {11, 12}},
230 schema: iiSchema,
231 totalRows: 4,
232 },
233 },
234 "a": {
235 result: &fetchPageResult{
236 pageToken: "",
237 rows: [][]Value{{101, 102}, {111, 112}},
238 schema: iiSchema,
239 totalRows: 4,
240 },
241 },
242 },
243 want: [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
244 wantSchema: iiSchema,
245 wantTotalRows: 4,
246 },
247 {
248 desc: "Server response includes empty page",
249 fetchResponses: map[string]fetchResponse{
250 "": {
251 result: &fetchPageResult{
252 pageToken: "a",
253 rows: [][]Value{{1, 2}, {11, 12}},
254 schema: iiSchema,
255 },
256 },
257 "a": {
258 result: &fetchPageResult{
259 pageToken: "b",
260 rows: [][]Value{},
261 schema: iiSchema,
262 },
263 },
264 "b": {
265 result: &fetchPageResult{
266 pageToken: "",
267 rows: [][]Value{{101, 102}, {111, 112}},
268 schema: iiSchema,
269 },
270 },
271 },
272 want: [][]Value{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
273 wantSchema: iiSchema,
274 },
275 {
276 desc: "Fetch error",
277 fetchResponses: map[string]fetchResponse{
278 "": {
279 result: &fetchPageResult{
280 pageToken: "a",
281 rows: [][]Value{{1, 2}, {11, 12}},
282 schema: iiSchema,
283 },
284 },
285 "a": {
286
287
288 err: fetchFailure,
289 result: &fetchPageResult{
290 pageToken: "b",
291 rows: [][]Value{{101, 102}, {111, 112}},
292 schema: iiSchema,
293 },
294 },
295 },
296 want: [][]Value{{1, 2}, {11, 12}},
297 wantErr: fetchFailure,
298 wantSchema: iiSchema,
299 },
300
301 {
302 desc: "Skip over an entire page",
303 pageToken: "a",
304 fetchResponses: map[string]fetchResponse{
305 "": {
306 result: &fetchPageResult{
307 pageToken: "a",
308 rows: [][]Value{{1, 2}, {11, 12}},
309 schema: iiSchema,
310 },
311 },
312 "a": {
313 result: &fetchPageResult{
314 pageToken: "",
315 rows: [][]Value{{101, 102}, {111, 112}},
316 schema: iiSchema,
317 },
318 },
319 },
320 want: [][]Value{{101, 102}, {111, 112}},
321 wantSchema: iiSchema,
322 },
323
324 {
325 desc: "Skip beyond all data",
326 pageToken: "b",
327 fetchResponses: map[string]fetchResponse{
328 "": {
329 result: &fetchPageResult{
330 pageToken: "a",
331 rows: [][]Value{{1, 2}, {11, 12}},
332 schema: iiSchema,
333 },
334 },
335 "a": {
336 result: &fetchPageResult{
337 pageToken: "b",
338 rows: [][]Value{{101, 102}, {111, 112}},
339 schema: iiSchema,
340 },
341 },
342 "b": {
343 result: &fetchPageResult{},
344 },
345 },
346
347
348 want: [][]Value{},
349 wantSchema: Schema{},
350 },
351 }
352
353 for _, tc := range testCases {
354 pf := &pageFetcherStub{
355 fetchResponses: tc.fetchResponses,
356 }
357 it := newRowIterator(context.Background(), nil, pf.fetchPage)
358 it.PageInfo().Token = tc.pageToken
359 values, schema, totalRows, err := consumeRowIterator(it)
360 if err != tc.wantErr {
361 t.Fatalf("%s: got %v, want %v", tc.desc, err, tc.wantErr)
362 }
363 if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) {
364 t.Errorf("%s: values:\ngot: %v\nwant:%v", tc.desc, values, tc.want)
365 }
366 if (len(schema) != 0 || len(tc.wantSchema) != 0) && !testutil.Equal(schema, tc.wantSchema) {
367 t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema)
368 }
369 if totalRows != tc.wantTotalRows {
370 t.Errorf("%s: totalRows: got %d, want %d", tc.desc, totalRows, tc.wantTotalRows)
371 }
372 }
373 }
374
375
376 func consumeRowIterator(it *RowIterator) ([][]Value, Schema, uint64, error) {
377 var (
378 got [][]Value
379 schema Schema
380 totalRows uint64
381 )
382 for {
383 var vls []Value
384 err := it.Next(&vls)
385 if err == iterator.Done {
386 return got, schema, totalRows, nil
387 }
388 if err != nil {
389 return got, schema, totalRows, err
390 }
391 got = append(got, vls)
392 schema = it.Schema
393 totalRows = it.TotalRows
394 }
395 }
396
397 func TestNextDuringErrorState(t *testing.T) {
398 pf := &pageFetcherStub{
399 fetchResponses: map[string]fetchResponse{
400 "": {err: errors.New("bang")},
401 },
402 }
403 it := newRowIterator(context.Background(), nil, pf.fetchPage)
404 var vals []Value
405 if err := it.Next(&vals); err == nil {
406 t.Errorf("Expected error after calling Next")
407 }
408 if err := it.Next(&vals); err == nil {
409 t.Errorf("Expected error calling Next again when iterator has a non-nil error.")
410 }
411 }
412
413 func TestNextAfterFinished(t *testing.T) {
414 testCases := []struct {
415 fetchResponses map[string]fetchResponse
416 want [][]Value
417 }{
418 {
419 fetchResponses: map[string]fetchResponse{
420 "": {
421 result: &fetchPageResult{
422 pageToken: "",
423 rows: [][]Value{{1, 2}, {11, 12}},
424 },
425 },
426 },
427 want: [][]Value{{1, 2}, {11, 12}},
428 },
429 {
430 fetchResponses: map[string]fetchResponse{
431 "": {
432 result: &fetchPageResult{
433 pageToken: "",
434 rows: [][]Value{},
435 },
436 },
437 },
438 want: [][]Value{},
439 },
440 }
441
442 for _, tc := range testCases {
443 pf := &pageFetcherStub{
444 fetchResponses: tc.fetchResponses,
445 }
446 it := newRowIterator(context.Background(), nil, pf.fetchPage)
447
448 values, _, _, err := consumeRowIterator(it)
449 if err != nil {
450 t.Fatal(err)
451 }
452 if (len(values) != 0 || len(tc.want) != 0) && !testutil.Equal(values, tc.want) {
453 t.Errorf("values: got:\n%v\nwant:\n%v", values, tc.want)
454 }
455
456 var vals []Value
457 if err := it.Next(&vals); err != iterator.Done {
458 t.Errorf("Expected Done calling Next when there are no more values")
459 }
460 }
461 }
462
463 func TestIteratorNextTypes(t *testing.T) {
464 it := newRowIterator(context.Background(), nil, nil)
465 for _, v := range []interface{}{3, "s", []int{}, &[]int{},
466 map[string]Value{}, &map[string]interface{}{},
467 struct{}{},
468 } {
469 if err := it.Next(v); err == nil {
470 t.Errorf("%v: want error, got nil", v)
471 }
472 }
473 }
474
475 func TestIteratorSourceJob(t *testing.T) {
476 testcases := []struct {
477 description string
478 src *rowSource
479 wantJob *Job
480 }{
481 {
482 description: "nil source",
483 src: nil,
484 wantJob: nil,
485 },
486 {
487 description: "empty source",
488 src: &rowSource{},
489 wantJob: nil,
490 },
491 {
492 description: "table source",
493 src: &rowSource{t: &Table{ProjectID: "p", DatasetID: "d", TableID: "t"}},
494 wantJob: nil,
495 },
496 {
497 description: "job source",
498 src: &rowSource{j: &Job{projectID: "p", location: "l", jobID: "j"}},
499 wantJob: &Job{projectID: "p", location: "l", jobID: "j"},
500 },
501 }
502
503 for _, tc := range testcases {
504
505 it := newRowIterator(context.Background(), tc.src, nil)
506 got := it.SourceJob()
507
508 if !cmp.Equal(got, tc.wantJob, cmp.AllowUnexported(Job{})) {
509 t.Errorf("%s: mismatch on SourceJob, got %v want %v", tc.description, got, tc.wantJob)
510 }
511 }
512 }
513
514 func TestIteratorQueryID(t *testing.T) {
515 testcases := []struct {
516 description string
517 src *rowSource
518 want string
519 }{
520 {
521 description: "nil source",
522 src: nil,
523 want: "",
524 },
525 {
526 description: "empty source",
527 src: &rowSource{},
528 want: "",
529 },
530 {
531 description: "populated id",
532 src: &rowSource{queryID: "foo"},
533 want: "foo",
534 },
535 }
536
537 for _, tc := range testcases {
538
539 it := newRowIterator(context.Background(), tc.src, nil)
540 got := it.QueryID()
541 if got != tc.want {
542 t.Errorf("%s: mismatch queryid, got %q want %q", tc.description, got, tc.want)
543 }
544 }
545 }
546
View as plain text