...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/logger_verification.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2023-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"sync"
    13  
    14  	"go.mongodb.org/mongo-driver/bson"
    15  	"go.mongodb.org/mongo-driver/internal/logger"
    16  )
    17  
    18  // errLoggerVerification is use to wrap errors associated with validating the
    19  // correctness of logs while testing operations.
    20  var errLoggerVerification = fmt.Errorf("logger verification failed")
    21  
    22  // logMessage is a log message that is expected to be observed by the driver.
    23  type logMessage struct {
    24  	LevelLiteral      string   `bson:"level"`
    25  	ComponentLiteral  string   `bson:"component"`
    26  	Data              bson.Raw `bson:"data"`
    27  	FailureIsRedacted bool     `bson:"failureIsRedacted"`
    28  }
    29  
    30  // newLogMessage will create a "logMessage" from the level and a slice of
    31  // arguments.
    32  func newLogMessage(level int, msg string, args ...interface{}) (*logMessage, error) {
    33  	logMessage := new(logMessage)
    34  
    35  	// Iterate over the literal levels until we get the first
    36  	// "LevelLiteral" that matches the level of the "LogMessage". It doesn't
    37  	// matter which literal is chose so long as the mapping results in the
    38  	// correct level.
    39  	for literal, logLevel := range logger.LevelLiteralMap {
    40  		if level == int(logLevel) {
    41  			logMessage.LevelLiteral = literal
    42  
    43  			break
    44  		}
    45  	}
    46  
    47  	// The argument slice must have an even number of elements, otherwise it
    48  	// would not maintain the key-value structure of the document.
    49  	if len(args)%2 != 0 {
    50  		return nil, fmt.Errorf("%w: invalid arguments: %v", errLoggerVerification, args)
    51  	}
    52  
    53  	// Create a new document from the arguments.
    54  	actualD := bson.D{{"message", msg}}
    55  	for i := 0; i < len(args); i += 2 {
    56  		actualD = append(actualD, bson.E{
    57  			Key:   args[i].(string),
    58  			Value: args[i+1],
    59  		})
    60  	}
    61  
    62  	// Marshal the document into a raw value and assign it to the
    63  	// logMessage.
    64  	bytes, err := bson.Marshal(actualD)
    65  	if err != nil {
    66  		return nil, fmt.Errorf("%w: failed to marshal: %v", errLoggerVerification, err)
    67  	}
    68  
    69  	logMessage.Data = bson.Raw(bytes)
    70  
    71  	return logMessage, nil
    72  }
    73  
    74  // clientLogMessages is a struct representing the expected "LogMessages" for a
    75  // client.
    76  type clientLogMessages struct {
    77  	Client         string        `bson:"client"`
    78  	IgnoreMessages []*logMessage `bson:"ignoreMessages"`
    79  	LogMessages    []*logMessage `bson:"messages"`
    80  }
    81  
    82  // logMessageValidator defines the expectation for log messages across all
    83  // clients.
    84  type logMessageValidator struct {
    85  	testCase   *TestCase
    86  	clientErrs map[string]chan error
    87  }
    88  
    89  // newLogMessageValidator will create a new logMessageValidator.
    90  func newLogMessageValidator(testCase *TestCase) *logMessageValidator {
    91  	validator := &logMessageValidator{testCase: testCase}
    92  	validator.clientErrs = make(map[string]chan error)
    93  
    94  	// Make the error channels for the clients.
    95  	for _, exp := range testCase.ExpectLogMessages {
    96  		validator.clientErrs[exp.Client] = make(chan error)
    97  	}
    98  
    99  	return validator
   100  }
   101  
   102  func logQueue(ctx context.Context, exp *clientLogMessages) <-chan orderedLogMessage {
   103  	clients := entities(ctx).clients()
   104  
   105  	clientEntity, ok := clients[exp.Client]
   106  	if !ok {
   107  		return nil
   108  	}
   109  
   110  	return clientEntity.logQueue
   111  }
   112  
   113  // verifyLogMatch will verify that the actual log match the expected log.
   114  func verifyLogMatch(ctx context.Context, exp, act *logMessage) error {
   115  	if act == nil && exp == nil {
   116  		return nil
   117  	}
   118  
   119  	if act == nil || exp == nil {
   120  		return fmt.Errorf("%w: document mismatch", errLoggerVerification)
   121  	}
   122  
   123  	levelExp := logger.ParseLevel(exp.LevelLiteral)
   124  	levelAct := logger.ParseLevel(act.LevelLiteral)
   125  
   126  	// The levels of the expected log message and the actual log message
   127  	// must match, upto logger.Level.
   128  	if levelExp != levelAct {
   129  		return fmt.Errorf("%w: level mismatch: want %v, got %v",
   130  			errLoggerVerification, levelExp, levelAct)
   131  	}
   132  
   133  	rawExp := documentToRawValue(exp.Data)
   134  	rawAct := documentToRawValue(act.Data)
   135  
   136  	// Top level data does not have to be 1-1 with the expectation, there
   137  	// are a number of unrequired fields that may not be present on the
   138  	// expected document.
   139  	if err := verifyValuesMatch(ctx, rawExp, rawAct, true); err != nil {
   140  		return fmt.Errorf("%w: document length mismatch: %v", errLoggerVerification, err)
   141  	}
   142  
   143  	return nil
   144  }
   145  
   146  // isUnorderedLog will return true if the log is/should be unordered in the Go
   147  // Driver.
   148  func isUnorderedLog(log *logMessage) bool {
   149  	msg, err := log.Data.LookupErr(logger.KeyMessage)
   150  	if err != nil {
   151  		return false
   152  	}
   153  
   154  	msgStr := msg.StringValue()
   155  
   156  	// There is a race condition in the connection pool's workflow where it
   157  	// is non-deterministic whether the connection pool will fail a checkout
   158  	// or close a connection first. Because of this, either log may be
   159  	// received in any order. To account for this behavior, we considered
   160  	// both logs to be "unordered".
   161  	//
   162  	// The connection pool must clear before the connection is closed.
   163  	// However, either of these conditions are valid:
   164  	//
   165  	//   1. connection checkout failed > connection pool cleared
   166  	//   2. connection pool cleared > connection checkout failed
   167  	//
   168  	// Therefore, the ConnectionPoolCleared literal is added to the
   169  	// unordered list. The check for cleared > closed is made in the
   170  	// matching logic.
   171  	return msgStr == logger.ConnectionCheckoutFailed ||
   172  		msgStr == logger.ConnectionClosed ||
   173  		msgStr == logger.ConnectionPoolCleared
   174  }
   175  
   176  type logQueues struct {
   177  	expected  *clientLogMessages
   178  	ordered   <-chan *logMessage
   179  	unordered <-chan *logMessage
   180  }
   181  
   182  // partitionLogQueue will partition the expected logs into "unordered" and
   183  // "ordered" log channels.
   184  func partitionLogQueue(ctx context.Context, exp *clientLogMessages) logQueues {
   185  	orderedLogCh := make(chan *logMessage, len(exp.LogMessages))
   186  	unorderedLogCh := make(chan *logMessage, len(exp.LogMessages))
   187  
   188  	// Get the unordered indices from the expected log messages.
   189  	unorderedIndices := make(map[int]struct{})
   190  	for i, log := range exp.LogMessages {
   191  		if isUnorderedLog(log) {
   192  			unorderedIndices[i] = struct{}{}
   193  		}
   194  	}
   195  
   196  	go func() {
   197  		defer close(orderedLogCh)
   198  		defer close(unorderedLogCh)
   199  
   200  		for actual := range logQueue(ctx, exp) {
   201  			msg := actual.logMessage
   202  			if _, ok := unorderedIndices[actual.order-2]; ok {
   203  				unorderedLogCh <- msg
   204  			} else {
   205  				orderedLogCh <- msg
   206  			}
   207  		}
   208  	}()
   209  
   210  	return logQueues{
   211  		expected:  exp,
   212  		ordered:   orderedLogCh,
   213  		unordered: unorderedLogCh,
   214  	}
   215  }
   216  
   217  func matchOrderedLogs(ctx context.Context, logs logQueues) <-chan error {
   218  	// Remove all of the unordered log messages from the expected.
   219  	expLogMessages := make([]*logMessage, 0, len(logs.expected.LogMessages))
   220  	for _, log := range logs.expected.LogMessages {
   221  		if !isUnorderedLog(log) {
   222  			expLogMessages = append(expLogMessages, log)
   223  		}
   224  	}
   225  
   226  	errs := make(chan error, 1)
   227  
   228  	go func() {
   229  		defer close(errs)
   230  
   231  		for actual := range logs.ordered {
   232  			expected := expLogMessages[0]
   233  			if expected == nil {
   234  				continue
   235  			}
   236  
   237  			err := verifyLogMatch(ctx, expected, actual)
   238  			if err != nil {
   239  				errs <- err
   240  			}
   241  
   242  			// Remove the first element from the expected log.
   243  			expLogMessages = expLogMessages[1:]
   244  		}
   245  	}()
   246  
   247  	return errs
   248  }
   249  
   250  func matchUnorderedLogs(ctx context.Context, logs logQueues) <-chan error {
   251  	unordered := make(map[*logMessage]struct{}, len(logs.expected.LogMessages))
   252  
   253  	for _, log := range logs.expected.LogMessages {
   254  		if isUnorderedLog(log) {
   255  			unordered[log] = struct{}{}
   256  		}
   257  	}
   258  
   259  	errs := make(chan error, 1)
   260  
   261  	go func() {
   262  		defer close(errs)
   263  
   264  		// Record the message literals as they occur.
   265  		actualMessageSet := map[string]bool{}
   266  
   267  		for actual := range logs.unordered {
   268  			msg, err := actual.Data.LookupErr(logger.KeyMessage)
   269  			if err != nil {
   270  				errs <- fmt.Errorf("could not lookup message from unordered log: %w", err)
   271  
   272  				break
   273  			}
   274  
   275  			msgStr := msg.StringValue()
   276  			if msgStr == logger.ConnectionPoolCleared && actualMessageSet[logger.ConnectionClosed] {
   277  				errs <- fmt.Errorf("connection has been closed before the pool could clear")
   278  			}
   279  
   280  			// Iterate over the unordered log messages and verify
   281  			// that at least one of them matches the actual log
   282  			// message.
   283  			for expected := range unordered {
   284  				err = verifyLogMatch(ctx, expected, actual)
   285  				if err == nil {
   286  					// Remove the matched unordered log
   287  					// message from the unordered map.
   288  					delete(unordered, expected)
   289  
   290  					break
   291  				}
   292  			}
   293  
   294  			// If there was no match, return an error.
   295  			if err != nil {
   296  				errs <- err
   297  			}
   298  
   299  			actualMessageSet[msgStr] = true
   300  		}
   301  	}()
   302  
   303  	return errs
   304  }
   305  
   306  // startLogValidators will start a goroutine for each client's expected log
   307  // messages, listening to the channel of actual log messages and comparing them
   308  // to the expected log messages.
   309  func startLogValidators(ctx context.Context, validator *logMessageValidator) {
   310  	for _, expected := range validator.testCase.ExpectLogMessages {
   311  		logs := partitionLogQueue(ctx, expected)
   312  
   313  		wg := &sync.WaitGroup{}
   314  		wg.Add(2)
   315  
   316  		go func(expected *clientLogMessages) {
   317  			defer wg.Done()
   318  
   319  			errCh := matchOrderedLogs(ctx, logs)
   320  			if errCh == nil {
   321  				return
   322  			}
   323  
   324  			if errs := <-errCh; errs != nil {
   325  				validator.clientErrs[expected.Client] <- errs
   326  			}
   327  		}(expected)
   328  
   329  		go func(expected *clientLogMessages) {
   330  			defer wg.Done()
   331  
   332  			errCh := matchUnorderedLogs(ctx, logs)
   333  			if errCh == nil {
   334  				return
   335  			}
   336  
   337  			if errs := <-errCh; errs != nil {
   338  				validator.clientErrs[expected.Client] <- errs
   339  			}
   340  		}(expected)
   341  
   342  		go func(expected *clientLogMessages) {
   343  			wg.Wait()
   344  
   345  			close(validator.clientErrs[expected.Client])
   346  		}(expected)
   347  	}
   348  }
   349  
   350  func stopLogValidatorsErr(clientName string, err error) error {
   351  	return fmt.Errorf("%w: %s: %v", errLoggerVerification, clientName, err)
   352  }
   353  
   354  // stopLogValidators will gracefully validate all log messages received by all
   355  // clients and return the first error encountered.
   356  func stopLogValidators(ctx context.Context, validator *logMessageValidator) error {
   357  	for clientName, errChan := range validator.clientErrs {
   358  		select {
   359  		case err := <-errChan:
   360  			if err != nil {
   361  				return stopLogValidatorsErr(clientName, err)
   362  			}
   363  		case <-ctx.Done():
   364  			return stopLogValidatorsErr(clientName, ctx.Err())
   365  		}
   366  	}
   367  
   368  	return nil
   369  }
   370  

View as plain text