...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified_runner_thread_helpers_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"fmt"
    11  	"sync"
    12  
    13  	"go.mongodb.org/mongo-driver/bson"
    14  	"go.mongodb.org/mongo-driver/internal/assert"
    15  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    16  )
    17  
    18  // Helper functions for the operations in the unified spec test runner that require creating and synchronizing
    19  // background goroutines.
    20  
    21  // backgroundRoutine represents a background goroutine that can execute operations. The goroutine reads operations from
    22  // a channel and executes them in order. It exits when an operation with name exitRoutineOperationName is read. If any
    23  // of the operations error, all future operations passed to the routine are skipped. The first error is reported by the
    24  // stop() function.
    25  type backgroundRoutine struct {
    26  	operations chan *operation
    27  	mt         *mtest.T
    28  	testCase   *testCase
    29  	wg         sync.WaitGroup
    30  	err        error
    31  }
    32  
    33  func newBackgroundRoutine(mt *mtest.T, testCase *testCase) *backgroundRoutine {
    34  	routine := &backgroundRoutine{
    35  		operations: make(chan *operation, 10),
    36  		mt:         mt,
    37  		testCase:   testCase,
    38  	}
    39  
    40  	return routine
    41  }
    42  
    43  func (b *backgroundRoutine) start() {
    44  	b.wg.Add(1)
    45  
    46  	go func() {
    47  		defer b.wg.Done()
    48  
    49  		for op := range b.operations {
    50  			if b.err != nil {
    51  				continue
    52  			}
    53  
    54  			if err := runOperation(b.mt, b.testCase, op, nil, nil); err != nil {
    55  				b.err = fmt.Errorf("error running operation %s: %w", op.Name, err)
    56  			}
    57  		}
    58  	}()
    59  }
    60  
    61  func (b *backgroundRoutine) stop() error {
    62  	close(b.operations)
    63  	b.wg.Wait()
    64  	return b.err
    65  }
    66  
    67  func (b *backgroundRoutine) addOperation(op *operation) bool {
    68  	select {
    69  	case b.operations <- op:
    70  		return true
    71  	default:
    72  		return false
    73  	}
    74  }
    75  
    76  func startThread(mt *mtest.T, testCase *testCase, op *operation) {
    77  	routine := newBackgroundRoutine(mt, testCase)
    78  	testCase.routinesMap.Store(getThreadName(op), routine)
    79  	routine.start()
    80  }
    81  
    82  func runOnThread(mt *mtest.T, testCase *testCase, op *operation) {
    83  	routineName := getThreadName(op)
    84  	routineVal, ok := testCase.routinesMap.Load(routineName)
    85  	assert.True(mt, ok, "no background routine found with name %s", routineName)
    86  	routine := routineVal.(*backgroundRoutine)
    87  
    88  	var routineOperation operation
    89  	operationDoc := op.Arguments.Lookup("operation")
    90  	err := bson.UnmarshalWithRegistry(specTestRegistry, operationDoc.Document(), &routineOperation)
    91  	assert.Nil(mt, err, "error creating operation for runOnThread: %v", err)
    92  
    93  	ok = routine.addOperation(&routineOperation)
    94  	assert.True(mt, ok, "failed to add operation %s to routine %s", routineOperation.Name, routineName)
    95  }
    96  
    97  func waitForThread(mt *mtest.T, testCase *testCase, op *operation) {
    98  	name := getThreadName(op)
    99  	routineVal, ok := testCase.routinesMap.Load(name)
   100  	assert.True(mt, ok, "no background routine found with name %s", name)
   101  
   102  	err := routineVal.(*backgroundRoutine).stop()
   103  	testCase.routinesMap.Delete(name)
   104  	assert.Nil(mt, err, "error on background routine %s: %v", name, err)
   105  }
   106  
   107  func getThreadName(op *operation) string {
   108  	return op.Arguments.Lookup("name").StringValue()
   109  }
   110  

View as plain text