...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/session/session_pool.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/session

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package session
     8  
     9  import (
    10  	"sync"
    11  	"sync/atomic"
    12  
    13  	"go.mongodb.org/mongo-driver/mongo/description"
    14  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    15  )
    16  
    17  // Node represents a server session in a linked list
    18  type Node struct {
    19  	*Server
    20  	next *Node
    21  	prev *Node
    22  }
    23  
    24  // topologyDescription is used to track a subset of the fields present in a description.Topology instance that are
    25  // relevant for determining session expiration.
    26  type topologyDescription struct {
    27  	kind           description.TopologyKind
    28  	timeoutMinutes *int64
    29  }
    30  
    31  // Pool is a pool of server sessions that can be reused.
    32  type Pool struct {
    33  	// number of sessions checked out of pool (accessed atomically)
    34  	checkedOut int64
    35  
    36  	descChan       <-chan description.Topology
    37  	head           *Node
    38  	tail           *Node
    39  	latestTopology topologyDescription
    40  	mutex          sync.Mutex // mutex to protect list and sessionTimeout
    41  }
    42  
    43  func (p *Pool) createServerSession() (*Server, error) {
    44  	s, err := newServerSession()
    45  	if err != nil {
    46  		return nil, err
    47  	}
    48  
    49  	atomic.AddInt64(&p.checkedOut, 1)
    50  	return s, nil
    51  }
    52  
    53  // NewPool creates a new server session pool
    54  func NewPool(descChan <-chan description.Topology) *Pool {
    55  	p := &Pool{
    56  		descChan: descChan,
    57  	}
    58  
    59  	return p
    60  }
    61  
    62  // assumes caller has mutex to protect the pool
    63  func (p *Pool) updateTimeout() {
    64  	select {
    65  	case newDesc := <-p.descChan:
    66  		p.latestTopology = topologyDescription{
    67  			kind:           newDesc.Kind,
    68  			timeoutMinutes: newDesc.SessionTimeoutMinutesPtr,
    69  		}
    70  	default:
    71  		// no new description waiting
    72  	}
    73  }
    74  
    75  // GetSession retrieves an unexpired session from the pool.
    76  func (p *Pool) GetSession() (*Server, error) {
    77  	p.mutex.Lock() // prevent changing the linked list while seeing if sessions have expired
    78  	defer p.mutex.Unlock()
    79  
    80  	// empty pool
    81  	if p.head == nil && p.tail == nil {
    82  		return p.createServerSession()
    83  	}
    84  
    85  	p.updateTimeout()
    86  	for p.head != nil {
    87  		// pull session from head of queue and return if it is valid for at least 1 more minute
    88  		if p.head.expired(p.latestTopology) {
    89  			p.head = p.head.next
    90  			continue
    91  		}
    92  
    93  		// found unexpired session
    94  		session := p.head.Server
    95  		if p.head.next != nil {
    96  			p.head.next.prev = nil
    97  		}
    98  		if p.tail == p.head {
    99  			p.tail = nil
   100  			p.head = nil
   101  		} else {
   102  			p.head = p.head.next
   103  		}
   104  
   105  		atomic.AddInt64(&p.checkedOut, 1)
   106  		return session, nil
   107  	}
   108  
   109  	// no valid session found
   110  	p.tail = nil // empty list
   111  	return p.createServerSession()
   112  }
   113  
   114  // ReturnSession returns a session to the pool if it has not expired.
   115  func (p *Pool) ReturnSession(ss *Server) {
   116  	if ss == nil {
   117  		return
   118  	}
   119  
   120  	p.mutex.Lock()
   121  	defer p.mutex.Unlock()
   122  
   123  	atomic.AddInt64(&p.checkedOut, -1)
   124  	p.updateTimeout()
   125  	// check sessions at end of queue for expired
   126  	// stop checking after hitting the first valid session
   127  	for p.tail != nil && p.tail.expired(p.latestTopology) {
   128  		if p.tail.prev != nil {
   129  			p.tail.prev.next = nil
   130  		}
   131  		p.tail = p.tail.prev
   132  	}
   133  
   134  	// session expired
   135  	if ss.expired(p.latestTopology) {
   136  		return
   137  	}
   138  
   139  	// session is dirty
   140  	if ss.Dirty {
   141  		return
   142  	}
   143  
   144  	newNode := &Node{
   145  		Server: ss,
   146  		next:   nil,
   147  		prev:   nil,
   148  	}
   149  
   150  	// empty list
   151  	if p.tail == nil {
   152  		p.head = newNode
   153  		p.tail = newNode
   154  		return
   155  	}
   156  
   157  	// at least 1 valid session in list
   158  	newNode.next = p.head
   159  	p.head.prev = newNode
   160  	p.head = newNode
   161  }
   162  
   163  // IDSlice returns a slice of session IDs for each session in the pool
   164  func (p *Pool) IDSlice() []bsoncore.Document {
   165  	p.mutex.Lock()
   166  	defer p.mutex.Unlock()
   167  
   168  	var ids []bsoncore.Document
   169  	for node := p.head; node != nil; node = node.next {
   170  		ids = append(ids, node.SessionID)
   171  	}
   172  
   173  	return ids
   174  }
   175  
   176  // String implements the Stringer interface
   177  func (p *Pool) String() string {
   178  	p.mutex.Lock()
   179  	defer p.mutex.Unlock()
   180  
   181  	s := ""
   182  	for head := p.head; head != nil; head = head.next {
   183  		s += head.SessionID.String() + "\n"
   184  	}
   185  
   186  	return s
   187  }
   188  
   189  // CheckedOut returns number of sessions checked out from pool.
   190  func (p *Pool) CheckedOut() int64 {
   191  	return atomic.LoadInt64(&p.checkedOut)
   192  }
   193  

View as plain text