...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/connection_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"testing"
    23  	"time"
    24  
    25  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    26  	"github.com/googleapis/gax-go/v2"
    27  	"google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
    28  	statuspb "google.golang.org/genproto/googleapis/rpc/status"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/status"
    32  )
    33  
    34  func TestConnection_OpenWithRetry(t *testing.T) {
    35  
    36  	testCases := []struct {
    37  		desc     string
    38  		errors   []error
    39  		wantFail bool
    40  	}{
    41  		{
    42  			desc:     "no error",
    43  			errors:   []error{nil},
    44  			wantFail: false,
    45  		},
    46  		{
    47  			desc: "transient failures",
    48  			errors: []error{
    49  				status.Errorf(codes.Unavailable, "try 1"),
    50  				status.Errorf(codes.Unavailable, "try 2"),
    51  				nil},
    52  			wantFail: false,
    53  		},
    54  		{
    55  			desc:     "terminal error",
    56  			errors:   []error{status.Errorf(codes.InvalidArgument, "bad args")},
    57  			wantFail: true,
    58  		},
    59  	}
    60  
    61  	for _, tc := range testCases {
    62  		pool := &connectionPool{
    63  			ctx: context.Background(),
    64  			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
    65  				if len(tc.errors) == 0 {
    66  					panic("out of errors")
    67  				}
    68  				err := tc.errors[0]
    69  				tc.errors = tc.errors[1:]
    70  				if err == nil {
    71  					return &testAppendRowsClient{}, nil
    72  				}
    73  				return nil, err
    74  			},
    75  		}
    76  		if err := pool.activateRouter(newSimpleRouter("")); err != nil {
    77  			t.Errorf("activateRouter: %v", err)
    78  		}
    79  		writer := &ManagedStream{id: "foo"}
    80  		if err := pool.addWriter(writer); err != nil {
    81  			t.Errorf("addWriter: %v", err)
    82  		}
    83  
    84  		conn, err := pool.router.pickConnection(nil)
    85  		if err != nil {
    86  			t.Errorf("case %s, failed to add connection: %v", tc.desc, err)
    87  		}
    88  		arc, ch, err := pool.openWithRetry(conn)
    89  		if tc.wantFail && err == nil {
    90  			t.Errorf("case %s: wanted failure, got success", tc.desc)
    91  		}
    92  		if !tc.wantFail && err != nil {
    93  			t.Errorf("case %s: wanted success, got %v", tc.desc, err)
    94  		}
    95  		if err == nil {
    96  			if arc == nil {
    97  				t.Errorf("case %s: expected append client, got nil", tc.desc)
    98  			}
    99  			if ch == nil {
   100  				t.Errorf("case %s: expected channel, got nil", tc.desc)
   101  			}
   102  		}
   103  	}
   104  }
   105  
   106  // Ensure we properly refund the flow control during send failures.
   107  // https://github.com/googleapis/google-cloud-go/issues/9540
   108  func TestConnection_LockingAppendFlowRelease(t *testing.T) {
   109  	ctx := context.Background()
   110  
   111  	pool := &connectionPool{
   112  		ctx:                ctx,
   113  		baseFlowController: newFlowController(10, 0),
   114  		open: openTestArc(&testAppendRowsClient{},
   115  			func(req *storagepb.AppendRowsRequest) error {
   116  				// Append always reports EOF on send.
   117  				return io.EOF
   118  			}, nil),
   119  	}
   120  	router := newSimpleRouter("")
   121  	if err := pool.activateRouter(router); err != nil {
   122  		t.Errorf("activateRouter: %v", err)
   123  	}
   124  
   125  	writer := &ManagedStream{id: "foo", ctx: ctx}
   126  	if err := pool.addWriter(writer); err != nil {
   127  		t.Errorf("addWriter: %v", err)
   128  	}
   129  
   130  	pw := newPendingWrite(ctx, writer, &storagepb.AppendRowsRequest{WriteStream: "somestream"}, newVersionedTemplate(), "", "")
   131  	for i := 0; i < 5; i++ {
   132  		conn, err := router.pool.selectConn(pw)
   133  		if err != nil {
   134  			t.Errorf("selectConn: %v", err)
   135  		}
   136  
   137  		// Ensure FC is empty before lockingAppend
   138  		if got := conn.fc.count(); got != 0 {
   139  			t.Errorf("attempt %d expected empty flow count, got %d", i, got)
   140  		}
   141  		if got := conn.fc.bytes(); got != 0 {
   142  			t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
   143  		}
   144  		// invoke lockingAppend, which fails
   145  		if err := conn.lockingAppend(pw); err != io.EOF {
   146  			t.Errorf("lockingAppend attempt %d: expected io.EOF, got %v", i, err)
   147  		}
   148  		// Ensure we're refunded due to failure
   149  		if got := conn.fc.count(); got != 0 {
   150  			t.Errorf("attempt %d expected empty flow count, got %d", i, got)
   151  		}
   152  		if got := conn.fc.bytes(); got != 0 {
   153  			t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
   154  		}
   155  	}
   156  }
   157  
   158  // Ensures we don't lose track of channels/connections during reconnects.
   159  // https://github.com/googleapis/google-cloud-go/issues/6766
   160  func TestConnection_LeakingReconnect(t *testing.T) {
   161  
   162  	ctx := context.Background()
   163  
   164  	pool := &connectionPool{
   165  		ctx:                ctx,
   166  		baseFlowController: newFlowController(10, 0),
   167  		open: openTestArc(&testAppendRowsClient{},
   168  			func(req *storagepb.AppendRowsRequest) error {
   169  				// Append always reports EOF on send.
   170  				return io.EOF
   171  			}, nil),
   172  	}
   173  	router := newSimpleRouter("")
   174  	if err := pool.activateRouter(router); err != nil {
   175  		t.Errorf("activateRouter: %v", err)
   176  	}
   177  	writer := &ManagedStream{id: "foo"}
   178  	if err := pool.addWriter(writer); err != nil {
   179  		t.Errorf("addWriter: %v", err)
   180  	}
   181  
   182  	var chans []chan *pendingWrite
   183  
   184  	for i := 0; i < 10; i++ {
   185  		_, ch, err := router.conn.getStream(nil, true)
   186  		if err != nil {
   187  			t.Fatalf("failed getStream(%d): %v", i, err)
   188  		}
   189  		chans = append(chans, ch)
   190  	}
   191  	var closedCount int
   192  	for _, ch := range chans {
   193  		select {
   194  		case _, ok := <-ch:
   195  			if !ok {
   196  				closedCount = closedCount + 1
   197  			}
   198  		case <-time.After(time.Second):
   199  			// we blocked, likely indicative that the channel is open.
   200  			continue
   201  		}
   202  	}
   203  	if wantClosed := len(chans) - 1; wantClosed != closedCount {
   204  		t.Errorf("closed count mismatch, got %d want %d", closedCount, wantClosed)
   205  	}
   206  }
   207  
   208  // Ensures we're propagating call options as expected.
   209  // Background: https://github.com/googleapis/google-cloud-go/issues/6487
   210  func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) {
   211  	ctx, cancel := context.WithCancel(context.Background())
   212  	cancel()
   213  
   214  	pool := &connectionPool{
   215  		ctx:    ctx,
   216  		cancel: cancel,
   217  		open: createOpenF(func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) {
   218  			if len(opts) == 0 {
   219  				t.Fatalf("no options were propagated")
   220  			}
   221  			return nil, fmt.Errorf("no real client")
   222  		}, ""),
   223  		callOptions: []gax.CallOption{
   224  			gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
   225  		},
   226  	}
   227  	conn := newConnection(pool, "", nil)
   228  	pool.openWithRetry(conn)
   229  }
   230  
   231  // This test evaluates how the receiver deals with a pending write.
   232  func TestConnection_Receiver(t *testing.T) {
   233  
   234  	var customErr = fmt.Errorf("foo")
   235  
   236  	testCases := []struct {
   237  		description       string
   238  		recvResp          []*testRecvResponse
   239  		wantFinalErr      error
   240  		wantTotalAttempts int
   241  	}{
   242  		{
   243  			description: "no errors",
   244  			recvResp: []*testRecvResponse{
   245  				{
   246  					resp: &storagepb.AppendRowsResponse{},
   247  					err:  nil,
   248  				},
   249  			},
   250  			wantTotalAttempts: 1,
   251  		},
   252  		{
   253  			description: "recv err w/io.EOF",
   254  			recvResp: []*testRecvResponse{
   255  				{
   256  					resp: nil,
   257  					err:  io.EOF,
   258  				},
   259  				{
   260  					resp: &storagepb.AppendRowsResponse{},
   261  					err:  nil,
   262  				},
   263  			},
   264  			wantTotalAttempts: 2,
   265  		},
   266  		{
   267  			description: "recv err retried and then failed",
   268  			recvResp: []*testRecvResponse{
   269  				{
   270  					resp: nil,
   271  					err:  io.EOF,
   272  				},
   273  				{
   274  					resp: nil,
   275  					err:  customErr,
   276  				},
   277  			},
   278  			wantTotalAttempts: 2,
   279  			wantFinalErr:      customErr,
   280  		},
   281  		{
   282  			description: "recv err w/ custom error",
   283  			recvResp: []*testRecvResponse{
   284  				{
   285  					resp: nil,
   286  					err:  customErr,
   287  				},
   288  				{
   289  					resp: &storagepb.AppendRowsResponse{},
   290  					err:  nil,
   291  				},
   292  			},
   293  			wantTotalAttempts: 1,
   294  			wantFinalErr:      customErr,
   295  		},
   296  
   297  		{
   298  			description: "resp embeds Unavailable",
   299  			recvResp: []*testRecvResponse{
   300  				{
   301  					resp: &storagepb.AppendRowsResponse{
   302  						Response: &storagepb.AppendRowsResponse_Error{
   303  							Error: &statuspb.Status{
   304  								Code:    int32(codes.Unavailable),
   305  								Message: "foo",
   306  							},
   307  						},
   308  					},
   309  					err: nil,
   310  				},
   311  				{
   312  					resp: &storagepb.AppendRowsResponse{},
   313  					err:  nil,
   314  				},
   315  			},
   316  			wantTotalAttempts: 2,
   317  		},
   318  		{
   319  			description: "resp embeds generic ResourceExhausted",
   320  			recvResp: []*testRecvResponse{
   321  				{
   322  					resp: &storagepb.AppendRowsResponse{
   323  						Response: &storagepb.AppendRowsResponse_Error{
   324  							Error: &statuspb.Status{
   325  								Code:    int32(codes.ResourceExhausted),
   326  								Message: "foo",
   327  							},
   328  						},
   329  					},
   330  					err: nil,
   331  				},
   332  			},
   333  			wantTotalAttempts: 1,
   334  			wantFinalErr: func() error {
   335  				return status.ErrorProto(&statuspb.Status{
   336  					Code:    int32(codes.ResourceExhausted),
   337  					Message: "foo",
   338  				})
   339  			}(),
   340  		},
   341  		{
   342  			description: "resp embeds throughput ResourceExhausted",
   343  			recvResp: []*testRecvResponse{
   344  				{
   345  					resp: &storagepb.AppendRowsResponse{
   346  						Response: &storagepb.AppendRowsResponse_Error{
   347  							Error: &statuspb.Status{
   348  								Code:    int32(codes.ResourceExhausted),
   349  								Message: "Exceeds 'AppendRows throughput' quota for stream blah",
   350  							},
   351  						},
   352  					},
   353  					err: nil,
   354  				},
   355  				{
   356  					resp: &storagepb.AppendRowsResponse{},
   357  					err:  nil,
   358  				},
   359  			},
   360  			wantTotalAttempts: 2,
   361  		},
   362  		{
   363  			description: "retriable failures until max attempts",
   364  			recvResp: []*testRecvResponse{
   365  				{
   366  					err: io.EOF,
   367  				},
   368  				{
   369  					err: io.EOF,
   370  				},
   371  				{
   372  					err: io.EOF,
   373  				},
   374  				{
   375  					err: io.EOF,
   376  				},
   377  			},
   378  			wantTotalAttempts: 4,
   379  			wantFinalErr:      io.EOF,
   380  		},
   381  	}
   382  
   383  	for _, tc := range testCases {
   384  		ctx, cancel := context.WithCancel(context.Background())
   385  
   386  		testArc := &testAppendRowsClient{
   387  			responses: tc.recvResp,
   388  		}
   389  
   390  		pool := &connectionPool{
   391  			ctx: ctx,
   392  			open: openTestArc(testArc, nil,
   393  				func() (*storagepb.AppendRowsResponse, error) {
   394  					if len(testArc.responses) == 0 {
   395  						panic("out of responses")
   396  					}
   397  					curResp := testArc.responses[0]
   398  					testArc.responses = testArc.responses[1:]
   399  					return curResp.resp, curResp.err
   400  				},
   401  			),
   402  			baseFlowController: newFlowController(0, 0),
   403  		}
   404  		router := newSimpleRouter("")
   405  		if err := pool.activateRouter(router); err != nil {
   406  			t.Errorf("activateRouter: %v", err)
   407  		}
   408  
   409  		ms := &ManagedStream{
   410  			id:    "foo",
   411  			ctx:   ctx,
   412  			retry: newStatelessRetryer(),
   413  		}
   414  		if err := pool.addWriter(ms); err != nil {
   415  			t.Errorf("addWriter: %v", err)
   416  		}
   417  		conn := router.conn
   418  		// use openWithRetry to get the reference to the channel and add our test pending write.
   419  		_, ch, _ := pool.openWithRetry(conn)
   420  		pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
   421  		pw.writer = ms
   422  		pw.attemptCount = 1 // we're injecting directly, but attribute this as a single attempt.
   423  		ch <- pw
   424  
   425  		// Wait until the write is marked done.
   426  		<-pw.result.Ready()
   427  
   428  		// Check retry count is as expected.
   429  		gotTotalAttempts, err := pw.result.TotalAttempts(ctx)
   430  		if err != nil {
   431  			t.Errorf("%s: failed to get total attempts: %v", tc.description, err)
   432  		}
   433  		if gotTotalAttempts != tc.wantTotalAttempts {
   434  			t.Errorf("%s: got %d total attempts, want %d attempts", tc.description, gotTotalAttempts, tc.wantTotalAttempts)
   435  		}
   436  
   437  		// Check that the write got the expected final result.
   438  		if gotFinalErr := pw.result.err; !errors.Is(gotFinalErr, tc.wantFinalErr) {
   439  			t.Errorf("%s: got final error %v, wanted final error %v", tc.description, gotFinalErr, tc.wantFinalErr)
   440  		}
   441  		cancel()
   442  	}
   443  }
   444  

View as plain text