...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/options_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"sync"
    19  	"testing"
    20  
    21  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    22  	"github.com/google/go-cmp/cmp"
    23  	"github.com/google/go-cmp/cmp/cmpopts"
    24  	"github.com/googleapis/gax-go/v2"
    25  	"google.golang.org/api/option"
    26  	"google.golang.org/grpc"
    27  	"google.golang.org/protobuf/proto"
    28  	"google.golang.org/protobuf/testing/protocmp"
    29  	"google.golang.org/protobuf/types/descriptorpb"
    30  )
    31  
    32  func TestCustomClientOptions(t *testing.T) {
    33  	testCases := []struct {
    34  		desc    string
    35  		options []option.ClientOption
    36  		want    *writerClientConfig
    37  	}{
    38  		{
    39  			desc: "no options",
    40  			want: &writerClientConfig{},
    41  		},
    42  		{
    43  			desc: "multiplex enable",
    44  			options: []option.ClientOption{
    45  				WithMultiplexing(),
    46  			},
    47  			want: &writerClientConfig{
    48  				useMultiplex:         true,
    49  				maxMultiplexPoolSize: 1,
    50  			},
    51  		},
    52  		{
    53  			desc: "multiplex max",
    54  			options: []option.ClientOption{
    55  				WithMultiplexPoolLimit(99),
    56  			},
    57  			want: &writerClientConfig{
    58  				maxMultiplexPoolSize: 99,
    59  			},
    60  		},
    61  		{
    62  			desc: "default requests",
    63  			options: []option.ClientOption{
    64  				WithDefaultInflightRequests(42),
    65  			},
    66  			want: &writerClientConfig{
    67  				defaultInflightRequests: 42,
    68  			},
    69  		},
    70  		{
    71  			desc: "default bytes",
    72  			options: []option.ClientOption{
    73  				WithDefaultInflightBytes(123),
    74  			},
    75  			want: &writerClientConfig{
    76  				defaultInflightBytes: 123,
    77  			},
    78  		},
    79  		{
    80  			desc: "default call options",
    81  			options: []option.ClientOption{
    82  				WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
    83  			},
    84  			want: &writerClientConfig{
    85  				defaultAppendRowsCallOptions: []gax.CallOption{
    86  					gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
    87  				},
    88  			},
    89  		},
    90  		{
    91  			desc: "unusual values",
    92  			options: []option.ClientOption{
    93  				WithMultiplexing(),
    94  				WithMultiplexPoolLimit(-8),
    95  				WithDefaultInflightBytes(-1),
    96  				WithDefaultInflightRequests(-99),
    97  			},
    98  			want: &writerClientConfig{
    99  				useMultiplex:            true,
   100  				maxMultiplexPoolSize:    1,
   101  				defaultInflightRequests: 0,
   102  				defaultInflightBytes:    0,
   103  			},
   104  		},
   105  		{
   106  			desc: "multiple options",
   107  			options: []option.ClientOption{
   108  				WithMultiplexing(),
   109  				WithMultiplexPoolLimit(10),
   110  				WithDefaultInflightRequests(99),
   111  				WithDefaultInflightBytes(12345),
   112  				WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
   113  			},
   114  			want: &writerClientConfig{
   115  				useMultiplex:            true,
   116  				maxMultiplexPoolSize:    10,
   117  				defaultInflightRequests: 99,
   118  				defaultInflightBytes:    12345,
   119  				defaultAppendRowsCallOptions: []gax.CallOption{
   120  					gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
   121  				},
   122  			},
   123  		},
   124  	}
   125  	for _, tc := range testCases {
   126  		gotCfg := newWriterClientConfig(tc.options...)
   127  
   128  		if diff := cmp.Diff(gotCfg, tc.want, cmp.AllowUnexported(writerClientConfig{})); diff != "" {
   129  			t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
   130  		}
   131  	}
   132  }
   133  
   134  func TestWriterOptions(t *testing.T) {
   135  
   136  	testCases := []struct {
   137  		desc    string
   138  		options []WriterOption
   139  		want    *ManagedStream
   140  	}{
   141  		{
   142  			desc:    "WithType",
   143  			options: []WriterOption{WithType(BufferedStream)},
   144  			want: func() *ManagedStream {
   145  				ms := &ManagedStream{
   146  					streamSettings: defaultStreamSettings(),
   147  					curTemplate:    newVersionedTemplate(),
   148  				}
   149  				ms.streamSettings.streamType = BufferedStream
   150  				return ms
   151  			}(),
   152  		},
   153  		{
   154  			desc:    "WithMaxInflightRequests",
   155  			options: []WriterOption{WithMaxInflightRequests(2)},
   156  			want: func() *ManagedStream {
   157  				ms := &ManagedStream{
   158  					streamSettings: defaultStreamSettings(),
   159  					curTemplate:    newVersionedTemplate(),
   160  				}
   161  				ms.streamSettings.MaxInflightRequests = 2
   162  				return ms
   163  			}(),
   164  		},
   165  		{
   166  			desc:    "WithMaxInflightBytes",
   167  			options: []WriterOption{WithMaxInflightBytes(5)},
   168  			want: func() *ManagedStream {
   169  				ms := &ManagedStream{
   170  					streamSettings: defaultStreamSettings(),
   171  					curTemplate:    newVersionedTemplate(),
   172  				}
   173  				ms.streamSettings.MaxInflightBytes = 5
   174  				return ms
   175  			}(),
   176  		},
   177  		{
   178  			desc:    "WithTraceID",
   179  			options: []WriterOption{WithTraceID("foo")},
   180  			want: func() *ManagedStream {
   181  				ms := &ManagedStream{
   182  					streamSettings: defaultStreamSettings(),
   183  					curTemplate:    newVersionedTemplate(),
   184  				}
   185  				ms.streamSettings.TraceID = "foo"
   186  				return ms
   187  			}(),
   188  		},
   189  		{
   190  			desc:    "WithDestinationTable",
   191  			options: []WriterOption{WithDestinationTable("foo")},
   192  			want: func() *ManagedStream {
   193  				ms := &ManagedStream{
   194  					streamSettings: defaultStreamSettings(),
   195  					curTemplate:    newVersionedTemplate(),
   196  				}
   197  				ms.streamSettings.destinationTable = "foo"
   198  				return ms
   199  			}(),
   200  		},
   201  		{
   202  			desc:    "WithDataOrigin",
   203  			options: []WriterOption{WithDataOrigin("origin")},
   204  			want: func() *ManagedStream {
   205  				ms := &ManagedStream{
   206  					streamSettings: defaultStreamSettings(),
   207  					curTemplate:    newVersionedTemplate(),
   208  				}
   209  				ms.streamSettings.dataOrigin = "origin"
   210  				return ms
   211  			}(),
   212  		},
   213  		{
   214  			desc:    "WithCallOption",
   215  			options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))},
   216  			want: func() *ManagedStream {
   217  				ms := &ManagedStream{
   218  					streamSettings: defaultStreamSettings(),
   219  					curTemplate:    newVersionedTemplate(),
   220  				}
   221  				ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions,
   222  					gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))
   223  				return ms
   224  			}(),
   225  		},
   226  		{
   227  			desc:    "EnableRetries",
   228  			options: []WriterOption{EnableWriteRetries(true)},
   229  			want: func() *ManagedStream {
   230  				ms := &ManagedStream{
   231  					streamSettings: defaultStreamSettings(),
   232  					curTemplate:    newVersionedTemplate(),
   233  				}
   234  				ms.retry = newStatelessRetryer()
   235  				return ms
   236  			}(),
   237  		},
   238  		{
   239  			desc:    "WithSchemaDescriptor",
   240  			options: []WriterOption{WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")})},
   241  			want: func() *ManagedStream {
   242  				ms := &ManagedStream{
   243  					streamSettings: defaultStreamSettings(),
   244  					curTemplate:    newVersionedTemplate(),
   245  				}
   246  				ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
   247  					Rows: &storagepb.AppendRowsRequest_ProtoRows{
   248  						ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   249  							WriterSchema: &storagepb.ProtoSchema{
   250  								ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")},
   251  							},
   252  						},
   253  					},
   254  				}
   255  				return ms
   256  			}(),
   257  		},
   258  		{
   259  			desc:    "WithDefaultMissingValueInterpretation",
   260  			options: []WriterOption{WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)},
   261  			want: func() *ManagedStream {
   262  				ms := &ManagedStream{
   263  					streamSettings: defaultStreamSettings(),
   264  					curTemplate:    newVersionedTemplate(),
   265  				}
   266  				ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
   267  					DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE,
   268  				}
   269  				return ms
   270  			}(),
   271  		},
   272  		{
   273  			desc: "WithtMissingValueInterpretations",
   274  			options: []WriterOption{WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
   275  				"foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
   276  				"bar": storagepb.AppendRowsRequest_NULL_VALUE,
   277  			})},
   278  			want: func() *ManagedStream {
   279  				ms := &ManagedStream{
   280  					streamSettings: defaultStreamSettings(),
   281  					curTemplate:    newVersionedTemplate(),
   282  				}
   283  				ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
   284  					MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
   285  						"foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
   286  						"bar": storagepb.AppendRowsRequest_NULL_VALUE,
   287  					},
   288  				}
   289  				return ms
   290  			}(),
   291  		},
   292  		{
   293  			desc: "multiple",
   294  			options: []WriterOption{
   295  				WithType(PendingStream),
   296  				WithMaxInflightBytes(5),
   297  				WithTraceID("traceid"),
   298  				EnableWriteRetries(true),
   299  				WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")}),
   300  				WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE),
   301  				WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
   302  					"foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
   303  					"bar": storagepb.AppendRowsRequest_NULL_VALUE,
   304  				}),
   305  			},
   306  			want: func() *ManagedStream {
   307  				ms := &ManagedStream{
   308  					streamSettings: defaultStreamSettings(),
   309  					curTemplate:    newVersionedTemplate(),
   310  				}
   311  				ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{
   312  					Rows: &storagepb.AppendRowsRequest_ProtoRows{
   313  						ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   314  							WriterSchema: &storagepb.ProtoSchema{
   315  								ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")},
   316  							},
   317  						},
   318  					},
   319  					MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
   320  						"foo": storagepb.AppendRowsRequest_DEFAULT_VALUE,
   321  						"bar": storagepb.AppendRowsRequest_NULL_VALUE,
   322  					},
   323  					DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE,
   324  				}
   325  				ms.streamSettings.MaxInflightBytes = 5
   326  				ms.streamSettings.streamType = PendingStream
   327  				ms.streamSettings.TraceID = "traceid"
   328  				ms.retry = newStatelessRetryer()
   329  				return ms
   330  			}(),
   331  		},
   332  	}
   333  
   334  	for _, tc := range testCases {
   335  		got := &ManagedStream{
   336  			streamSettings: defaultStreamSettings(),
   337  			curTemplate:    newVersionedTemplate(),
   338  		}
   339  		for _, o := range tc.options {
   340  			o(got)
   341  		}
   342  
   343  		if diff := cmp.Diff(got, tc.want,
   344  			cmp.AllowUnexported(ManagedStream{}, streamSettings{}),
   345  			cmp.AllowUnexported(sync.Mutex{}),
   346  			cmp.AllowUnexported(versionedTemplate{}),
   347  			cmpopts.IgnoreFields(versionedTemplate{}, "versionTime", "hashVal"),
   348  			protocmp.Transform(), // versionedTemplate embeds proto messages.
   349  			cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" {
   350  			t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
   351  		}
   352  
   353  	}
   354  }
   355  

View as plain text