...

Source file src/cloud.google.com/go/bigquery/storage_iterator_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2023 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"testing"
    23  	"time"
    24  
    25  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    26  	gax "github.com/googleapis/gax-go/v2"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/status"
    29  )
    30  
    31  func TestStorageIteratorRetry(t *testing.T) {
    32  	cancelledCtx, cancel := context.WithCancel(context.Background())
    33  	cancel()
    34  	testCases := []struct {
    35  		ctx      context.Context
    36  		desc     string
    37  		errors   []error
    38  		wantFail bool
    39  	}{
    40  		{
    41  			desc:     "no error",
    42  			errors:   []error{},
    43  			wantFail: false,
    44  		},
    45  		{
    46  			desc: "transient failures",
    47  			errors: []error{
    48  				status.Errorf(codes.DeadlineExceeded, "try 1"),
    49  				status.Errorf(codes.Unavailable, "try 2"),
    50  				status.Errorf(codes.Canceled, "try 3"),
    51  				status.Errorf(codes.Internal, "try 4"),
    52  			},
    53  			wantFail: false,
    54  		},
    55  		{
    56  			desc: "not enough permission",
    57  			errors: []error{
    58  				status.Errorf(codes.PermissionDenied, "the user does not have 'bigquery.readsessions.getData' permission"),
    59  			},
    60  			wantFail: true,
    61  		},
    62  		{
    63  			desc: "permanent error",
    64  			errors: []error{
    65  				status.Errorf(codes.InvalidArgument, "invalid args"),
    66  			},
    67  			wantFail: true,
    68  		},
    69  		{
    70  			ctx:  cancelledCtx,
    71  			desc: "context cancelled/deadline exceeded",
    72  			errors: []error{
    73  				fmt.Errorf("random error"),
    74  				fmt.Errorf("another random error"),
    75  				fmt.Errorf("yet another random error"),
    76  			},
    77  			wantFail: true,
    78  		},
    79  	}
    80  	for _, tc := range testCases {
    81  		rrc := &testReadRowsClient{
    82  			errors: tc.errors,
    83  		}
    84  		readRowsFuncs := map[string]func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error){
    85  			"readRows fail on first call": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
    86  				if len(tc.errors) == 0 {
    87  					return &testReadRowsClient{}, nil
    88  				}
    89  				err := tc.errors[0]
    90  				tc.errors = tc.errors[1:]
    91  				if err != nil {
    92  					return nil, err
    93  				}
    94  				return &testReadRowsClient{}, nil
    95  			},
    96  			"readRows fails on Recv": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
    97  				return rrc, nil
    98  			},
    99  		}
   100  		for readRowsFuncType, readRowsFunc := range readRowsFuncs {
   101  			baseCtx := tc.ctx
   102  			if baseCtx == nil {
   103  				baseCtx = context.Background()
   104  			}
   105  			ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
   106  			defer cancel()
   107  
   108  			it, err := newRawStorageRowIterator(&readSession{
   109  				ctx:          ctx,
   110  				settings:     defaultReadClientSettings(),
   111  				readRowsFunc: readRowsFunc,
   112  				bqSession:    &storagepb.ReadSession{},
   113  			}, Schema{})
   114  			if err != nil {
   115  				t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
   116  			}
   117  
   118  			it.processStream("test-stream")
   119  
   120  			if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
   121  				if tc.wantFail {
   122  					continue
   123  				}
   124  				t.Fatalf("case %s(%s): deadline exceeded", tc.desc, readRowsFuncType)
   125  			}
   126  			if tc.wantFail && len(it.errs) == 0 {
   127  				t.Fatalf("case %s(%s):want test to fail, but found no errors", tc.desc, readRowsFuncType)
   128  			}
   129  			if !tc.wantFail && len(it.errs) > 0 {
   130  				t.Fatalf("case %s(%s):test should not fail, but found %d errors", tc.desc, readRowsFuncType, len(it.errs))
   131  			}
   132  		}
   133  	}
   134  }
   135  
   136  type testReadRowsClient struct {
   137  	storagepb.BigQueryRead_ReadRowsClient
   138  	responses []*storagepb.ReadRowsResponse
   139  	errors    []error
   140  }
   141  
   142  func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) {
   143  	if len(trrc.errors) > 0 {
   144  		err := trrc.errors[0]
   145  		trrc.errors = trrc.errors[1:]
   146  		return nil, err
   147  	}
   148  	if len(trrc.responses) > 0 {
   149  		r := trrc.responses[0]
   150  		trrc.responses = trrc.responses[:1]
   151  		return r, nil
   152  	}
   153  	return nil, io.EOF
   154  }
   155  

View as plain text