...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/load_balancer_prose_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"strings"
    13  	"testing"
    14  	"time"
    15  
    16  	"go.mongodb.org/mongo-driver/bson"
    17  	"go.mongodb.org/mongo-driver/internal/assert"
    18  	"go.mongodb.org/mongo-driver/internal/require"
    19  	"go.mongodb.org/mongo-driver/mongo"
    20  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    21  	"go.mongodb.org/mongo-driver/mongo/options"
    22  )
    23  
    24  func TestLoadBalancerSupport(t *testing.T) {
    25  	mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.LoadBalanced).CreateClient(false))
    26  
    27  	mt.Run("RunCommandCursor pins to a connection", func(mt *mtest.T) {
    28  		// The LB spec tests cover the behavior for cursors created by CRUD operations, but RunCommandCursor is
    29  		// Go-specific so there is no spec test coverage for it.
    30  
    31  		initCollection(mt, mt.Coll)
    32  		findCmd := bson.D{
    33  			{"find", mt.Coll.Name()},
    34  			{"filter", bson.D{}},
    35  			{"batchSize", 2},
    36  		}
    37  		cursor, err := mt.DB.RunCommandCursor(context.Background(), findCmd)
    38  		assert.Nil(mt, err, "RunCommandCursor error: %v", err)
    39  		defer func() {
    40  			_ = cursor.Close(context.Background())
    41  		}()
    42  
    43  		assert.True(mt, cursor.ID() > 0, "expected cursor ID to be non-zero")
    44  		assert.Equal(mt, 1, mt.NumberConnectionsCheckedOut(),
    45  			"expected one connection to be checked out, got %d", mt.NumberConnectionsCheckedOut())
    46  	})
    47  
    48  	mt.RunOpts("wait queue timeout errors include extra information", noClientOpts, func(mt *mtest.T) {
    49  		// There are spec tests to assert this behavior, but they rely on the waitQueueTimeoutMS Client option, which is
    50  		// not supported in Go, so we have to skip them. These prose tests make the same assertions, but use context
    51  		// deadlines to force wait queue timeout errors.
    52  
    53  		assertErrorHasInfo := func(mt *mtest.T, err error, numCursorConns, numTxnConns, numOtherConns int) {
    54  			mt.Helper()
    55  
    56  			assert.NotNil(mt, err, "expected wait queue timeout error, got nil")
    57  			expectedMsg := fmt.Sprintf("maxPoolSize: 1, "+
    58  				"connections in use by cursors: %d, "+
    59  				"connections in use by transactions: %d, "+
    60  				"connections in use by other operations: %d",
    61  				numCursorConns, numTxnConns, numOtherConns,
    62  			)
    63  			assert.True(mt, strings.Contains(err.Error(), expectedMsg),
    64  				"expected error %q to contain substring %q", err, expectedMsg)
    65  		}
    66  		maxPoolSizeMtOpts := mtest.NewOptions().
    67  			ClientOptions(options.Client().SetMaxPoolSize(1))
    68  
    69  		mt.RunOpts("cursors", maxPoolSizeMtOpts, func(mt *mtest.T) {
    70  			initCollection(mt, mt.Coll)
    71  			findOpts := options.Find().SetBatchSize(2)
    72  			cursor, err := mt.Coll.Find(context.Background(), bson.M{}, findOpts)
    73  			assert.Nil(mt, err, "Find error: %v", err)
    74  			defer func() {
    75  				_ = cursor.Close(context.Background())
    76  			}()
    77  
    78  			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
    79  			defer cancel()
    80  			_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
    81  			assertErrorHasInfo(mt, err, 1, 0, 0)
    82  		})
    83  		mt.RunOpts("transactions", maxPoolSizeMtOpts, func(mt *mtest.T) {
    84  			{
    85  				sess, err := mt.Client.StartSession()
    86  				assert.Nil(mt, err, "StartSession error: %v", err)
    87  				defer sess.EndSession(context.Background())
    88  				ctx := mongo.NewSessionContext(context.Background(), sess)
    89  
    90  				// Start a transaction and perform one transactional operation to pin a connection.
    91  				err = sess.StartTransaction()
    92  				assert.Nil(mt, err, "StartTransaction error: %v", err)
    93  				_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
    94  				assert.Nil(mt, err, "InsertOne error: %v", err)
    95  			}
    96  
    97  			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
    98  			defer cancel()
    99  			_, err := mt.Coll.InsertOne(ctx, bson.M{"x": 1})
   100  			assertErrorHasInfo(mt, err, 0, 1, 0)
   101  		})
   102  
   103  		// GODRIVER-2867: Test that connections are unpinned from transactions
   104  		// when the transaction session is ended. Create a Client with
   105  		// maxPoolSize=1 and expect that it can start and commit 5 transactions
   106  		// with that 1 connection.
   107  		mt.RunOpts("transaction connections are unpinned", maxPoolSizeMtOpts, func(mt *mtest.T) {
   108  			{
   109  				ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
   110  				defer cancel()
   111  
   112  				for i := 0; i < 5; i++ {
   113  					sess, err := mt.Client.StartSession()
   114  					require.NoError(mt, err, "StartSession error")
   115  
   116  					err = sess.StartTransaction()
   117  					require.NoError(mt, err, "StartTransaction error")
   118  
   119  					ctx := mongo.NewSessionContext(ctx, sess)
   120  					_, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
   121  					assert.NoError(mt, err, "InsertOne error")
   122  
   123  					err = sess.CommitTransaction(ctx)
   124  					assert.NoError(mt, err, "CommitTransaction error")
   125  
   126  					sess.EndSession(ctx)
   127  				}
   128  			}
   129  		})
   130  	})
   131  }
   132  

View as plain text