...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/client_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"testing"
    20  
    21  	"github.com/googleapis/gax-go/v2"
    22  	"google.golang.org/grpc"
    23  	"google.golang.org/grpc/metadata"
    24  )
    25  
    26  func TestTableParentFromStreamName(t *testing.T) {
    27  	testCases := []struct {
    28  		in   string
    29  		want string
    30  	}{
    31  		{
    32  			"bad",
    33  			"bad",
    34  		},
    35  		{
    36  			"projects/foo/datasets/bar/tables/baz",
    37  			"projects/foo/datasets/bar/tables/baz",
    38  		},
    39  		{
    40  			"projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
    41  			"projects/foo/datasets/bar/tables/baz",
    42  		},
    43  		{
    44  			"projects/foo/datasets/bar/tables/baz/_default",
    45  			"projects/foo/datasets/bar/tables/baz",
    46  		},
    47  	}
    48  
    49  	for _, tc := range testCases {
    50  		got := TableParentFromStreamName(tc.in)
    51  		if got != tc.want {
    52  			t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
    53  		}
    54  	}
    55  }
    56  
    57  func TestCreatePool_Location(t *testing.T) {
    58  	t.Skip("skipping until new write_location is allowed")
    59  	c := &Client{
    60  		cfg:       &writerClientConfig{},
    61  		ctx:       context.Background(),
    62  		projectID: "myproj",
    63  	}
    64  	pool, err := c.createPool("foo", nil)
    65  	if err != nil {
    66  		t.Fatalf("createPool: %v", err)
    67  	}
    68  	meta, ok := metadata.FromOutgoingContext(pool.ctx)
    69  	if !ok {
    70  		t.Fatalf("no metadata in outgoing context")
    71  	}
    72  	vals, ok := meta["x-goog-request-params"]
    73  	if !ok {
    74  		t.Fatalf("metadata key not present")
    75  	}
    76  	found := false
    77  	for _, v := range vals {
    78  		if v == "write_location=projects/myproj/locations/foo" {
    79  			found = true
    80  			break
    81  		}
    82  	}
    83  	if !found {
    84  		t.Fatal("expected location header not found")
    85  	}
    86  }
    87  
    88  // TestCreatePool tests the result of calling createPool with different combinations
    89  // of global configuration and per-writer configuration.
    90  func TestCreatePool(t *testing.T) {
    91  	testCases := []struct {
    92  		desc                string
    93  		cfg                 *writerClientConfig
    94  		settings            *streamSettings
    95  		wantMaxBytes        int
    96  		wantMaxRequests     int
    97  		wantCallOptions     int
    98  		wantPoolCallOptions int
    99  	}{
   100  		{
   101  			desc: "cfg, no settings",
   102  			cfg: &writerClientConfig{
   103  				defaultInflightRequests: 12,
   104  				defaultInflightBytes:    2048,
   105  			},
   106  			wantMaxBytes:    2048,
   107  			wantMaxRequests: 12,
   108  		},
   109  		{
   110  			desc: "empty cfg, w/settings",
   111  			cfg:  &writerClientConfig{},
   112  			settings: &streamSettings{
   113  				MaxInflightRequests: 99,
   114  				MaxInflightBytes:    1024,
   115  				appendCallOptions:   []gax.CallOption{gax.WithPath("foo")},
   116  			},
   117  			wantMaxBytes:    1024,
   118  			wantMaxRequests: 99,
   119  			wantCallOptions: 1,
   120  		},
   121  		{
   122  			desc: "both cfg and settings",
   123  			cfg: &writerClientConfig{
   124  				defaultInflightRequests:      123,
   125  				defaultInflightBytes:         456,
   126  				defaultAppendRowsCallOptions: []gax.CallOption{gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(999))},
   127  			},
   128  			settings: &streamSettings{
   129  				MaxInflightRequests: 99,
   130  				MaxInflightBytes:    1024,
   131  			},
   132  			wantMaxBytes:        1024,
   133  			wantMaxRequests:     99,
   134  			wantPoolCallOptions: 1,
   135  		},
   136  		{
   137  			desc: "merge defaults and settings",
   138  			cfg: &writerClientConfig{
   139  				defaultInflightRequests:      123,
   140  				defaultInflightBytes:         456,
   141  				defaultAppendRowsCallOptions: []gax.CallOption{gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(999))},
   142  			},
   143  			settings: &streamSettings{
   144  				MaxInflightBytes:  1024,
   145  				appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
   146  			},
   147  			wantMaxBytes:        1024,
   148  			wantMaxRequests:     123,
   149  			wantCallOptions:     1,
   150  			wantPoolCallOptions: 1,
   151  		},
   152  	}
   153  
   154  	for _, tc := range testCases {
   155  		c := &Client{
   156  			cfg: tc.cfg,
   157  			ctx: context.Background(),
   158  		}
   159  		pool, err := c.createPool("", nil)
   160  		if err != nil {
   161  			t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
   162  			continue
   163  		}
   164  		writer := &ManagedStream{
   165  			id:             "foo",
   166  			streamSettings: tc.settings,
   167  		}
   168  		if err = pool.addWriter(writer); err != nil {
   169  			t.Errorf("case %q: addWriter: %v", tc.desc, err)
   170  		}
   171  		pw := newPendingWrite(context.Background(), writer, nil, nil, "", "")
   172  		gotConn, err := pool.selectConn(pw)
   173  		if err != nil {
   174  			t.Errorf("case %q: selectConn: %v", tc.desc, err)
   175  		}
   176  
   177  		// too many go-cmp overrides needed to quickly diff here, look at the interesting fields explicitly.
   178  		if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes {
   179  			t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes)
   180  		}
   181  		if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests {
   182  			t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests)
   183  		}
   184  		if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions {
   185  			t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions)
   186  		}
   187  		if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions {
   188  			t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions)
   189  		}
   190  	}
   191  }
   192  

View as plain text