// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package managedwriter import ( "context" "fmt" "math/rand" "testing" "time" "cloud.google.com/go/bigquery/storage/apiv1/storagepb" "github.com/googleapis/gax-go/v2" ) func TestSimpleRouter(t *testing.T) { ctx := context.Background() pool := &connectionPool{ ctx: ctx, open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { return &testAppendRowsClient{}, nil }, } router := newSimpleRouter("") if err := pool.activateRouter(router); err != nil { t.Errorf("activateRouter: %v", err) } ms := &ManagedStream{ ctx: ctx, retry: newStatelessRetryer(), } pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "") // picking before attaching should yield error if _, err := pool.router.pickConnection(pw); err == nil { t.Errorf("pickConnection: expected error, got success") } writer := &ManagedStream{ id: "writer", } if err := pool.addWriter(writer); err != nil { t.Errorf("addWriter: %v", err) } if _, err := pool.router.pickConnection(pw); err != nil { t.Errorf("pickConnection error: %v", err) } if err := pool.removeWriter(writer); err != nil { t.Errorf("disconnectWriter: %v", err) } if _, err := pool.router.pickConnection(pw); err == nil { t.Errorf("pickConnection: expected error, got success") } } func TestSharedRouter_Basic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) pool := &connectionPool{ ctx: ctx, cancel: cancel, open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { return &testAppendRowsClient{}, nil }, } router := newSharedRouter(false, 0) if err := pool.activateRouter(router); err != nil { t.Errorf("activateRouter: %v", err) } if gotConns := len(router.exclusiveConns); gotConns != 0 { t.Errorf("expected zero connections are start, got %d", gotConns) } ms := &ManagedStream{ ctx: ctx, retry: newStatelessRetryer(), } pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "") // picking before attaching should yield error if _, err := pool.router.pickConnection(pw); err == nil { t.Errorf("pickConnection: expected error, got success") } // attaching a writer without an ID should error. if err := pool.addWriter(ms); err == nil { t.Errorf("expected id-less addWriter to fail") } ms.id = "writer" if err := pool.addWriter(ms); err != nil { t.Errorf("addWriter: %v", err) } if _, err := pool.router.pickConnection(pw); err != nil { t.Errorf("pickConnection error: %v", err) } if err := pool.removeWriter(ms); err != nil { t.Errorf("disconnectWriter: %v", err) } if _, err := pool.router.pickConnection(pw); err == nil { t.Errorf("pickConnection: expected error, got success") } } func TestSharedRouter_Multiplex(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) pool := &connectionPool{ id: newUUID(poolIDPrefix), ctx: ctx, cancel: cancel, open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { return &testAppendRowsClient{}, nil }, baseFlowController: newFlowController(2, 10), } defer pool.Close() router := newSharedRouter(true, 3) if err := pool.activateRouter(router); err != nil { t.Errorf("activateRouter: %v", err) } wantConnCount := 0 if got := len(router.multiConns); wantConnCount != got { t.Errorf("wanted %d conns, got %d", wantConnCount, got) } writerA := &ManagedStream{ id: newUUID(writerIDPrefix), streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"}, ctx: ctx, cancel: cancel, } if err := pool.router.writerAttach(writerA); err != nil { t.Fatalf("writerA attach: %v", err) } // after a writer attached, we expect one conn. wantConnCount = 1 if got := len(router.multiConns); wantConnCount != got { t.Errorf("wanted %d conns, got %d", wantConnCount, got) } writerB := &ManagedStream{ id: newUUID(writerIDPrefix), streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"}, ctx: ctx, cancel: cancel, } if err := pool.router.writerAttach(writerB); err != nil { t.Fatalf("writerA attach: %v", err) } writerC := &ManagedStream{ id: newUUID(writerIDPrefix), streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"}, ctx: ctx, cancel: cancel, } if err := pool.router.writerAttach(writerC); err != nil { t.Fatalf("writerA attach: %v", err) } wantConnCount = 1 if got := len(router.multiConns); wantConnCount != got { t.Fatalf("wanted %d conns, got %d", wantConnCount, got) } pw := newPendingWrite(ctx, writerA, &storagepb.AppendRowsRequest{}, nil, "", "") conn, err := router.pickConnection(pw) if err != nil { t.Fatalf("pickConnection writerA: %v", err) } // generate fake load on the conn associated with writer A conn.fc.acquire(ctx, 1) conn.fc.acquire(ctx, 1) if !conn.isLoaded() { t.Errorf("expected conn to be loaded, was not") } // wait for a watchdog interval time.Sleep(watchDogInterval * 2) wantConnCount = 2 // grab read lock so we can assert internal state of the router router.multiMu.RLock() defer router.multiMu.RUnlock() if got := len(router.multiConns); wantConnCount != got { t.Fatalf("wanted %d conns, got %d", wantConnCount, got) } gotLoad0 := router.multiConns[0].curLoad() gotLoad1 := router.multiConns[1].curLoad() if gotLoad0 > gotLoad1 { t.Errorf("expected connections to be ordered by load, got %f, %f", gotLoad0, gotLoad1) } // verify that rebalance occurred connsWithWriters := 0 for _, v := range router.invertedMultiMap { if len(v) > 0 { connsWithWriters++ } } if connsWithWriters < wantConnCount { t.Errorf("wanted at least %d connections to have writers attached, got %d", wantConnCount, connsWithWriters) } } func BenchmarkRoutingParallel(b *testing.B) { for _, bm := range []struct { desc string router poolRouter numWriters int numDefaultWriters int }{ { desc: "SimpleRouter", router: newSimpleRouter(""), numWriters: 1, numDefaultWriters: 1, }, { desc: "SimpleRouter", router: newSimpleRouter(""), numWriters: 10, numDefaultWriters: 10, }, { desc: "SharedRouter_NoMultiplex", router: newSharedRouter(false, 0), numWriters: 1, numDefaultWriters: 1, }, { desc: "SharedRouter_NoMultiplex", router: newSharedRouter(false, 0), numWriters: 10, numDefaultWriters: 10, }, { desc: "SharedRouter_Multiplex1conn", router: newSharedRouter(true, 1), numWriters: 1, numDefaultWriters: 1, }, { desc: "SharedRouterMultiplex1conn", router: newSharedRouter(true, 1), numWriters: 10, numDefaultWriters: 10, }, { desc: "SharedRouterMultiplex1conn", router: newSharedRouter(true, 1), numWriters: 50, numDefaultWriters: 50, }, { desc: "SharedRouterMultiplex10conn", router: newSharedRouter(true, 10), numWriters: 50, numDefaultWriters: 50, }, } { ctx, cancel := context.WithCancel(context.Background()) pool := &connectionPool{ ctx: ctx, cancel: cancel, open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { return &testAppendRowsClient{}, nil }, } if err := pool.activateRouter(bm.router); err != nil { b.Errorf("%q: activateRouter: %v", bm.desc, err) } // setup both explicit and default stream writers. var explicitWriters []*ManagedStream var defaultWriters []*ManagedStream for i := 0; i < bm.numWriters; i++ { wCtx, wCancel := context.WithCancel(ctx) writer := &ManagedStream{ id: newUUID(writerIDPrefix), streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/abc123"}, ctx: wCtx, cancel: wCancel, retry: newStatelessRetryer(), } explicitWriters = append(explicitWriters, writer) } for i := 0; i < bm.numDefaultWriters; i++ { wCtx, wCancel := context.WithCancel(ctx) writer := &ManagedStream{ id: newUUID(writerIDPrefix), streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"}, ctx: wCtx, cancel: wCancel, retry: newStatelessRetryer(), } defaultWriters = append(defaultWriters, writer) } // attach all writers to router. for k, writer := range explicitWriters { if err := pool.addWriter(writer); err != nil { b.Errorf("addWriter %d: %v", k, err) } } for k, writer := range defaultWriters { if err := pool.addWriter(writer); err != nil { b.Errorf("addWriter %d: %v", k, err) } } baseBenchName := fmt.Sprintf("%s_%dexwriters_%dmpwriters", bm.desc, bm.numWriters, bm.numDefaultWriters) // Benchmark routing for explicit writers. if bm.numWriters > 0 { benchName := fmt.Sprintf("%s_explicitwriters", baseBenchName) b.Run(benchName, func(b *testing.B) { r := rand.New(rand.NewSource(1)) b.ResetTimer() for i := 0; i < b.N; i++ { // pick a random explicit writer each time. writer := explicitWriters[r.Intn(bm.numWriters)] pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "") if _, err := bm.router.pickConnection(pw); err != nil { b.Errorf("pickConnection: %v", err) } } }) } // Benchmark concurrent routing for explicit writers. if bm.numWriters > 0 { benchName := fmt.Sprintf("%s_explicitwriters_concurrent", baseBenchName) b.Run(benchName, func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { r := rand.New(rand.NewSource(1)) for pb.Next() { writer := explicitWriters[r.Intn(bm.numWriters)] pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "") if _, err := bm.router.pickConnection(pw); err != nil { b.Errorf("pickConnection: %v", err) } } }) }) } // Benchmark routing for default writers. if bm.numDefaultWriters > 0 { benchName := fmt.Sprintf("%s_defaultwriters", baseBenchName) b.Run(benchName, func(b *testing.B) { r := rand.New(rand.NewSource(1)) b.ResetTimer() for i := 0; i < b.N; i++ { // pick a random default writer each time. writer := defaultWriters[r.Intn(bm.numDefaultWriters)] pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "") if _, err := bm.router.pickConnection(pw); err != nil { b.Errorf("pickConnection: %v", err) } } }) } // Benchmark concurrent routing for default writers. if bm.numDefaultWriters > 0 { benchName := fmt.Sprintf("%s_defaultwriters_concurrent", baseBenchName) b.Run(benchName, func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { r := rand.New(rand.NewSource(1)) for pb.Next() { writer := defaultWriters[r.Intn(bm.numDefaultWriters)] pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "") if _, err := bm.router.pickConnection(pw); err != nil { b.Errorf("pickConnection: %v", err) } } }) }) } for _, writer := range explicitWriters { writer.Close() } for _, writer := range defaultWriters { writer.Close() } pool.Close() } } func BenchmarkWatchdogPulse(b *testing.B) { maxFlowInserts := 100 maxFlowBytes := 1024 for _, numWriters := range []int{1, 2, 5, 10, 50, 100, 250} { for _, numConnections := range []int{1, 2, 4} { ctx, cancel := context.WithCancel(context.Background()) // we build the router manually so we can control the watchdog for this benchmark. router := newSharedRouter(false, numConnections) pool := &connectionPool{ ctx: ctx, cancel: cancel, open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { return &testAppendRowsClient{}, nil }, baseFlowController: newFlowController(maxFlowInserts, maxFlowBytes), } if err := pool.activateRouter(router); err != nil { b.Fatalf("(@%d-@%d): activateRouter: %v", numWriters, numConnections, err) } // now, set router as multiplex. We do this to avoid router activation starting the watchdog // in a seperate goroutine. router.multiplex = true var writers []*ManagedStream for i := 0; i < numWriters; i++ { wCtx, wCancel := context.WithCancel(ctx) writer := &ManagedStream{ id: newUUID(writerIDPrefix), streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"}, ctx: wCtx, cancel: wCancel, retry: newStatelessRetryer(), } writers = append(writers, writer) if err := pool.addWriter(writer); err != nil { b.Fatalf("addWriter %d (@%d-@%d): %v", i, numWriters, numConnections, err) } } // Generate fake load for all connections. r := rand.New(rand.NewSource(time.Now().UnixNano())) countLoad := make([]int, numConnections) byteLoad := make([]int, numConnections) for i := 0; i < numConnections; i++ { countLoad[i] = r.Intn(maxFlowInserts) byteLoad[i] = r.Intn(maxFlowBytes) } benchName := fmt.Sprintf("%dwriters_%dconns", numWriters, numConnections) b.Run(benchName, func(b *testing.B) { if b.N > 9999 { b.Skip("benchmark unstable, only run with -benchtime=NNNNx") } for i := 0; i < b.N; i++ { b.StopTimer() // Each iteration, we reset the loads to the predetermined values, and repoint // all writers to the first connection. for c := 0; c < len(router.multiConns); c++ { router.multiConns[c].fc.countTracked = int64(countLoad[c]) router.multiConns[c].fc.bytesTracked = int64(byteLoad[c]) } for k := range router.multiMap { router.multiMap[k] = router.multiConns[0] } router.invertedMultiMap = make(map[string][]*ManagedStream) writerSlice := make([]*ManagedStream, len(writers)) copy(writerSlice, writers) router.invertedMultiMap[router.multiConns[0].id] = writerSlice b.StartTimer() router.watchdogPulse() } }) for _, writer := range writers { writer.Close() } pool.Close() } } }