...

Source file src/google.golang.org/grpc/internal/idle/idle.go

Documentation: google.golang.org/grpc/internal/idle

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package idle contains a component for managing idleness (entering and exiting)
    20  // based on RPC activity.
    21  package idle
    22  
    23  import (
    24  	"fmt"
    25  	"math"
    26  	"sync"
    27  	"sync/atomic"
    28  	"time"
    29  )
    30  
    31  // For overriding in unit tests.
    32  var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
    33  	return time.AfterFunc(d, f)
    34  }
    35  
    36  // Enforcer is the functionality provided by grpc.ClientConn to enter
    37  // and exit from idle mode.
    38  type Enforcer interface {
    39  	ExitIdleMode() error
    40  	EnterIdleMode()
    41  }
    42  
    43  // Manager implements idleness detection and calls the configured Enforcer to
    44  // enter/exit idle mode when appropriate.  Must be created by NewManager.
    45  type Manager struct {
    46  	// State accessed atomically.
    47  	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.
    48  	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
    49  	activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
    50  	closed                    int32 // Boolean; True when the manager is closed.
    51  
    52  	// Can be accessed without atomics or mutex since these are set at creation
    53  	// time and read-only after that.
    54  	enforcer Enforcer // Functionality provided by grpc.ClientConn.
    55  	timeout  time.Duration
    56  
    57  	// idleMu is used to guarantee mutual exclusion in two scenarios:
    58  	// - Opposing intentions:
    59  	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put
    60  	//     the channel in idle mode because the channel has been inactive.
    61  	//   - b: At the same time an RPC is made on the channel, and OnCallBegin()
    62  	//     is trying to prevent the channel from going idle.
    63  	// - Competing intentions:
    64  	//   - The channel is in idle mode and there are multiple RPCs starting at
    65  	//     the same time, all trying to move the channel out of idle. Only one
    66  	//     of them should succeed in doing so, while the other RPCs should
    67  	//     piggyback on the first one and be successfully handled.
    68  	idleMu       sync.RWMutex
    69  	actuallyIdle bool
    70  	timer        *time.Timer
    71  }
    72  
    73  // NewManager creates a new idleness manager implementation for the
    74  // given idle timeout.  It begins in idle mode.
    75  func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
    76  	return &Manager{
    77  		enforcer:         enforcer,
    78  		timeout:          timeout,
    79  		actuallyIdle:     true,
    80  		activeCallsCount: -math.MaxInt32,
    81  	}
    82  }
    83  
    84  // resetIdleTimerLocked resets the idle timer to the given duration.  Called
    85  // when exiting idle mode or when the timer fires and we need to reset it.
    86  func (m *Manager) resetIdleTimerLocked(d time.Duration) {
    87  	if m.isClosed() || m.timeout == 0 || m.actuallyIdle {
    88  		return
    89  	}
    90  
    91  	// It is safe to ignore the return value from Reset() because this method is
    92  	// only ever called from the timer callback or when exiting idle mode.
    93  	if m.timer != nil {
    94  		m.timer.Stop()
    95  	}
    96  	m.timer = timeAfterFunc(d, m.handleIdleTimeout)
    97  }
    98  
    99  func (m *Manager) resetIdleTimer(d time.Duration) {
   100  	m.idleMu.Lock()
   101  	defer m.idleMu.Unlock()
   102  	m.resetIdleTimerLocked(d)
   103  }
   104  
   105  // handleIdleTimeout is the timer callback that is invoked upon expiry of the
   106  // configured idle timeout. The channel is considered inactive if there are no
   107  // ongoing calls and no RPC activity since the last time the timer fired.
   108  func (m *Manager) handleIdleTimeout() {
   109  	if m.isClosed() {
   110  		return
   111  	}
   112  
   113  	if atomic.LoadInt32(&m.activeCallsCount) > 0 {
   114  		m.resetIdleTimer(m.timeout)
   115  		return
   116  	}
   117  
   118  	// There has been activity on the channel since we last got here. Reset the
   119  	// timer and return.
   120  	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
   121  		// Set the timer to fire after a duration of idle timeout, calculated
   122  		// from the time the most recent RPC completed.
   123  		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
   124  		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)
   125  		return
   126  	}
   127  
   128  	// Now that we've checked that there has been no activity, attempt to enter
   129  	// idle mode, which is very likely to succeed.
   130  	if m.tryEnterIdleMode() {
   131  		// Successfully entered idle mode. No timer needed until we exit idle.
   132  		return
   133  	}
   134  
   135  	// Failed to enter idle mode due to a concurrent RPC that kept the channel
   136  	// active, or because of an error from the channel. Undo the attempt to
   137  	// enter idle, and reset the timer to try again later.
   138  	m.resetIdleTimer(m.timeout)
   139  }
   140  
   141  // tryEnterIdleMode instructs the channel to enter idle mode. But before
   142  // that, it performs a last minute check to ensure that no new RPC has come in,
   143  // making the channel active.
   144  //
   145  // Return value indicates whether or not the channel moved to idle mode.
   146  //
   147  // Holds idleMu which ensures mutual exclusion with exitIdleMode.
   148  func (m *Manager) tryEnterIdleMode() bool {
   149  	// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
   150  	// that the channel is either in idle mode or is trying to get there.
   151  	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
   152  		// This CAS operation can fail if an RPC started after we checked for
   153  		// activity in the timer handler, or one was ongoing from before the
   154  		// last time the timer fired, or if a test is attempting to enter idle
   155  		// mode without checking.  In all cases, abort going into idle mode.
   156  		return false
   157  	}
   158  	// N.B. if we fail to enter idle mode after this, we must re-add
   159  	// math.MaxInt32 to m.activeCallsCount.
   160  
   161  	m.idleMu.Lock()
   162  	defer m.idleMu.Unlock()
   163  
   164  	if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
   165  		// We raced and lost to a new RPC. Very rare, but stop entering idle.
   166  		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
   167  		return false
   168  	}
   169  	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
   170  		// A very short RPC could have come in (and also finished) after we
   171  		// checked for calls count and activity in handleIdleTimeout(), but
   172  		// before the CAS operation. So, we need to check for activity again.
   173  		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
   174  		return false
   175  	}
   176  
   177  	// No new RPCs have come in since we set the active calls count value to
   178  	// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
   179  	// unconditionally now.
   180  	m.enforcer.EnterIdleMode()
   181  	m.actuallyIdle = true
   182  	return true
   183  }
   184  
   185  func (m *Manager) EnterIdleModeForTesting() {
   186  	m.tryEnterIdleMode()
   187  }
   188  
   189  // OnCallBegin is invoked at the start of every RPC.
   190  func (m *Manager) OnCallBegin() error {
   191  	if m.isClosed() {
   192  		return nil
   193  	}
   194  
   195  	if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
   196  		// Channel is not idle now. Set the activity bit and allow the call.
   197  		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
   198  		return nil
   199  	}
   200  
   201  	// Channel is either in idle mode or is in the process of moving to idle
   202  	// mode. Attempt to exit idle mode to allow this RPC.
   203  	if err := m.ExitIdleMode(); err != nil {
   204  		// Undo the increment to calls count, and return an error causing the
   205  		// RPC to fail.
   206  		atomic.AddInt32(&m.activeCallsCount, -1)
   207  		return err
   208  	}
   209  
   210  	atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
   211  	return nil
   212  }
   213  
   214  // ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
   215  // internal state.
   216  func (m *Manager) ExitIdleMode() error {
   217  	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
   218  	m.idleMu.Lock()
   219  	defer m.idleMu.Unlock()
   220  
   221  	if m.isClosed() || !m.actuallyIdle {
   222  		// This can happen in three scenarios:
   223  		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
   224  		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC
   225  		//   came in and OnCallBegin() noticed that the calls count is negative.
   226  		// - Channel is in idle mode, and multiple new RPCs come in at the same
   227  		//   time, all of them notice a negative calls count in OnCallBegin and get
   228  		//   here. The first one to get the lock would got the channel to exit idle.
   229  		// - Channel is not in idle mode, and the user calls Connect which calls
   230  		//   m.ExitIdleMode.
   231  		//
   232  		// In any case, there is nothing to do here.
   233  		return nil
   234  	}
   235  
   236  	if err := m.enforcer.ExitIdleMode(); err != nil {
   237  		return fmt.Errorf("failed to exit idle mode: %w", err)
   238  	}
   239  
   240  	// Undo the idle entry process. This also respects any new RPC attempts.
   241  	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
   242  	m.actuallyIdle = false
   243  
   244  	// Start a new timer to fire after the configured idle timeout.
   245  	m.resetIdleTimerLocked(m.timeout)
   246  	return nil
   247  }
   248  
   249  // OnCallEnd is invoked at the end of every RPC.
   250  func (m *Manager) OnCallEnd() {
   251  	if m.isClosed() {
   252  		return
   253  	}
   254  
   255  	// Record the time at which the most recent call finished.
   256  	atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())
   257  
   258  	// Decrement the active calls count. This count can temporarily go negative
   259  	// when the timer callback is in the process of moving the channel to idle
   260  	// mode, but one or more RPCs come in and complete before the timer callback
   261  	// can get done with the process of moving to idle mode.
   262  	atomic.AddInt32(&m.activeCallsCount, -1)
   263  }
   264  
   265  func (m *Manager) isClosed() bool {
   266  	return atomic.LoadInt32(&m.closed) == 1
   267  }
   268  
   269  func (m *Manager) Close() {
   270  	atomic.StoreInt32(&m.closed, 1)
   271  
   272  	m.idleMu.Lock()
   273  	if m.timer != nil {
   274  		m.timer.Stop()
   275  		m.timer = nil
   276  	}
   277  	m.idleMu.Unlock()
   278  }
   279  

View as plain text