...
1
2
3
4
5
6
7 package integration
8
9 import (
10 "fmt"
11 "sync"
12
13 "go.mongodb.org/mongo-driver/bson"
14 "go.mongodb.org/mongo-driver/internal/assert"
15 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
16 )
17
18
19
20
21
22
23
24
25 type backgroundRoutine struct {
26 operations chan *operation
27 mt *mtest.T
28 testCase *testCase
29 wg sync.WaitGroup
30 err error
31 }
32
33 func newBackgroundRoutine(mt *mtest.T, testCase *testCase) *backgroundRoutine {
34 routine := &backgroundRoutine{
35 operations: make(chan *operation, 10),
36 mt: mt,
37 testCase: testCase,
38 }
39
40 return routine
41 }
42
43 func (b *backgroundRoutine) start() {
44 b.wg.Add(1)
45
46 go func() {
47 defer b.wg.Done()
48
49 for op := range b.operations {
50 if b.err != nil {
51 continue
52 }
53
54 if err := runOperation(b.mt, b.testCase, op, nil, nil); err != nil {
55 b.err = fmt.Errorf("error running operation %s: %w", op.Name, err)
56 }
57 }
58 }()
59 }
60
61 func (b *backgroundRoutine) stop() error {
62 close(b.operations)
63 b.wg.Wait()
64 return b.err
65 }
66
67 func (b *backgroundRoutine) addOperation(op *operation) bool {
68 select {
69 case b.operations <- op:
70 return true
71 default:
72 return false
73 }
74 }
75
76 func startThread(mt *mtest.T, testCase *testCase, op *operation) {
77 routine := newBackgroundRoutine(mt, testCase)
78 testCase.routinesMap.Store(getThreadName(op), routine)
79 routine.start()
80 }
81
82 func runOnThread(mt *mtest.T, testCase *testCase, op *operation) {
83 routineName := getThreadName(op)
84 routineVal, ok := testCase.routinesMap.Load(routineName)
85 assert.True(mt, ok, "no background routine found with name %s", routineName)
86 routine := routineVal.(*backgroundRoutine)
87
88 var routineOperation operation
89 operationDoc := op.Arguments.Lookup("operation")
90 err := bson.UnmarshalWithRegistry(specTestRegistry, operationDoc.Document(), &routineOperation)
91 assert.Nil(mt, err, "error creating operation for runOnThread: %v", err)
92
93 ok = routine.addOperation(&routineOperation)
94 assert.True(mt, ok, "failed to add operation %s to routine %s", routineOperation.Name, routineName)
95 }
96
97 func waitForThread(mt *mtest.T, testCase *testCase, op *operation) {
98 name := getThreadName(op)
99 routineVal, ok := testCase.routinesMap.Load(name)
100 assert.True(mt, ok, "no background routine found with name %s", name)
101
102 err := routineVal.(*backgroundRoutine).stop()
103 testCase.routinesMap.Delete(name)
104 assert.Nil(mt, err, "error on background routine %s: %v", name, err)
105 }
106
107 func getThreadName(op *operation) string {
108 return op.Arguments.Lookup("name").StringValue()
109 }
110
View as plain text