1
2
3
4
5
6
7 package topology
8
9 import (
10 "context"
11 "encoding/json"
12 "fmt"
13 "io/ioutil"
14 "net"
15 "path"
16 "strings"
17 "sync"
18 "sync/atomic"
19 "testing"
20 "time"
21
22 "go.mongodb.org/mongo-driver/bson/primitive"
23 "go.mongodb.org/mongo-driver/event"
24 "go.mongodb.org/mongo-driver/internal/require"
25 "go.mongodb.org/mongo-driver/internal/spectest"
26 "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
27 )
28
29
30
// skippedTestDescriptions maps the "description" field of a CMAP spec test file to the reason
// that test is skipped. runCMAPTest looks up each test's description here and, when it is
// present, calls t.Skip with the mapped reason.
var skippedTestDescriptions = map[string]string{
	// These two tests share one reason: they require that closing the pool does not
	// aggressively close connections that are currently in use.
	"When a pool is closed, it MUST first destroy all available connections in that pool": "test requires that close does not aggressively close used connections",
	"must destroy checked in connection if pool has been closed":                          "test requires that close does not aggressively close used connections",

	// The event order this test expects conflicts with a load-balancer SDAM spec test.
	// Tracked as DRIVERS-1785.
	"error during minPoolSize population clears pool": "event ordering is incompatible with load-balancer SDAM spec test (DRIVERS-1785)",

	// This test requires that connections established to satisfy minPoolSize are handed
	// straight to waiting check-outs. Tracked as DRIVERS-2225.
	"threads blocked by maxConnecting check out minPoolSize connections": "test requires that connections established by minPoolSize are immediately used to satisfy check-out requests (DRIVERS-2225)",

	// This test requires that a checked-in connection is not used to satisfy a check-out that
	// is already waiting on a new connection. Tracked as DRIVERS-2223.
	"threads blocked by maxConnecting check out returned connections": "test requires a checked-in connections cannot satisfy a check-out waiting on a new connection (DRIVERS-2223)",
}
63
// cmapEvent is one expected pool event as listed in the "events" array of a CMAP spec test file.
// checkEvents compares each field that is set against the corresponding actual event.
type cmapEvent struct {
	// EventType is the expected event type name.
	EventType string `json:"type"`
	// Address is either the number 42 (decoded as float64), which checkEvents treats as "any
	// non-empty address", or the exact address string that is expected.
	Address interface{} `json:"address"`
	// ConnectionID of 0 means "not checked"; 42 means "any non-zero ID"; any other value must
	// match the actual event exactly.
	ConnectionID uint64 `json:"connectionId"`
	// Options is either the number 42 (decoded as float64), meaning "any non-nil pool options",
	// or a JSON object whose known keys are compared against the actual pool options.
	Options interface{} `json:"options"`
	// Reason, when non-empty, must equal the actual event's reason.
	Reason string `json:"reason"`
}
71
// poolOptions holds the "poolOptions" object of a CMAP spec test file. runCMAPTest converts these
// values into ServerOptions; the *MS fields are interpreted as milliseconds.
type poolOptions struct {
	MaxPoolSize   int32 `json:"maxPoolSize"`
	MinPoolSize   int32 `json:"minPoolSize"`
	MaxConnecting int32 `json:"maxConnecting"`
	MaxIdleTimeMS int32 `json:"maxIdleTimeMS"`
	// WaitQueueTimeoutMS, when non-zero, is used as the timeout of the context passed to each
	// "checkOut" operation.
	WaitQueueTimeoutMS int32 `json:"waitQueueTimeoutMS"`
	// BackgroundThreadIntervalMS is passed to WithConnectionPoolMaintainInterval.
	BackgroundThreadIntervalMS int32 `json:"backgroundThreadIntervalMS"`
}
80
// cmapTestFile is the top-level JSON document of a CMAP spec test file, as decoded by
// runCMAPTest.
type cmapTestFile struct {
	Version uint64 `json:"version"`
	Style   string `json:"style"`
	// Description identifies the test and is the key looked up in skippedTestDescriptions.
	Description string `json:"description"`
	// SkipReason, when non-empty, causes the test to be skipped with this message.
	SkipReason string `json:"skipReason"`
	// FailPoint, when set, must contain a "data" object; runCMAPTest reads its
	// "blockConnection", "blockTimeMS" and "closeConnection" entries.
	FailPoint   map[string]interface{} `json:"failPoint"`
	PoolOptions poolOptions            `json:"poolOptions"`
	// Operations are executed in order; each entry is dispatched by its "name" and optionally
	// assigned to a simulated thread by its "thread" field.
	Operations []map[string]interface{} `json:"operations"`
	// Error describes the error the test expects, or is nil when none is expected.
	Error *cmapTestError `json:"error"`
	// Events lists the expected pool events, in order.
	Events []cmapEvent `json:"events"`
	// Ignore lists event type names that are skipped when comparing actual events.
	Ignore []string `json:"ignore"`
}
93
// cmapTestError is the "error" object of a CMAP spec test file. runCMAPTest compares only the
// lower-cased Message against the errors that operations returned.
type cmapTestError struct {
	ErrorType string `json:"type"`
	Message   string `json:"message"`
	Address   string `json:"address"`
}
99
// simThread simulates one named thread from a spec test: a goroutine started by runOperation
// receives jobs from JobQueue and runs them one at a time.
type simThread struct {
	// JobQueue carries the operations assigned to this thread; closing it ends the goroutine.
	JobQueue chan func()
	// JobsAssigned counts jobs queued on this thread. It is accessed with sync/atomic.
	JobsAssigned int32
	// JobsCompleted counts jobs that have finished running. It is accessed with sync/atomic.
	JobsCompleted int32
}
105
// testInfo is the mutable state shared by all operations of a single CMAP spec test run.
// The embedded Mutex is held by the visible code whenever objects, threads or eventCounts is
// read or written.
type testInfo struct {
	// objects stores the result of each "checkOut" operation under its label until a matching
	// "checkIn" removes it.
	objects map[string]interface{}
	// originalEventChan receives every event published by the pool monitor.
	originalEventChan chan *event.PoolEvent
	// finalEventChan receives events after they have been moved out of originalEventChan; it is
	// the channel handed to checkEvents.
	finalEventChan chan *event.PoolEvent
	// threads holds the simulated threads by name.
	threads map[string]*simThread
	// backgroundThreadErrors collects the result (possibly nil) of every operation that ran on
	// a simulated thread.
	backgroundThreadErrors chan error
	// eventCounts tracks how many events of each type "waitForEvent" operations have seen.
	eventCounts map[string]uint64
	sync.Mutex
}
115
// cmapTestDir is the relative path of the directory that is searched for CMAP spec test JSON
// files and from which each test file is read.
const cmapTestDir = "../../../../testdata/connection-monitoring-and-pooling/"
117
118 func TestCMAPSpec(t *testing.T) {
119 for _, testFileName := range spectest.FindJSONFilesInDir(t, cmapTestDir) {
120 t.Run(testFileName, func(t *testing.T) {
121 runCMAPTest(t, testFileName)
122 })
123 }
124 }
125
126 func runCMAPTest(t *testing.T, testFileName string) {
127 content, err := ioutil.ReadFile(path.Join(cmapTestDir, testFileName))
128 require.NoErrorf(t, err, "unable to read content of test file")
129
130 var test cmapTestFile
131 err = json.Unmarshal(content, &test)
132 require.NoErrorf(t, err, "error unmarshalling testFile")
133
134 if test.SkipReason != "" {
135 t.Skip(test.SkipReason)
136 }
137 if msg, ok := skippedTestDescriptions[test.Description]; ok {
138 t.Skip(msg)
139 }
140
141 testInfo := &testInfo{
142 objects: make(map[string]interface{}),
143 originalEventChan: make(chan *event.PoolEvent, 200),
144 finalEventChan: make(chan *event.PoolEvent, 200),
145 threads: make(map[string]*simThread),
146 eventCounts: make(map[string]uint64),
147 backgroundThreadErrors: make(chan error, 100),
148 }
149
150 sOpts := []ServerOption{
151 WithMaxConnections(func(uint64) uint64 {
152 return uint64(test.PoolOptions.MaxPoolSize)
153 }),
154 WithMinConnections(func(uint64) uint64 {
155 return uint64(test.PoolOptions.MinPoolSize)
156 }),
157 WithMaxConnecting(func(uint64) uint64 {
158 return uint64(test.PoolOptions.MaxConnecting)
159 }),
160 WithConnectionPoolMaxIdleTime(func(time.Duration) time.Duration {
161 return time.Duration(test.PoolOptions.MaxIdleTimeMS) * time.Millisecond
162 }),
163 WithConnectionPoolMaintainInterval(func(time.Duration) time.Duration {
164 return time.Duration(test.PoolOptions.BackgroundThreadIntervalMS) * time.Millisecond
165 }),
166 WithConnectionPoolMonitor(func(*event.PoolMonitor) *event.PoolMonitor {
167 return &event.PoolMonitor{
168 Event: func(evt *event.PoolEvent) { testInfo.originalEventChan <- evt },
169 }
170 }),
171 }
172
173 var delay time.Duration
174 var closeConnection bool
175
176 if test.FailPoint != nil {
177 data, ok := test.FailPoint["data"].(map[string]interface{})
178 if !ok {
179 t.Fatalf("expected to find \"data\" map in failPoint (%v)", test.FailPoint)
180 }
181
182 blockConnection, _ := data["blockConnection"].(bool)
183 if blockTimeMS, ok := data["blockTimeMS"].(float64); ok && blockConnection {
184 delay = time.Duration(blockTimeMS) * time.Millisecond
185 }
186
187 closeConnection, _ = data["closeConnection"].(bool)
188 }
189
190
191
192
193 sOpts = append(sOpts, WithConnectionOptions(func(...ConnectionOption) []ConnectionOption {
194 return []ConnectionOption{
195 WithDialer(func(Dialer) Dialer {
196 return DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
197 msc := newMockSlowConn(makeHelloReply(), delay)
198 if closeConnection {
199 msc.Close()
200 }
201 return msc, nil
202 })
203 }),
204 WithHandshaker(func(h Handshaker) Handshaker {
205 return operation.NewHello()
206 }),
207 }
208 }))
209
210 s := NewServer("mongodb://fake", primitive.NewObjectID(), sOpts...)
211 s.state = serverConnected
212 require.NoError(t, err, "error connecting connection pool")
213 defer s.pool.close(context.Background())
214
215 for _, op := range test.Operations {
216 if tempErr := runOperation(t, op, testInfo, s, test.PoolOptions.WaitQueueTimeoutMS); tempErr != nil {
217 if err != nil {
218 t.Fatalf("received multiple errors in primary thread: %v and %v", err, tempErr)
219 }
220 err = tempErr
221 }
222 }
223
224
225 testInfo.Lock()
226 threadNames := make([]string, 0)
227 for threadName := range testInfo.threads {
228 threadNames = append(threadNames, threadName)
229 }
230 testInfo.Unlock()
231
232 for _, threadName := range threadNames {
233 WAIT:
234 for {
235 testInfo.Lock()
236 thread, ok := testInfo.threads[threadName]
237 if !ok {
238 t.Fatalf("thread was unexpectedly ended: %v", threadName)
239 }
240 if len(thread.JobQueue) == 0 && atomic.LoadInt32(&thread.JobsCompleted) == atomic.LoadInt32(&thread.JobsAssigned) {
241 break WAIT
242 }
243 testInfo.Unlock()
244 }
245 close(testInfo.threads[threadName].JobQueue)
246 testInfo.Unlock()
247 }
248
249 if test.Error != nil {
250 if err == nil || strings.ToLower(test.Error.Message) != err.Error() {
251 var erroredCorrectly bool
252 errs := make([]error, 0, len(testInfo.backgroundThreadErrors)+1)
253 errs = append(errs, err)
254 for len(testInfo.backgroundThreadErrors) > 0 {
255 bgErr := <-testInfo.backgroundThreadErrors
256 errs = append(errs, bgErr)
257 if bgErr != nil && strings.Contains(bgErr.Error(), strings.ToLower(test.Error.Message)) {
258 erroredCorrectly = true
259 break
260 }
261 }
262 if !erroredCorrectly {
263 t.Fatalf("error differed from expected error, expected: %v, actual errors received: %v", test.Error.Message, errs)
264 }
265 }
266 }
267
268 testInfo.Lock()
269 defer testInfo.Unlock()
270 for len(testInfo.originalEventChan) > 0 {
271 temp := <-testInfo.originalEventChan
272 testInfo.finalEventChan <- temp
273 }
274
275 checkEvents(t, test.Events, testInfo.finalEventChan, test.Ignore)
276
277 }
278
279 func checkEvents(t *testing.T, expectedEvents []cmapEvent, actualEvents chan *event.PoolEvent, ignoreEvents []string) {
280 for _, expectedEvent := range expectedEvents {
281 validEvent := nextValidEvent(t, actualEvents, ignoreEvents)
282
283 if expectedEvent.EventType != validEvent.Type {
284 var reason string
285 if validEvent.Type == "ConnectionCheckOutFailed" {
286 reason = ": " + validEvent.Reason
287 }
288 t.Errorf("unexpected event occurred: expected: %v, actual: %v%v", expectedEvent.EventType, validEvent.Type, reason)
289 }
290
291 if expectedEvent.Address != nil {
292
293 if expectedEvent.Address == float64(42) {
294 if validEvent.Address == "" {
295 t.Errorf("expected address in event, instead received none in %v", expectedEvent.EventType)
296 }
297 } else {
298 addr, ok := expectedEvent.Address.(string)
299 if !ok {
300 t.Errorf("received non string address: %v", expectedEvent.Address)
301 }
302 if addr != validEvent.Address {
303 t.Errorf("received unexpected address: %v, expected: %v", validEvent.Address, expectedEvent.Address)
304 }
305 }
306 }
307
308 if expectedEvent.ConnectionID != 0 {
309 if expectedEvent.ConnectionID == 42 {
310 if validEvent.ConnectionID == 0 {
311 t.Errorf("expected a connectionId but found none in %v", validEvent.Type)
312 }
313 } else if expectedEvent.ConnectionID != validEvent.ConnectionID {
314 t.Errorf("expected and actual connectionIds differed: expected: %v, actual: %v for event: %v", expectedEvent.ConnectionID, validEvent.ConnectionID, expectedEvent.EventType)
315 }
316 }
317
318 if expectedEvent.Reason != "" && expectedEvent.Reason != validEvent.Reason {
319 t.Errorf("event reason differed from expected: expected: %v, actual: %v for %v", expectedEvent.Reason, validEvent.Reason, expectedEvent.EventType)
320 }
321
322 if expectedEvent.Options != nil {
323 if expectedEvent.Options == float64(42) {
324 if validEvent.PoolOptions == nil {
325 t.Errorf("expected poolevent options but found none")
326 }
327 } else {
328 opts, ok := expectedEvent.Options.(map[string]interface{})
329 if !ok {
330 t.Errorf("event options were unexpected type: %T for %v", expectedEvent.Options, expectedEvent.EventType)
331 }
332
333 if maxSize, ok := opts["maxPoolSize"]; ok && validEvent.PoolOptions.MaxPoolSize != uint64(maxSize.(float64)) {
334 t.Errorf("event's max pool size differed from expected: %v, actual: %v", maxSize, validEvent.PoolOptions.MaxPoolSize)
335 }
336
337 if minSize, ok := opts["minPoolSize"]; ok && validEvent.PoolOptions.MinPoolSize != uint64(minSize.(float64)) {
338 t.Errorf("event's min pool size differed from expected: %v, actual: %v", minSize, validEvent.PoolOptions.MinPoolSize)
339 }
340
341 if waitQueueTimeoutMS, ok := opts["waitQueueTimeoutMS"]; ok && validEvent.PoolOptions.WaitQueueTimeoutMS != uint64(waitQueueTimeoutMS.(float64)) {
342 t.Errorf("event's min pool size differed from expected: %v, actual: %v", waitQueueTimeoutMS, validEvent.PoolOptions.WaitQueueTimeoutMS)
343 }
344 }
345 }
346 }
347 }
348
349 func nextValidEvent(t *testing.T, events chan *event.PoolEvent, ignoreEvents []string) *event.PoolEvent {
350 t.Helper()
351 NextEvent:
352 for {
353 if len(events) == 0 {
354 t.Fatalf("unable to get next event. too few events occurred")
355 }
356
357 event := <-events
358 for _, Type := range ignoreEvents {
359 if event.Type == Type {
360 continue NextEvent
361 }
362 }
363 return event
364 }
365 }
366
367 func runOperation(t *testing.T, operation map[string]interface{}, testInfo *testInfo, s *Server, checkOutTimeout int32) error {
368 threadName, ok := operation["thread"]
369 if ok {
370 testInfo.Lock()
371 thread, ok := testInfo.threads[threadName.(string)]
372 if !ok {
373 thread = &simThread{
374 JobQueue: make(chan func(), 200),
375 }
376 testInfo.threads[threadName.(string)] = thread
377
378 go func() {
379 for {
380 job, more := <-thread.JobQueue
381 if !more {
382 break
383 }
384 job()
385 atomic.AddInt32(&thread.JobsCompleted, 1)
386 }
387 }()
388 }
389 testInfo.Unlock()
390
391 atomic.AddInt32(&thread.JobsAssigned, 1)
392 thread.JobQueue <- func() {
393 err := runOperationInThread(t, operation, testInfo, s, checkOutTimeout)
394 testInfo.backgroundThreadErrors <- err
395 }
396
397 return nil
398 }
399 return runOperationInThread(t, operation, testInfo, s, checkOutTimeout)
400 }
401
402 func runOperationInThread(t *testing.T, operation map[string]interface{}, testInfo *testInfo, s *Server, checkOutTimeout int32) error {
403 name, ok := operation["name"]
404 if !ok {
405 t.Fatalf("unable to find name in operation")
406 }
407
408 switch name {
409 case "start":
410 return nil
411 case "wait":
412 timeMs, ok := operation["ms"]
413 if !ok {
414 t.Fatalf("unable to find ms in wait operation")
415 }
416 dur := time.Duration(int64(timeMs.(float64))) * time.Millisecond
417 time.Sleep(dur)
418 case "waitForThread":
419 threadName, ok := operation["target"]
420 if !ok {
421 t.Fatalf("unable to waitForThread without specified threadName")
422 }
423
424 testInfo.Lock()
425 thread, ok := testInfo.threads[threadName.(string)]
426 testInfo.Unlock()
427 if !ok {
428 t.Fatalf("unable to find thread to wait for: %v", threadName)
429 }
430
431 for {
432 if atomic.LoadInt32(&thread.JobsCompleted) == atomic.LoadInt32(&thread.JobsAssigned) {
433 break
434 }
435 }
436 case "waitForEvent":
437 var targetCount int
438 {
439 f, ok := operation["count"].(float64)
440 if !ok {
441 t.Fatalf("count is required to waitForEvent")
442 }
443 targetCount = int(f)
444 }
445
446 targetEventName, ok := operation["event"].(string)
447 if !ok {
448 t.Fatalf("event is require to waitForEvent")
449 }
450
451
452
453
454
455 timeout := 10 * time.Second
456 if timeoutMS, ok := operation["timeout"].(float64); ok {
457 timeout = time.Duration(timeoutMS) * time.Millisecond
458 }
459
460 originalChan := testInfo.originalEventChan
461 finalChan := testInfo.finalEventChan
462
463 for {
464 var event *event.PoolEvent
465 {
466 timer := time.NewTimer(timeout)
467 select {
468 case event = <-originalChan:
469 case <-timer.C:
470 t.Fatalf("timed out waiting for %d %q events", targetCount, targetEventName)
471 }
472 timer.Stop()
473 }
474 finalChan <- event
475
476 testInfo.Lock()
477 _, ok = testInfo.eventCounts[event.Type]
478 if !ok {
479 testInfo.eventCounts[event.Type] = 0
480 }
481 testInfo.eventCounts[event.Type]++
482 count := testInfo.eventCounts[event.Type]
483 testInfo.Unlock()
484
485 if event.Type == targetEventName && count == uint64(targetCount) {
486 break
487 }
488 }
489 case "checkOut":
490 checkoutContext := context.Background()
491 if checkOutTimeout != 0 {
492 var cancel context.CancelFunc
493 checkoutContext, cancel = context.WithTimeout(context.Background(), time.Duration(checkOutTimeout)*time.Millisecond)
494 defer cancel()
495 }
496
497 c, err := s.Connection(checkoutContext)
498 if label, ok := operation["label"]; ok {
499 testInfo.Lock()
500 testInfo.objects[label.(string)] = c
501 testInfo.Unlock()
502 }
503
504 return err
505 case "checkIn":
506 cName, ok := operation["connection"]
507 if !ok {
508 t.Fatalf("unable to find connection to checkin")
509 }
510
511 var cEmptyInterface interface{}
512 testInfo.Lock()
513 cEmptyInterface, ok = testInfo.objects[cName.(string)]
514 delete(testInfo.objects, cName.(string))
515 testInfo.Unlock()
516 if !ok {
517 t.Fatalf("was unable to find %v in objects when expected", cName)
518 }
519
520 c, ok := cEmptyInterface.(*Connection)
521 if !ok {
522 t.Fatalf("object in objects was expected to be a connection, but was instead a %T", cEmptyInterface)
523 }
524 return c.Close()
525 case "clear":
526 needInterruption, ok := operation["interruptInUseConnections"].(bool)
527 if ok && needInterruption {
528 s.pool.clearAll(fmt.Errorf("spec test clear"), nil)
529 } else {
530 s.pool.clear(fmt.Errorf("spec test clear"), nil)
531 }
532 case "close":
533 s.pool.close(context.Background())
534 case "ready":
535 return s.pool.ready()
536 default:
537 t.Fatalf("unknown operation: %v", name)
538 }
539
540 return nil
541 }
542
View as plain text