1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "sync"
12 "testing"
13 "time"
14
15 "go.mongodb.org/mongo-driver/bson"
16 "go.mongodb.org/mongo-driver/event"
17 "go.mongodb.org/mongo-driver/internal/assert"
18 "go.mongodb.org/mongo-driver/internal/eventtest"
19 "go.mongodb.org/mongo-driver/internal/require"
20 "go.mongodb.org/mongo-driver/mongo/description"
21 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
22 "go.mongodb.org/mongo-driver/mongo/options"
23 )
24
// saturatedConnections is a set of connection IDs, keyed by ID. Only the
// presence of a key is meaningful; the stored value is always true.
type saturatedConnections map[uint64]bool

// saturatedHosts maps a host address to the set of connection IDs that have
// been recorded for that host.
type saturatedHosts map[string]saturatedConnections

// add records connectionID under host. The per-host set is created on first
// use (or when the existing entry is nil), and adding the same ID twice is a
// no-op.
func (set saturatedHosts) add(host string, connectionID uint64) {
	conns := set[host]
	if conns == nil {
		conns = make(saturatedConnections)
		set[host] = conns
	}
	conns[connectionID] = true
}
36
37
38 func (set saturatedHosts) isSaturated(tolerance uint64) bool {
39 for _, host := range options.Client().ApplyURI(mtest.ClusterURI()).Hosts {
40 if cxns := set[host]; cxns == nil || uint64(len(cxns)) < tolerance {
41 return false
42 }
43 }
44 return true
45 }
46
47
48
49
50 func awaitSaturation(ctx context.Context, mt *mtest.T, monitor *eventtest.TestPoolMonitor, tolerance uint64) error {
51 set := make(saturatedHosts)
52 var err error
53 for !set.isSaturated(tolerance) {
54 if err = ctx.Err(); err != nil {
55 break
56 }
57 if err = mt.Coll.FindOne(ctx, bson.D{}).Err(); err != nil {
58 break
59 }
60 monitor.Events(func(evt *event.PoolEvent) bool {
61
62 if evt.Type == event.ConnectionReady {
63 set.add(evt.Address, evt.ConnectionID)
64 }
65 return true
66 })
67 }
68 return err
69 }
70
71
72
73
74 func runsServerSelection(mt *mtest.T, monitor *eventtest.TestPoolMonitor,
75 threadCount, opCount int) (map[string]int, []*event.PoolEvent) {
76 var wg sync.WaitGroup
77 for i := 0; i < threadCount; i++ {
78 wg.Add(1)
79 go func() {
80 defer wg.Done()
81
82 for i := 0; i < opCount; i++ {
83 res := mt.Coll.FindOne(context.Background(), bson.D{})
84 assert.NoError(mt.T, res.Err(), "FindOne() error for Collection '%s'", mt.Coll.Name())
85 }
86 }()
87 }
88 wg.Wait()
89
90
91
92 checkOutEvents := monitor.Events(func(evt *event.PoolEvent) bool {
93 return evt.Type == event.GetStarted
94 })
95 counts := make(map[string]int)
96 for _, evt := range checkOutEvents {
97 counts[evt.Address]++
98 }
99 assert.Equal(mt, 2, len(counts), "expected exactly 2 server addresses")
100 return counts, checkOutEvents
101 }
102
103
104
105 func TestServerSelectionProse(t *testing.T) {
106 const maxPoolSize = 10
107 const localThreshold = 30 * time.Second
108
109 mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
110
111 mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.9")
112 mt.RunOpts("operationCount-based selection within latency window, with failpoint", mtOpts, func(mt *mtest.T) {
113 _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
114 require.NoError(mt, err, "InsertOne() error")
115
116 hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
117 require.GreaterOrEqualf(mt, len(hosts), 2, "test cluster must have at least 2 mongos hosts")
118
119
120
121 failpointHost := hosts[0]
122 mt.ResetClient(options.Client().
123 SetHosts([]string{failpointHost}))
124 mt.SetFailPoint(mtest.FailPoint{
125 ConfigureFailPoint: "failCommand",
126 Mode: mtest.FailPointMode{
127 Times: 10000,
128 },
129 Data: mtest.FailPointData{
130 FailCommands: []string{"find"},
131 BlockConnection: true,
132 BlockTimeMS: 500,
133 AppName: "loadBalancingTest",
134 },
135 })
136
137
138 defer func() {
139 mt.ResetClient(options.Client().
140 SetHosts([]string{failpointHost}))
141 mt.ClearFailPoints()
142 }()
143
144
145
146 topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
147 tpm := eventtest.NewTestPoolMonitor()
148 mt.ResetClient(options.Client().
149 SetLocalThreshold(localThreshold).
150 SetMaxPoolSize(maxPoolSize).
151 SetMinPoolSize(maxPoolSize).
152 SetHosts(hosts[:2]).
153 SetPoolMonitor(tpm.PoolMonitor).
154 SetAppName("loadBalancingTest").
155 SetServerMonitor(&event.ServerMonitor{
156 TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
157 topologyEvents <- evt
158 },
159 }))
160 for evt := range topologyEvents {
161 servers := evt.NewDescription.Servers
162 if len(servers) == 2 && servers[0].Kind == description.Mongos && servers[1].Kind == description.Mongos {
163 break
164 }
165 }
166 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
167 defer cancel()
168
169 if err := awaitSaturation(ctx, mt, tpm, maxPoolSize); err != nil {
170 mt.Fatalf("Error awaiting saturation: %v", err.Error())
171 }
172
173 counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 10)
174
175
176 frequency := float64(counts[failpointHost]) / float64(len(checkOutEvents))
177 assert.Lessf(mt,
178 frequency,
179 0.25,
180 "expected failpoint host %q to be selected less than 25%% of the time",
181 failpointHost)
182 })
183
184 mtOpts = mtest.NewOptions().Topologies(mtest.Sharded)
185 mt.RunOpts("operationCount-based selection within latency window, no failpoint", mtOpts, func(mt *mtest.T) {
186
187 mt.Skip("Test fails frequently, skipping. See GODRIVER-2842")
188
189 _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
190 require.NoError(mt, err, "InsertOne() error")
191
192 hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
193 require.GreaterOrEqualf(mt, len(hosts), 2, "test cluster must have at least 2 mongos hosts")
194
195
196
197 topologyEvents := make(chan *event.TopologyDescriptionChangedEvent, 10)
198 tpm := eventtest.NewTestPoolMonitor()
199 mt.ResetClient(options.Client().
200 SetHosts(hosts[:2]).
201 SetPoolMonitor(tpm.PoolMonitor).
202 SetLocalThreshold(localThreshold).
203 SetMaxPoolSize(maxPoolSize).
204 SetMinPoolSize(maxPoolSize).
205 SetServerMonitor(&event.ServerMonitor{
206 TopologyDescriptionChanged: func(evt *event.TopologyDescriptionChangedEvent) {
207 topologyEvents <- evt
208 },
209 }))
210 for evt := range topologyEvents {
211 servers := evt.NewDescription.Servers
212 if len(servers) == 2 && servers[0].Kind == description.Mongos && servers[1].Kind == description.Mongos {
213 break
214 }
215 }
216 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
217 defer cancel()
218
219 if err := awaitSaturation(ctx, mt, tpm, maxPoolSize); err != nil {
220 mt.Fatalf("Error awaiting saturation: %v", err.Error())
221 }
222
223 counts, checkOutEvents := runsServerSelection(mt, tpm, 10, 100)
224
225
226 for addr, count := range counts {
227 frequency := float64(count) / float64(len(checkOutEvents))
228 assert.InDeltaf(mt,
229 0.5,
230 frequency,
231 0.1,
232 "expected server %q to be selected 50%% (+/- 10%%) of the time, but was selected %v%% of the time",
233 addr, frequency*100)
234 }
235 })
236 }
237
View as plain text