...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/mongos_pinning_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"testing"
    13  	"time"
    14  
    15  	"go.mongodb.org/mongo-driver/bson"
    16  	"go.mongodb.org/mongo-driver/internal/assert"
    17  	"go.mongodb.org/mongo-driver/mongo"
    18  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    19  	"go.mongodb.org/mongo-driver/mongo/options"
    20  )
    21  
    22  func TestMongosPinning(t *testing.T) {
    23  	clientOpts := options.Client().SetLocalThreshold(1 * time.Second).SetWriteConcern(mtest.MajorityWc)
    24  	mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.1").CreateClient(false).
    25  		ClientOptions(clientOpts)
    26  	mt := mtest.New(t, mtOpts)
    27  
    28  	if len(options.Client().ApplyURI(mtest.ClusterURI()).Hosts) < 2 {
    29  		mt.Skip("skipping because at least 2 mongoses are required")
    30  	}
    31  
    32  	mt.Run("unpin for next transaction", func(mt *mtest.T) {
    33  		addresses := map[string]struct{}{}
    34  		_ = mt.Client.UseSession(context.Background(), func(sc mongo.SessionContext) error {
    35  			// Insert a document in a transaction to pin session to a mongos
    36  			err := sc.StartTransaction()
    37  			assert.Nil(mt, err, "StartTransaction error: %v", err)
    38  			_, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
    39  			assert.Nil(mt, err, "InsertOne error: %v", err)
    40  			err = sc.CommitTransaction(sc)
    41  			assert.Nil(mt, err, "CommitTransaction error: %v", err)
    42  
    43  			for i := 0; i < 50; i++ {
    44  				// Call Find in a new transaction to unpin from the old mongos and select a new one
    45  				err = sc.StartTransaction()
    46  				assert.Nil(mt, err, iterationErrmsg("StartTransaction", i, err))
    47  
    48  				cursor, err := mt.Coll.Find(sc, bson.D{})
    49  				assert.Nil(mt, err, iterationErrmsg("Find", i, err))
    50  				assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i)
    51  
    52  				descConn, err := mongo.BatchCursorFromCursor(cursor).Server().Connection(context.Background())
    53  				assert.Nil(mt, err, iterationErrmsg("Connection", i, err))
    54  				addresses[descConn.Description().Addr.String()] = struct{}{}
    55  				err = descConn.Close()
    56  				assert.Nil(mt, err, iterationErrmsg("connection Close", i, err))
    57  
    58  				err = sc.CommitTransaction(sc)
    59  				assert.Nil(mt, err, iterationErrmsg("CommitTransaction", i, err))
    60  			}
    61  			return nil
    62  		})
    63  		assert.True(mt, len(addresses) > 1, "expected more than 1 address, got %v", addresses)
    64  	})
    65  	mt.Run("unpin for non transaction operation", func(mt *mtest.T) {
    66  		addresses := map[string]struct{}{}
    67  		_ = mt.Client.UseSession(context.Background(), func(sc mongo.SessionContext) error {
    68  			// Insert a document in a transaction to pin session to a mongos
    69  			err := sc.StartTransaction()
    70  			assert.Nil(mt, err, "StartTransaction error: %v", err)
    71  			_, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
    72  			assert.Nil(mt, err, "InsertOne error: %v", err)
    73  			err = sc.CommitTransaction(sc)
    74  			assert.Nil(mt, err, "CommitTransaction error: %v", err)
    75  
    76  			for i := 0; i < 50; i++ {
    77  				// Call Find with the session but outside of a transaction
    78  				cursor, err := mt.Coll.Find(sc, bson.D{})
    79  				assert.Nil(mt, err, iterationErrmsg("Find", i, err))
    80  				assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i)
    81  
    82  				descConn, err := mongo.BatchCursorFromCursor(cursor).Server().Connection(context.Background())
    83  				assert.Nil(mt, err, iterationErrmsg("Connection", i, err))
    84  				addresses[descConn.Description().Addr.String()] = struct{}{}
    85  				err = descConn.Close()
    86  				assert.Nil(mt, err, iterationErrmsg("connection Close", i, err))
    87  			}
    88  			return nil
    89  		})
    90  		assert.True(mt, len(addresses) > 1, "expected more than 1 address, got %v", addresses)
    91  	})
    92  }
    93  
    94  func iterationErrmsg(op string, i int, wrapped error) string {
    95  	return fmt.Sprintf("%v error on iteration %v: %v", op, i, wrapped)
    96  }
    97  

View as plain text