...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/pool_generation_counter.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/topology

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package topology
     8  
     9  import (
    10  	"sync"
    11  	"sync/atomic"
    12  
    13  	"go.mongodb.org/mongo-driver/bson/primitive"
    14  )
    15  
    16  // Pool generation state constants.
    17  const (
    18  	generationDisconnected int64 = iota
    19  	generationConnected
    20  )
    21  
    22  // generationStats represents the version of a pool. It tracks the generation number as well as the number of
    23  // connections that have been created in the generation.
    24  type generationStats struct {
    25  	generation uint64
    26  	numConns   uint64
    27  }
    28  
    29  // poolGenerationMap tracks the version for each service ID present in a pool. For deployments that are not behind a
    30  // load balancer, there is only one service ID: primitive.NilObjectID. For load-balanced deployments, each server behind
    31  // the load balancer will have a unique service ID.
    32  type poolGenerationMap struct {
    33  	// state must be accessed using the atomic package and should be at the beginning of the struct.
    34  	// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
    35  	// - suggested layout: https://go101.org/article/memory-layout.html
    36  	state         int64
    37  	generationMap map[primitive.ObjectID]*generationStats
    38  
    39  	sync.Mutex
    40  }
    41  
    42  func newPoolGenerationMap() *poolGenerationMap {
    43  	pgm := &poolGenerationMap{
    44  		generationMap: make(map[primitive.ObjectID]*generationStats),
    45  	}
    46  	pgm.generationMap[primitive.NilObjectID] = &generationStats{}
    47  	return pgm
    48  }
    49  
    50  func (p *poolGenerationMap) connect() {
    51  	atomic.StoreInt64(&p.state, generationConnected)
    52  }
    53  
    54  func (p *poolGenerationMap) disconnect() {
    55  	atomic.StoreInt64(&p.state, generationDisconnected)
    56  }
    57  
    58  // addConnection increments the connection count for the generation associated with the given service ID and returns the
    59  // generation number for the connection.
    60  func (p *poolGenerationMap) addConnection(serviceIDPtr *primitive.ObjectID) uint64 {
    61  	serviceID := getServiceID(serviceIDPtr)
    62  	p.Lock()
    63  	defer p.Unlock()
    64  
    65  	stats, ok := p.generationMap[serviceID]
    66  	if ok {
    67  		// If the serviceID is already being tracked, we only need to increment the connection count.
    68  		stats.numConns++
    69  		return stats.generation
    70  	}
    71  
    72  	// If the serviceID is untracked, create a new entry with a starting generation number of 0.
    73  	stats = &generationStats{
    74  		numConns: 1,
    75  	}
    76  	p.generationMap[serviceID] = stats
    77  	return 0
    78  }
    79  
    80  func (p *poolGenerationMap) removeConnection(serviceIDPtr *primitive.ObjectID) {
    81  	serviceID := getServiceID(serviceIDPtr)
    82  	p.Lock()
    83  	defer p.Unlock()
    84  
    85  	stats, ok := p.generationMap[serviceID]
    86  	if !ok {
    87  		return
    88  	}
    89  
    90  	// If the serviceID is being tracked, decrement the connection count and delete this serviceID to prevent the map
    91  	// from growing unboundedly. This case would happen if a server behind a load-balancer was permanently removed
    92  	// and its connections were pruned after a network error or idle timeout.
    93  	stats.numConns--
    94  	if stats.numConns == 0 {
    95  		delete(p.generationMap, serviceID)
    96  	}
    97  }
    98  
    99  func (p *poolGenerationMap) clear(serviceIDPtr *primitive.ObjectID) {
   100  	serviceID := getServiceID(serviceIDPtr)
   101  	p.Lock()
   102  	defer p.Unlock()
   103  
   104  	if stats, ok := p.generationMap[serviceID]; ok {
   105  		stats.generation++
   106  	}
   107  }
   108  
   109  func (p *poolGenerationMap) stale(serviceIDPtr *primitive.ObjectID, knownGeneration uint64) bool {
   110  	// If the map has been disconnected, all connections should be considered stale to ensure that they're closed.
   111  	if atomic.LoadInt64(&p.state) == generationDisconnected {
   112  		return true
   113  	}
   114  
   115  	if generation, ok := p.getGeneration(serviceIDPtr); ok {
   116  		return knownGeneration < generation
   117  	}
   118  	return false
   119  }
   120  
   121  func (p *poolGenerationMap) getGeneration(serviceIDPtr *primitive.ObjectID) (uint64, bool) {
   122  	serviceID := getServiceID(serviceIDPtr)
   123  	p.Lock()
   124  	defer p.Unlock()
   125  
   126  	if stats, ok := p.generationMap[serviceID]; ok {
   127  		return stats.generation, true
   128  	}
   129  	return 0, false
   130  }
   131  
   132  func (p *poolGenerationMap) getNumConns(serviceIDPtr *primitive.ObjectID) uint64 {
   133  	serviceID := getServiceID(serviceIDPtr)
   134  	p.Lock()
   135  	defer p.Unlock()
   136  
   137  	if stats, ok := p.generationMap[serviceID]; ok {
   138  		return stats.numConns
   139  	}
   140  	return 0
   141  }
   142  
   143  func getServiceID(oid *primitive.ObjectID) primitive.ObjectID {
   144  	if oid == nil {
   145  		return primitive.NilObjectID
   146  	}
   147  	return *oid
   148  }
   149  

View as plain text