1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "testing"
12
13 "go.mongodb.org/mongo-driver/bson"
14 "go.mongodb.org/mongo-driver/bson/primitive"
15 "go.mongodb.org/mongo-driver/internal/assert"
16 "go.mongodb.org/mongo-driver/mongo"
17 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
18 "go.mongodb.org/mongo-driver/mongo/options"
19 "go.mongodb.org/mongo-driver/mongo/readconcern"
20 )
21
22
23
// readConcernOperations is the set of function names, matched against the fnName of the entries returned by
// createFunctionsSlice, that the causal consistency tests treat as reads. The "operation time sent" subtest runs only
// these functions and the "write then read" subtest skips them.
var readConcernOperations = map[string]struct{}{
	"Aggregate": {},
	"Distinct":  {},
	"Find":      {},
}
29
30 func TestCausalConsistency_Supported(t *testing.T) {
31 mt := mtest.New(t, mtest.NewOptions().MinServerVersion("3.6").Topologies(mtest.ReplicaSet, mtest.Sharded).CreateClient(false))
32
33 mt.Run("operation time nil", func(mt *mtest.T) {
34
35
36 sess, err := mt.Client.StartSession()
37 assert.Nil(mt, err, "StartSession error: %v", err)
38 defer sess.EndSession(context.Background())
39 assert.Nil(mt, sess.OperationTime(), "expected nil operation time, got %v", sess.OperationTime())
40 })
41 mt.Run("no cluster time on first command", func(mt *mtest.T) {
42
43
44 ccOpts := options.Session().SetCausalConsistency(true)
45 _ = mt.Client.UseSessionWithOptions(context.Background(), ccOpts, func(sc mongo.SessionContext) error {
46 _, _ = mt.Coll.Find(sc, bson.D{})
47 return nil
48 })
49
50 evt := mt.GetStartedEvent()
51 assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
52 checkOperationTime(mt, evt.Command, false)
53 })
54 mt.Run("operation time updated", func(mt *mtest.T) {
55
56
57 sess, err := mt.Client.StartSession()
58 assert.Nil(mt, err, "StartSession error: %v", err)
59 defer sess.EndSession(context.Background())
60
61 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
62 _, _ = mt.Coll.Find(sc, bson.D{})
63 return nil
64 })
65
66 evt := mt.GetSucceededEvent()
67 assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
68 serverT, serverI := evt.Reply.Lookup("operationTime").Timestamp()
69 serverTs := &primitive.Timestamp{serverT, serverI}
70 sessionTs := sess.OperationTime()
71 assert.NotNil(mt, sessionTs, "expected session operation time, got nil")
72 assert.True(mt, serverTs.Equal(*sessionTs), "expected operation time %v, got %v", serverTs, sessionTs)
73 })
74 mt.RunOpts("operation time sent", noClientOpts, func(mt *mtest.T) {
75
76
77
78 for _, sf := range createFunctionsSlice() {
79
80 if _, ok := readConcernOperations[sf.fnName]; !ok {
81 continue
82 }
83
84 mt.Run(sf.name, func(mt *mtest.T) {
85 sess, err := mt.Client.StartSession()
86 assert.Nil(mt, err, "StartSession error: %v", err)
87 defer sess.EndSession(context.Background())
88
89 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
90 _ = mt.Coll.FindOne(sc, bson.D{})
91 return nil
92 })
93 currOptime := sess.OperationTime()
94 assert.NotNil(mt, currOptime, "expected session operation time, got nil")
95
96 mt.ClearEvents()
97 _ = sf.execute(mt, sess)
98 _, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
99 assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
100 assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
101 })
102 }
103 })
104 mt.RunOpts("write then read", noClientOpts, func(mt *mtest.T) {
105
106
107
108 for _, sf := range createFunctionsSlice() {
109
110 if _, ok := readConcernOperations[sf.fnName]; ok {
111 continue
112 }
113
114 mt.Run(sf.name, func(mt *mtest.T) {
115 sess, err := mt.Client.StartSession()
116 assert.Nil(mt, err, "StartSession error: %v", err)
117 defer sess.EndSession(context.Background())
118
119 _ = sf.execute(mt, sess)
120 currOptime := sess.OperationTime()
121 assert.NotNil(mt, currOptime, "expected session operation time, got nil")
122
123 mt.ClearEvents()
124 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
125 _ = mt.Coll.FindOne(sc, bson.D{})
126 return nil
127 })
128 _, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
129 assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
130 assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
131 })
132 }
133 })
134 mt.Run("non-consistent read", func(mt *mtest.T) {
135
136
137 sessOpts := options.Session().SetCausalConsistency(false)
138 _ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(sc mongo.SessionContext) error {
139 _, _ = mt.Coll.Find(sc, bson.D{})
140 mt.ClearEvents()
141 _, _ = mt.Coll.Find(sc, bson.D{})
142 return nil
143 })
144 evt := mt.GetStartedEvent()
145 assert.Equal(mt, "find", evt.CommandName, "expected 'find' command, got '%v'", evt.CommandName)
146 checkOperationTime(mt, evt.Command, false)
147 })
148 mt.Run("default read concern", func(mt *mtest.T) {
149
150
151
152 sess, err := mt.Client.StartSession()
153 assert.Nil(mt, err, "StartSession error: %v", err)
154 defer sess.EndSession(context.Background())
155
156 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
157 _ = mt.Coll.FindOne(sc, bson.D{})
158 return nil
159 })
160 currOptime := sess.OperationTime()
161 mt.ClearEvents()
162 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
163 _ = mt.Coll.FindOne(sc, bson.D{})
164 return nil
165 })
166
167 level, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
168 assert.Equal(mt, "", level, "expected command to not have read concern level, got %s", level)
169 assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
170 assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
171 })
172 localRcOpts := options.Client().SetReadConcern(readconcern.Local())
173 mt.RunOpts("custom read concern", mtest.NewOptions().ClientOptions(localRcOpts), func(mt *mtest.T) {
174 sess, err := mt.Client.StartSession()
175 assert.Nil(mt, err, "StartSession error: %v", err)
176 defer sess.EndSession(context.Background())
177
178 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
179 _ = mt.Coll.FindOne(sc, bson.D{})
180 return nil
181 })
182 currOptime := sess.OperationTime()
183 mt.ClearEvents()
184 _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
185 _ = mt.Coll.FindOne(sc, bson.D{})
186 return nil
187 })
188
189 level, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command)
190 assert.Equal(mt, "local", level, "expected read concern level 'local', got %s", level)
191 assert.NotNil(mt, sentOptime, "expected operation time on command, got nil")
192 assert.True(mt, currOptime.Equal(*sentOptime), "expected operation time %v, got %v", currOptime, sentOptime)
193 })
194 mt.Run("clusterTime included", func(mt *mtest.T) {
195
196
197 _ = mt.Coll.FindOne(context.Background(), bson.D{})
198 evt := mt.GetStartedEvent()
199 assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
200 _, err := evt.Command.LookupErr("$clusterTime")
201 assert.Nil(mt, err, "expected $clusterTime in command, got nil")
202 })
203 }
204
205 func TestCausalConsistency_NotSupported(t *testing.T) {
206
207
208 rob := []mtest.RunOnBlock{
209 {MaxServerVersion: "3.4"},
210 {Topology: []mtest.TopologyKind{mtest.Single}},
211 }
212 mt := mtest.New(t, mtest.NewOptions().RunOn(rob...).CreateClient(false))
213
214 mt.Run("afterClusterTime not included", func(mt *mtest.T) {
215
216
217
218 sessOpts := options.Session().SetCausalConsistency(true)
219 _ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(sc mongo.SessionContext) error {
220 _, _ = mt.Coll.Find(sc, bson.D{})
221 return nil
222 })
223
224 evt := mt.GetStartedEvent()
225 assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
226 checkOperationTime(mt, evt.Command, false)
227 })
228 mt.Run("clusterTime not included", func(mt *mtest.T) {
229
230
231 _ = mt.Coll.FindOne(context.Background(), bson.D{})
232 evt := mt.GetStartedEvent()
233 assert.Equal(mt, "find", evt.CommandName, "expected command 'find', got '%v'", evt.CommandName)
234 _, err := evt.Command.LookupErr("$clusterTime")
235 assert.NotNil(mt, err, "expected $clusterTime to not be sent, but was")
236 })
237 }
238
239 func checkOperationTime(mt *mtest.T, cmd bson.Raw, shouldInclude bool) {
240 mt.Helper()
241
242 _, optime := getReadConcernFields(mt, cmd)
243 if shouldInclude {
244 assert.NotNil(mt, optime, "expected operation time, got nil")
245 return
246 }
247 assert.Nil(mt, optime, "did not expect operation time, got %v", optime)
248 }
249
250 func getReadConcernFields(mt *mtest.T, cmd bson.Raw) (string, *primitive.Timestamp) {
251 mt.Helper()
252
253 rc, err := cmd.LookupErr("readConcern")
254 if err != nil {
255 return "", nil
256 }
257 rcDoc := rc.Document()
258
259 var level string
260 var clusterTime *primitive.Timestamp
261
262 if levelVal, err := rcDoc.LookupErr("level"); err == nil {
263 level = levelVal.StringValue()
264 }
265 if ctVal, err := rcDoc.LookupErr("afterClusterTime"); err == nil {
266 t, i := ctVal.Timestamp()
267 clusterTime = &primitive.Timestamp{t, i}
268 }
269 return level, clusterTime
270 }
271
View as plain text