...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/causal_consistency_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"testing"
    12  
    13  	"go.mongodb.org/mongo-driver/bson"
    14  	"go.mongodb.org/mongo-driver/bson/primitive"
    15  	"go.mongodb.org/mongo-driver/internal/assert"
    16  	"go.mongodb.org/mongo-driver/mongo"
    17  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    18  	"go.mongodb.org/mongo-driver/mongo/options"
    19  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    20  )
    21  
    22  // set of operations that support read concerns taken from read/write concern spec.
    23  // the spec also lists "count" but that has been deprecated and removed from the driver.
    24  var readConcernOperations = map[string]struct{}{
    25  	"Aggregate": {},
    26  	"Distinct":  {},
    27  	"Find":      {},
    28  }
    29  
    30  func TestCausalConsistency_Supported(t *testing.T) {
    31  	mt := mtest.New(t, mtest.NewOptions().MinServerVersion("3.6").Topologies(mtest.ReplicaSet, mtest.Sharded).CreateClient(false))
    32  
    33  	mt.Run("operation time nil", func(mt *mtest.T) {
    34  		// when a ClientSession is first created, the operation time is nil
    35  
    36  		sess, err := mt.Client.StartSession()
    37  		assert.Nil(mt, err, "StartSession error: %v", err)
    38  		defer sess.EndSession(context.Background())
    39  		assert.Nil(mt, sess.OperationTime(), "expected nil operation time, got %v", sess.OperationTime())
    40  	})
    41  	mt.Run("no cluster time on first command", func(mt *mtest.T) {
    42  		// first read in a causally consistent session must not send afterClusterTime to the server
    43  
    44  		ccOpts := options.Session().SetCausalConsistency(true)
    45  		_ = mt.Client.UseSessionWithOptions(context.Background(), ccOpts, func(sc mongo.SessionContext) error {
    46  			_, _ = mt.Coll.Find(sc, bson.D{})
    47  			return nil
    48  		})
    49  
    50  		evt := mt.GetStartedEvent()
    51  		assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
    52  		checkOperationTime(mt, evt.Command, false)
    53  	})
    54  	mt.Run("operation time updated", func(mt *mtest.T) {
    55  		// first read or write on a ClientSession should update the operationTime of the session, even if there is an error
    56  
    57  		sess, err := mt.Client.StartSession()
    58  		assert.Nil(mt, err, "StartSession error: %v", err)
    59  		defer sess.EndSession(context.Background())
    60  
    61  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
    62  			_, _ = mt.Coll.Find(sc, bson.D{})
    63  			return nil
    64  		})
    65  
    66  		evt := mt.GetSucceededEvent()
    67  		assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
    68  		serverT, serverI := evt.Reply.Lookup("operationTime").Timestamp()
    69  		serverTs := &primitive.Timestamp{serverT, serverI}
    70  		sessionTs := sess.OperationTime()
    71  		assert.NotNil(mt, sessionTs, "expected session operation time, got nil")
    72  		assert.True(mt, serverTs.Equal(*sessionTs), "expected operation time %v, got %v", serverTs, sessionTs)
    73  	})
    74  	mt.RunOpts("operation time sent", noClientOpts, func(mt *mtest.T) {
    75  		// findOne followed by another read operation should include operationTime returned by server for the first
    76  		// operation as the afterClusterTime field of the second operation
    77  
    78  		for _, sf := range createFunctionsSlice() {
    79  			// skip write operations
    80  			if _, ok := readConcernOperations[sf.fnName]; !ok {
    81  				continue
    82  			}
    83  
    84  			mt.Run(sf.name, func(mt *mtest.T) {
    85  				sess, err := mt.Client.StartSession()
    86  				assert.Nil(mt, err, "StartSession error: %v", err)
    87  				defer sess.EndSession(context.Background())
    88  
    89  				_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
    90  					_ = mt.Coll.FindOne(sc, bson.D{})
    91  					return nil
    92  				})
    93  				currOptime := sess.OperationTime()
    94  				assert.NotNil(mt, currOptime, "expected session operation time, got nil")
    95  
    96  				mt.ClearEvents()
    97  				_ = sf.execute(mt, sess)
    98  				_, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
    99  				assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
   100  				assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
   101  			})
   102  		}
   103  	})
   104  	mt.RunOpts("write then read", noClientOpts, func(mt *mtest.T) {
   105  		// any write operation followed by a findOne should include operationTime of the first operation as afterClusterTime
   106  		// in the second operation
   107  
   108  		for _, sf := range createFunctionsSlice() {
   109  			// skip read operations
   110  			if _, ok := readConcernOperations[sf.fnName]; ok {
   111  				continue
   112  			}
   113  
   114  			mt.Run(sf.name, func(mt *mtest.T) {
   115  				sess, err := mt.Client.StartSession()
   116  				assert.Nil(mt, err, "StartSession error: %v", err)
   117  				defer sess.EndSession(context.Background())
   118  
   119  				_ = sf.execute(mt, sess)
   120  				currOptime := sess.OperationTime()
   121  				assert.NotNil(mt, currOptime, "expected session operation time, got nil")
   122  
   123  				mt.ClearEvents()
   124  				_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   125  					_ = mt.Coll.FindOne(sc, bson.D{})
   126  					return nil
   127  				})
   128  				_, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
   129  				assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
   130  				assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
   131  			})
   132  		}
   133  	})
   134  	mt.Run("non-consistent read", func(mt *mtest.T) {
   135  		// a read operation in a non causally-consistent session should not include afterClusterTime
   136  
   137  		sessOpts := options.Session().SetCausalConsistency(false)
   138  		_ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(sc mongo.SessionContext) error {
   139  			_, _ = mt.Coll.Find(sc, bson.D{})
   140  			mt.ClearEvents()
   141  			_, _ = mt.Coll.Find(sc, bson.D{})
   142  			return nil
   143  		})
   144  		evt := mt.GetStartedEvent()
   145  		assert.Equal(mt, "find", evt.CommandName, "expected 'find' command, got '%v'", evt.CommandName)
   146  		checkOperationTime(mt, evt.Command, false)
   147  	})
   148  	mt.Run("default read concern", func(mt *mtest.T) {
   149  		// when using the default server read concern, the readConcern parameter in the command sent to the server should
   150  		// not include a level field
   151  
   152  		sess, err := mt.Client.StartSession()
   153  		assert.Nil(mt, err, "StartSession error: %v", err)
   154  		defer sess.EndSession(context.Background())
   155  
   156  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   157  			_ = mt.Coll.FindOne(sc, bson.D{})
   158  			return nil
   159  		})
   160  		currOptime := sess.OperationTime()
   161  		mt.ClearEvents()
   162  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   163  			_ = mt.Coll.FindOne(sc, bson.D{})
   164  			return nil
   165  		})
   166  
   167  		level, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
   168  		assert.Equal(mt, "", level, "expected command to not have read concern level, got %s", level)
   169  		assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
   170  		assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
   171  	})
   172  	localRcOpts := options.Client().SetReadConcern(readconcern.Local())
   173  	mt.RunOpts("custom read concern", mtest.NewOptions().ClientOptions(localRcOpts), func(mt *mtest.T) {
   174  		sess, err := mt.Client.StartSession()
   175  		assert.Nil(mt, err, "StartSession error: %v", err)
   176  		defer sess.EndSession(context.Background())
   177  
   178  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   179  			_ = mt.Coll.FindOne(sc, bson.D{})
   180  			return nil
   181  		})
   182  		currOptime := sess.OperationTime()
   183  		mt.ClearEvents()
   184  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   185  			_ = mt.Coll.FindOne(sc, bson.D{})
   186  			return nil
   187  		})
   188  
   189  		level, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
   190  		assert.Equal(mt, "local", level, "expected read concern level 'local', got %s", level)
   191  		assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
   192  		assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
   193  	})
   194  	mt.Run("clusterTime included", func(mt *mtest.T) {
   195  		// $clusterTime should be included in commands if the deployment supports cluster times
   196  
   197  		_ = mt.Coll.FindOne(context.Background(), bson.D{})
   198  		evt := mt.GetStartedEvent()
   199  		assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
   200  		_, err := evt.Command.LookupErr("$clusterTime")
   201  		assert.Nil(mt, err, "expected $clusterTime in command, got nil")
   202  	})
   203  }
   204  
   205  func TestCausalConsistency_NotSupported(t *testing.T) {
   206  	// use RunOnBlock instead of mtest.NewOptions().MaxServerVersion("3.4").Topologies(mtest.Single) because
   207  	// these tests should be run on servers <= 3.4 OR standalones
   208  	rob := []mtest.RunOnBlock{
   209  		{MaxServerVersion: "3.4"},
   210  		{Topology: []mtest.TopologyKind{mtest.Single}},
   211  	}
   212  	mt := mtest.New(t, mtest.NewOptions().RunOn(rob...).CreateClient(false))
   213  
   214  	mt.Run("afterClusterTime not included", func(mt *mtest.T) {
   215  		// a read in a causally consistent session does not include afterClusterTime in a deployment that does not
   216  		// support cluster times
   217  
   218  		sessOpts := options.Session().SetCausalConsistency(true)
   219  		_ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(sc mongo.SessionContext) error {
   220  			_, _ = mt.Coll.Find(sc, bson.D{})
   221  			return nil
   222  		})
   223  
   224  		evt := mt.GetStartedEvent()
   225  		assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
   226  		checkOperationTime(mt, evt.Command, false)
   227  	})
   228  	mt.Run("clusterTime not included", func(mt *mtest.T) {
   229  		// $clusterTime should not be included in commands if the deployment does not support cluster times
   230  
   231  		_ = mt.Coll.FindOne(context.Background(), bson.D{})
   232  		evt := mt.GetStartedEvent()
   233  		assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
   234  		_, err := evt.Command.LookupErr("$clusterTime")
   235  		assert.NotNil(mt, err, "expected $clusterTime to not be sent, but was")
   236  	})
   237  }
   238  
   239  func checkOperationTime(mt *mtest.T, cmd bson.Raw, shouldInclude bool) {
   240  	mt.Helper()
   241  
   242  	_, optime := getReadConcernFields(mt, cmd)
   243  	if shouldInclude {
   244  		assert.NotNil(mt, optime, "expected operation time, got nil")
   245  		return
   246  	}
   247  	assert.Nil(mt, optime, "did not expect operation time, got %v", optime)
   248  }
   249  
   250  func getReadConcernFields(mt *mtest.T, cmd bson.Raw) (string, *primitive.Timestamp) {
   251  	mt.Helper()
   252  
   253  	rc, err := cmd.LookupErr("readConcern")
   254  	if err != nil {
   255  		return "", nil
   256  	}
   257  	rcDoc := rc.Document()
   258  
   259  	var level string
   260  	var clusterTime *primitive.Timestamp
   261  
   262  	if levelVal, err := rcDoc.LookupErr("level"); err == nil {
   263  		level = levelVal.StringValue()
   264  	}
   265  	if ctVal, err := rcDoc.LookupErr("afterClusterTime"); err == nil {
   266  		t, i := ctVal.Timestamp()
   267  		clusterTime = &primitive.Timestamp{t, i}
   268  	}
   269  	return level, clusterTime
   270  }
   271  

View as plain text