...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/routers_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2023 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"math/rand"
    21  	"testing"
    22  	"time"
    23  
    24  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    25  	"github.com/googleapis/gax-go/v2"
    26  )
    27  
    28  func TestSimpleRouter(t *testing.T) {
    29  	ctx := context.Background()
    30  
    31  	pool := &connectionPool{
    32  		ctx: ctx,
    33  		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
    34  			return &testAppendRowsClient{}, nil
    35  		},
    36  	}
    37  
    38  	router := newSimpleRouter("")
    39  	if err := pool.activateRouter(router); err != nil {
    40  		t.Errorf("activateRouter: %v", err)
    41  	}
    42  
    43  	ms := &ManagedStream{
    44  		ctx:   ctx,
    45  		retry: newStatelessRetryer(),
    46  	}
    47  
    48  	pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
    49  
    50  	// picking before attaching should yield error
    51  	if _, err := pool.router.pickConnection(pw); err == nil {
    52  		t.Errorf("pickConnection: expected error, got success")
    53  	}
    54  	writer := &ManagedStream{
    55  		id: "writer",
    56  	}
    57  	if err := pool.addWriter(writer); err != nil {
    58  		t.Errorf("addWriter: %v", err)
    59  	}
    60  	if _, err := pool.router.pickConnection(pw); err != nil {
    61  		t.Errorf("pickConnection error: %v", err)
    62  	}
    63  	if err := pool.removeWriter(writer); err != nil {
    64  		t.Errorf("disconnectWriter: %v", err)
    65  	}
    66  	if _, err := pool.router.pickConnection(pw); err == nil {
    67  		t.Errorf("pickConnection: expected error, got success")
    68  	}
    69  }
    70  
    71  func TestSharedRouter_Basic(t *testing.T) {
    72  	ctx, cancel := context.WithCancel(context.Background())
    73  
    74  	pool := &connectionPool{
    75  		ctx:    ctx,
    76  		cancel: cancel,
    77  		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
    78  			return &testAppendRowsClient{}, nil
    79  		},
    80  	}
    81  
    82  	router := newSharedRouter(false, 0)
    83  	if err := pool.activateRouter(router); err != nil {
    84  		t.Errorf("activateRouter: %v", err)
    85  	}
    86  	if gotConns := len(router.exclusiveConns); gotConns != 0 {
    87  		t.Errorf("expected zero connections are start, got %d", gotConns)
    88  	}
    89  
    90  	ms := &ManagedStream{
    91  		ctx:   ctx,
    92  		retry: newStatelessRetryer(),
    93  	}
    94  	pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
    95  	// picking before attaching should yield error
    96  	if _, err := pool.router.pickConnection(pw); err == nil {
    97  		t.Errorf("pickConnection: expected error, got success")
    98  	}
    99  	// attaching a writer without an ID should error.
   100  	if err := pool.addWriter(ms); err == nil {
   101  		t.Errorf("expected id-less addWriter to fail")
   102  	}
   103  	ms.id = "writer"
   104  	if err := pool.addWriter(ms); err != nil {
   105  		t.Errorf("addWriter: %v", err)
   106  	}
   107  
   108  	if _, err := pool.router.pickConnection(pw); err != nil {
   109  		t.Errorf("pickConnection error: %v", err)
   110  	}
   111  	if err := pool.removeWriter(ms); err != nil {
   112  		t.Errorf("disconnectWriter: %v", err)
   113  	}
   114  	if _, err := pool.router.pickConnection(pw); err == nil {
   115  		t.Errorf("pickConnection: expected error, got success")
   116  	}
   117  }
   118  
   119  func TestSharedRouter_Multiplex(t *testing.T) {
   120  	ctx, cancel := context.WithCancel(context.Background())
   121  
   122  	pool := &connectionPool{
   123  		id:     newUUID(poolIDPrefix),
   124  		ctx:    ctx,
   125  		cancel: cancel,
   126  		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
   127  			return &testAppendRowsClient{}, nil
   128  		},
   129  		baseFlowController: newFlowController(2, 10),
   130  	}
   131  	defer pool.Close()
   132  
   133  	router := newSharedRouter(true, 3)
   134  	if err := pool.activateRouter(router); err != nil {
   135  		t.Errorf("activateRouter: %v", err)
   136  	}
   137  
   138  	wantConnCount := 0
   139  	if got := len(router.multiConns); wantConnCount != got {
   140  		t.Errorf("wanted %d conns, got %d", wantConnCount, got)
   141  	}
   142  
   143  	writerA := &ManagedStream{
   144  		id:             newUUID(writerIDPrefix),
   145  		streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
   146  		ctx:            ctx,
   147  		cancel:         cancel,
   148  	}
   149  	if err := pool.router.writerAttach(writerA); err != nil {
   150  		t.Fatalf("writerA attach: %v", err)
   151  	}
   152  
   153  	// after a writer attached, we expect one conn.
   154  	wantConnCount = 1
   155  	if got := len(router.multiConns); wantConnCount != got {
   156  		t.Errorf("wanted %d conns, got %d", wantConnCount, got)
   157  	}
   158  
   159  	writerB := &ManagedStream{
   160  		id:             newUUID(writerIDPrefix),
   161  		streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
   162  		ctx:            ctx,
   163  		cancel:         cancel,
   164  	}
   165  	if err := pool.router.writerAttach(writerB); err != nil {
   166  		t.Fatalf("writerA attach: %v", err)
   167  	}
   168  	writerC := &ManagedStream{
   169  		id:             newUUID(writerIDPrefix),
   170  		streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
   171  		ctx:            ctx,
   172  		cancel:         cancel,
   173  	}
   174  	if err := pool.router.writerAttach(writerC); err != nil {
   175  		t.Fatalf("writerA attach: %v", err)
   176  	}
   177  
   178  	wantConnCount = 1
   179  	if got := len(router.multiConns); wantConnCount != got {
   180  		t.Fatalf("wanted %d conns, got %d", wantConnCount, got)
   181  	}
   182  
   183  	pw := newPendingWrite(ctx, writerA, &storagepb.AppendRowsRequest{}, nil, "", "")
   184  	conn, err := router.pickConnection(pw)
   185  	if err != nil {
   186  		t.Fatalf("pickConnection writerA: %v", err)
   187  	}
   188  	// generate fake load on the conn associated with writer A
   189  	conn.fc.acquire(ctx, 1)
   190  	conn.fc.acquire(ctx, 1)
   191  
   192  	if !conn.isLoaded() {
   193  		t.Errorf("expected conn to be loaded, was not")
   194  	}
   195  	// wait for a watchdog interval
   196  	time.Sleep(watchDogInterval * 2)
   197  
   198  	wantConnCount = 2
   199  	// grab read lock so we can assert internal state of the router
   200  	router.multiMu.RLock()
   201  	defer router.multiMu.RUnlock()
   202  	if got := len(router.multiConns); wantConnCount != got {
   203  		t.Fatalf("wanted %d conns, got %d", wantConnCount, got)
   204  	}
   205  	gotLoad0 := router.multiConns[0].curLoad()
   206  	gotLoad1 := router.multiConns[1].curLoad()
   207  	if gotLoad0 > gotLoad1 {
   208  		t.Errorf("expected connections to be ordered by load, got %f, %f", gotLoad0, gotLoad1)
   209  	}
   210  	// verify that rebalance occurred
   211  	connsWithWriters := 0
   212  	for _, v := range router.invertedMultiMap {
   213  		if len(v) > 0 {
   214  			connsWithWriters++
   215  		}
   216  	}
   217  	if connsWithWriters < wantConnCount {
   218  		t.Errorf("wanted at least %d connections to have writers attached, got %d", wantConnCount, connsWithWriters)
   219  	}
   220  
   221  }
   222  
   223  func BenchmarkRoutingParallel(b *testing.B) {
   224  
   225  	for _, bm := range []struct {
   226  		desc              string
   227  		router            poolRouter
   228  		numWriters        int
   229  		numDefaultWriters int
   230  	}{
   231  		{
   232  			desc:              "SimpleRouter",
   233  			router:            newSimpleRouter(""),
   234  			numWriters:        1,
   235  			numDefaultWriters: 1,
   236  		},
   237  		{
   238  			desc:              "SimpleRouter",
   239  			router:            newSimpleRouter(""),
   240  			numWriters:        10,
   241  			numDefaultWriters: 10,
   242  		},
   243  		{
   244  			desc:              "SharedRouter_NoMultiplex",
   245  			router:            newSharedRouter(false, 0),
   246  			numWriters:        1,
   247  			numDefaultWriters: 1,
   248  		},
   249  		{
   250  			desc:              "SharedRouter_NoMultiplex",
   251  			router:            newSharedRouter(false, 0),
   252  			numWriters:        10,
   253  			numDefaultWriters: 10,
   254  		},
   255  		{
   256  			desc:              "SharedRouter_Multiplex1conn",
   257  			router:            newSharedRouter(true, 1),
   258  			numWriters:        1,
   259  			numDefaultWriters: 1,
   260  		},
   261  		{
   262  			desc:              "SharedRouterMultiplex1conn",
   263  			router:            newSharedRouter(true, 1),
   264  			numWriters:        10,
   265  			numDefaultWriters: 10,
   266  		},
   267  		{
   268  			desc:              "SharedRouterMultiplex1conn",
   269  			router:            newSharedRouter(true, 1),
   270  			numWriters:        50,
   271  			numDefaultWriters: 50,
   272  		},
   273  		{
   274  			desc:              "SharedRouterMultiplex10conn",
   275  			router:            newSharedRouter(true, 10),
   276  			numWriters:        50,
   277  			numDefaultWriters: 50,
   278  		},
   279  	} {
   280  
   281  		ctx, cancel := context.WithCancel(context.Background())
   282  		pool := &connectionPool{
   283  			ctx:    ctx,
   284  			cancel: cancel,
   285  			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
   286  				return &testAppendRowsClient{}, nil
   287  			},
   288  		}
   289  		if err := pool.activateRouter(bm.router); err != nil {
   290  			b.Errorf("%q: activateRouter: %v", bm.desc, err)
   291  		}
   292  
   293  		// setup both explicit and default stream writers.
   294  		var explicitWriters []*ManagedStream
   295  		var defaultWriters []*ManagedStream
   296  
   297  		for i := 0; i < bm.numWriters; i++ {
   298  			wCtx, wCancel := context.WithCancel(ctx)
   299  			writer := &ManagedStream{
   300  				id:             newUUID(writerIDPrefix),
   301  				streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/abc123"},
   302  				ctx:            wCtx,
   303  				cancel:         wCancel,
   304  				retry:          newStatelessRetryer(),
   305  			}
   306  			explicitWriters = append(explicitWriters, writer)
   307  		}
   308  		for i := 0; i < bm.numDefaultWriters; i++ {
   309  			wCtx, wCancel := context.WithCancel(ctx)
   310  			writer := &ManagedStream{
   311  				id:             newUUID(writerIDPrefix),
   312  				streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
   313  
   314  				ctx:    wCtx,
   315  				cancel: wCancel,
   316  				retry:  newStatelessRetryer(),
   317  			}
   318  			defaultWriters = append(defaultWriters, writer)
   319  		}
   320  
   321  		// attach all writers to router.
   322  		for k, writer := range explicitWriters {
   323  			if err := pool.addWriter(writer); err != nil {
   324  				b.Errorf("addWriter %d: %v", k, err)
   325  			}
   326  		}
   327  		for k, writer := range defaultWriters {
   328  			if err := pool.addWriter(writer); err != nil {
   329  				b.Errorf("addWriter %d: %v", k, err)
   330  			}
   331  		}
   332  
   333  		baseBenchName := fmt.Sprintf("%s_%dexwriters_%dmpwriters", bm.desc, bm.numWriters, bm.numDefaultWriters)
   334  
   335  		// Benchmark routing for explicit writers.
   336  		if bm.numWriters > 0 {
   337  			benchName := fmt.Sprintf("%s_explicitwriters", baseBenchName)
   338  
   339  			b.Run(benchName, func(b *testing.B) {
   340  				r := rand.New(rand.NewSource(1))
   341  				b.ResetTimer()
   342  				for i := 0; i < b.N; i++ {
   343  					// pick a random explicit writer each time.
   344  					writer := explicitWriters[r.Intn(bm.numWriters)]
   345  					pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
   346  					if _, err := bm.router.pickConnection(pw); err != nil {
   347  						b.Errorf("pickConnection: %v", err)
   348  					}
   349  				}
   350  			})
   351  		}
   352  
   353  		// Benchmark concurrent routing for explicit writers.
   354  		if bm.numWriters > 0 {
   355  			benchName := fmt.Sprintf("%s_explicitwriters_concurrent", baseBenchName)
   356  			b.Run(benchName, func(b *testing.B) {
   357  				b.RunParallel(func(pb *testing.PB) {
   358  					r := rand.New(rand.NewSource(1))
   359  					for pb.Next() {
   360  						writer := explicitWriters[r.Intn(bm.numWriters)]
   361  						pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
   362  						if _, err := bm.router.pickConnection(pw); err != nil {
   363  							b.Errorf("pickConnection: %v", err)
   364  						}
   365  					}
   366  				})
   367  			})
   368  		}
   369  
   370  		// Benchmark routing for default writers.
   371  		if bm.numDefaultWriters > 0 {
   372  			benchName := fmt.Sprintf("%s_defaultwriters", baseBenchName)
   373  
   374  			b.Run(benchName, func(b *testing.B) {
   375  				r := rand.New(rand.NewSource(1))
   376  				b.ResetTimer()
   377  				for i := 0; i < b.N; i++ {
   378  					// pick a random default writer each time.
   379  					writer := defaultWriters[r.Intn(bm.numDefaultWriters)]
   380  					pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
   381  					if _, err := bm.router.pickConnection(pw); err != nil {
   382  						b.Errorf("pickConnection: %v", err)
   383  					}
   384  				}
   385  			})
   386  		}
   387  
   388  		// Benchmark concurrent routing for default writers.
   389  		if bm.numDefaultWriters > 0 {
   390  			benchName := fmt.Sprintf("%s_defaultwriters_concurrent", baseBenchName)
   391  
   392  			b.Run(benchName, func(b *testing.B) {
   393  				b.RunParallel(func(pb *testing.PB) {
   394  					r := rand.New(rand.NewSource(1))
   395  					for pb.Next() {
   396  						writer := defaultWriters[r.Intn(bm.numDefaultWriters)]
   397  						pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
   398  						if _, err := bm.router.pickConnection(pw); err != nil {
   399  							b.Errorf("pickConnection: %v", err)
   400  						}
   401  					}
   402  				})
   403  			})
   404  		}
   405  
   406  		for _, writer := range explicitWriters {
   407  			writer.Close()
   408  		}
   409  		for _, writer := range defaultWriters {
   410  			writer.Close()
   411  		}
   412  
   413  		pool.Close()
   414  
   415  	}
   416  
   417  }
   418  
   419  func BenchmarkWatchdogPulse(b *testing.B) {
   420  	maxFlowInserts := 100
   421  	maxFlowBytes := 1024
   422  	for _, numWriters := range []int{1, 2, 5, 10, 50, 100, 250} {
   423  		for _, numConnections := range []int{1, 2, 4} {
   424  
   425  			ctx, cancel := context.WithCancel(context.Background())
   426  			// we build the router manually so we can control the watchdog for this benchmark.
   427  			router := newSharedRouter(false, numConnections)
   428  
   429  			pool := &connectionPool{
   430  				ctx:    ctx,
   431  				cancel: cancel,
   432  				open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
   433  					return &testAppendRowsClient{}, nil
   434  				},
   435  				baseFlowController: newFlowController(maxFlowInserts, maxFlowBytes),
   436  			}
   437  			if err := pool.activateRouter(router); err != nil {
   438  				b.Fatalf("(@%d-@%d): activateRouter: %v", numWriters, numConnections, err)
   439  			}
   440  			// now, set router as multiplex.  We do this to avoid router activation starting the watchdog
   441  			// in a seperate goroutine.
   442  			router.multiplex = true
   443  
   444  			var writers []*ManagedStream
   445  
   446  			for i := 0; i < numWriters; i++ {
   447  				wCtx, wCancel := context.WithCancel(ctx)
   448  				writer := &ManagedStream{
   449  					id:             newUUID(writerIDPrefix),
   450  					streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
   451  
   452  					ctx:    wCtx,
   453  					cancel: wCancel,
   454  					retry:  newStatelessRetryer(),
   455  				}
   456  				writers = append(writers, writer)
   457  				if err := pool.addWriter(writer); err != nil {
   458  					b.Fatalf("addWriter %d (@%d-@%d): %v", i, numWriters, numConnections, err)
   459  				}
   460  			}
   461  
   462  			// Generate fake load for all connections.
   463  			r := rand.New(rand.NewSource(time.Now().UnixNano()))
   464  			countLoad := make([]int, numConnections)
   465  			byteLoad := make([]int, numConnections)
   466  			for i := 0; i < numConnections; i++ {
   467  				countLoad[i] = r.Intn(maxFlowInserts)
   468  				byteLoad[i] = r.Intn(maxFlowBytes)
   469  			}
   470  
   471  			benchName := fmt.Sprintf("%dwriters_%dconns", numWriters, numConnections)
   472  			b.Run(benchName, func(b *testing.B) {
   473  				if b.N > 9999 {
   474  					b.Skip("benchmark unstable, only run with -benchtime=NNNNx")
   475  				}
   476  				for i := 0; i < b.N; i++ {
   477  					b.StopTimer()
   478  					// Each iteration, we reset the loads to the predetermined values, and repoint
   479  					// all writers to the first connection.
   480  					for c := 0; c < len(router.multiConns); c++ {
   481  						router.multiConns[c].fc.countTracked = int64(countLoad[c])
   482  						router.multiConns[c].fc.bytesTracked = int64(byteLoad[c])
   483  					}
   484  					for k := range router.multiMap {
   485  						router.multiMap[k] = router.multiConns[0]
   486  					}
   487  					router.invertedMultiMap = make(map[string][]*ManagedStream)
   488  					writerSlice := make([]*ManagedStream, len(writers))
   489  					copy(writerSlice, writers)
   490  					router.invertedMultiMap[router.multiConns[0].id] = writerSlice
   491  					b.StartTimer()
   492  					router.watchdogPulse()
   493  				}
   494  			})
   495  
   496  			for _, writer := range writers {
   497  				writer.Close()
   498  			}
   499  
   500  			pool.Close()
   501  
   502  		}
   503  	}
   504  
   505  }
   506  

View as plain text