1
2
3
4
5
6
7 package unified
8
9 import (
10 "context"
11 "fmt"
12 "sync"
13
14 "go.mongodb.org/mongo-driver/bson"
15 "go.mongodb.org/mongo-driver/internal/logger"
16 )
17
18
19
20 var errLoggerVerification = fmt.Errorf("logger verification failed")
21
22
23 type logMessage struct {
24 LevelLiteral string `bson:"level"`
25 ComponentLiteral string `bson:"component"`
26 Data bson.Raw `bson:"data"`
27 FailureIsRedacted bool `bson:"failureIsRedacted"`
28 }
29
30
31
32 func newLogMessage(level int, msg string, args ...interface{}) (*logMessage, error) {
33 logMessage := new(logMessage)
34
35
36
37
38
39 for literal, logLevel := range logger.LevelLiteralMap {
40 if level == int(logLevel) {
41 logMessage.LevelLiteral = literal
42
43 break
44 }
45 }
46
47
48
49 if len(args)%2 != 0 {
50 return nil, fmt.Errorf("%w: invalid arguments: %v", errLoggerVerification, args)
51 }
52
53
54 actualD := bson.D{{"message", msg}}
55 for i := 0; i < len(args); i += 2 {
56 actualD = append(actualD, bson.E{
57 Key: args[i].(string),
58 Value: args[i+1],
59 })
60 }
61
62
63
64 bytes, err := bson.Marshal(actualD)
65 if err != nil {
66 return nil, fmt.Errorf("%w: failed to marshal: %v", errLoggerVerification, err)
67 }
68
69 logMessage.Data = bson.Raw(bytes)
70
71 return logMessage, nil
72 }
73
74
75
76 type clientLogMessages struct {
77 Client string `bson:"client"`
78 IgnoreMessages []*logMessage `bson:"ignoreMessages"`
79 LogMessages []*logMessage `bson:"messages"`
80 }
81
82
83
84 type logMessageValidator struct {
85 testCase *TestCase
86 clientErrs map[string]chan error
87 }
88
89
90 func newLogMessageValidator(testCase *TestCase) *logMessageValidator {
91 validator := &logMessageValidator{testCase: testCase}
92 validator.clientErrs = make(map[string]chan error)
93
94
95 for _, exp := range testCase.ExpectLogMessages {
96 validator.clientErrs[exp.Client] = make(chan error)
97 }
98
99 return validator
100 }
101
102 func logQueue(ctx context.Context, exp *clientLogMessages) <-chan orderedLogMessage {
103 clients := entities(ctx).clients()
104
105 clientEntity, ok := clients[exp.Client]
106 if !ok {
107 return nil
108 }
109
110 return clientEntity.logQueue
111 }
112
113
114 func verifyLogMatch(ctx context.Context, exp, act *logMessage) error {
115 if act == nil && exp == nil {
116 return nil
117 }
118
119 if act == nil || exp == nil {
120 return fmt.Errorf("%w: document mismatch", errLoggerVerification)
121 }
122
123 levelExp := logger.ParseLevel(exp.LevelLiteral)
124 levelAct := logger.ParseLevel(act.LevelLiteral)
125
126
127
128 if levelExp != levelAct {
129 return fmt.Errorf("%w: level mismatch: want %v, got %v",
130 errLoggerVerification, levelExp, levelAct)
131 }
132
133 rawExp := documentToRawValue(exp.Data)
134 rawAct := documentToRawValue(act.Data)
135
136
137
138
139 if err := verifyValuesMatch(ctx, rawExp, rawAct, true); err != nil {
140 return fmt.Errorf("%w: document length mismatch: %v", errLoggerVerification, err)
141 }
142
143 return nil
144 }
145
146
147
148 func isUnorderedLog(log *logMessage) bool {
149 msg, err := log.Data.LookupErr(logger.KeyMessage)
150 if err != nil {
151 return false
152 }
153
154 msgStr := msg.StringValue()
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 return msgStr == logger.ConnectionCheckoutFailed ||
172 msgStr == logger.ConnectionClosed ||
173 msgStr == logger.ConnectionPoolCleared
174 }
175
176 type logQueues struct {
177 expected *clientLogMessages
178 ordered <-chan *logMessage
179 unordered <-chan *logMessage
180 }
181
182
183
184 func partitionLogQueue(ctx context.Context, exp *clientLogMessages) logQueues {
185 orderedLogCh := make(chan *logMessage, len(exp.LogMessages))
186 unorderedLogCh := make(chan *logMessage, len(exp.LogMessages))
187
188
189 unorderedIndices := make(map[int]struct{})
190 for i, log := range exp.LogMessages {
191 if isUnorderedLog(log) {
192 unorderedIndices[i] = struct{}{}
193 }
194 }
195
196 go func() {
197 defer close(orderedLogCh)
198 defer close(unorderedLogCh)
199
200 for actual := range logQueue(ctx, exp) {
201 msg := actual.logMessage
202 if _, ok := unorderedIndices[actual.order-2]; ok {
203 unorderedLogCh <- msg
204 } else {
205 orderedLogCh <- msg
206 }
207 }
208 }()
209
210 return logQueues{
211 expected: exp,
212 ordered: orderedLogCh,
213 unordered: unorderedLogCh,
214 }
215 }
216
217 func matchOrderedLogs(ctx context.Context, logs logQueues) <-chan error {
218
219 expLogMessages := make([]*logMessage, 0, len(logs.expected.LogMessages))
220 for _, log := range logs.expected.LogMessages {
221 if !isUnorderedLog(log) {
222 expLogMessages = append(expLogMessages, log)
223 }
224 }
225
226 errs := make(chan error, 1)
227
228 go func() {
229 defer close(errs)
230
231 for actual := range logs.ordered {
232 expected := expLogMessages[0]
233 if expected == nil {
234 continue
235 }
236
237 err := verifyLogMatch(ctx, expected, actual)
238 if err != nil {
239 errs <- err
240 }
241
242
243 expLogMessages = expLogMessages[1:]
244 }
245 }()
246
247 return errs
248 }
249
250 func matchUnorderedLogs(ctx context.Context, logs logQueues) <-chan error {
251 unordered := make(map[*logMessage]struct{}, len(logs.expected.LogMessages))
252
253 for _, log := range logs.expected.LogMessages {
254 if isUnorderedLog(log) {
255 unordered[log] = struct{}{}
256 }
257 }
258
259 errs := make(chan error, 1)
260
261 go func() {
262 defer close(errs)
263
264
265 actualMessageSet := map[string]bool{}
266
267 for actual := range logs.unordered {
268 msg, err := actual.Data.LookupErr(logger.KeyMessage)
269 if err != nil {
270 errs <- fmt.Errorf("could not lookup message from unordered log: %w", err)
271
272 break
273 }
274
275 msgStr := msg.StringValue()
276 if msgStr == logger.ConnectionPoolCleared && actualMessageSet[logger.ConnectionClosed] {
277 errs <- fmt.Errorf("connection has been closed before the pool could clear")
278 }
279
280
281
282
283 for expected := range unordered {
284 err = verifyLogMatch(ctx, expected, actual)
285 if err == nil {
286
287
288 delete(unordered, expected)
289
290 break
291 }
292 }
293
294
295 if err != nil {
296 errs <- err
297 }
298
299 actualMessageSet[msgStr] = true
300 }
301 }()
302
303 return errs
304 }
305
306
307
308
309 func startLogValidators(ctx context.Context, validator *logMessageValidator) {
310 for _, expected := range validator.testCase.ExpectLogMessages {
311 logs := partitionLogQueue(ctx, expected)
312
313 wg := &sync.WaitGroup{}
314 wg.Add(2)
315
316 go func(expected *clientLogMessages) {
317 defer wg.Done()
318
319 errCh := matchOrderedLogs(ctx, logs)
320 if errCh == nil {
321 return
322 }
323
324 if errs := <-errCh; errs != nil {
325 validator.clientErrs[expected.Client] <- errs
326 }
327 }(expected)
328
329 go func(expected *clientLogMessages) {
330 defer wg.Done()
331
332 errCh := matchUnorderedLogs(ctx, logs)
333 if errCh == nil {
334 return
335 }
336
337 if errs := <-errCh; errs != nil {
338 validator.clientErrs[expected.Client] <- errs
339 }
340 }(expected)
341
342 go func(expected *clientLogMessages) {
343 wg.Wait()
344
345 close(validator.clientErrs[expected.Client])
346 }(expected)
347 }
348 }
349
350 func stopLogValidatorsErr(clientName string, err error) error {
351 return fmt.Errorf("%w: %s: %v", errLoggerVerification, clientName, err)
352 }
353
354
355
356 func stopLogValidators(ctx context.Context, validator *logMessageValidator) error {
357 for clientName, errChan := range validator.clientErrs {
358 select {
359 case err := <-errChan:
360 if err != nil {
361 return stopLogValidatorsErr(clientName, err)
362 }
363 case <-ctx.Done():
364 return stopLogValidatorsErr(clientName, ctx.Err())
365 }
366 }
367
368 return nil
369 }
370
View as plain text