/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package idle contains a component for managing idleness (entering and exiting)
// based on RPC activity.
package idle

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"
)

// timeAfterFunc creates the idle timer. It simply forwards to time.AfterFunc
// and exists as a package-level variable so that unit tests can override it.
var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
	return time.AfterFunc(d, f)
}

// Enforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
type Enforcer interface {
	// ExitIdleMode is invoked by Manager.ExitIdleMode, with idleMu held, to
	// move the channel out of idle mode. A non-nil error is wrapped and
	// returned to the caller, and the manager stays in idle mode.
	ExitIdleMode() error
	// EnterIdleMode is invoked by the manager, with idleMu held, once it has
	// determined that the channel can move to idle mode.
	EnterIdleMode()
}

// Manager implements idleness detection and calls the configured Enforcer to
// enter/exit idle mode when appropriate.  Must be created by NewManager.
type Manager struct {
	// State accessed atomically.

	// Unix timestamp in nanos; time when the most recent RPC completed.
	// NOTE(review): being the first field presumably keeps this int64 64-bit
	// aligned for atomic access on 32-bit platforms; confirm before
	// reordering fields.
	lastCallEndTime int64
	// Count of active RPCs. The count is offset by -math.MaxInt32 while the
	// channel is idle or is trying to get there, so a non-positive value seen
	// by OnCallBegin() means the channel must first exit idle mode.
	activeCallsCount int32
	// Boolean; True if there was an RPC since the last timer callback.
	activeSinceLastTimerCheck int32
	// Boolean; True when the manager is closed.
	closed int32

	// Can be accessed without atomics or mutex since these are set at creation
	// time and read-only after that.
	enforcer Enforcer // Functionality provided by grpc.ClientConn.
	timeout  time.Duration

	// idleMu is used to guarantee mutual exclusion in two scenarios:
	// - Opposing intentions:
	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put
	//     the channel in idle mode because the channel has been inactive.
	//   - b: At the same time an RPC is made on the channel, and OnCallBegin()
	//     is trying to prevent the channel from going idle.
	// - Competing intentions:
	//   - The channel is in idle mode and there are multiple RPCs starting at
	//     the same time, all trying to move the channel out of idle. Only one
	//     of them should succeed in doing so, while the other RPCs should
	//     piggyback on the first one and be successfully handled.
	//
	// Only the exclusive Lock/Unlock pair is used by the code in this file.
	idleMu sync.RWMutex
	// actuallyIdle is true while the channel is in idle mode. It is read and
	// written with idleMu held (apart from initialization in NewManager).
	actuallyIdle bool
	// timer is the pending idle timer, or nil if none has been created yet or
	// the manager was closed. It is read and written with idleMu held.
	timer *time.Timer
}

// NewManager creates a new idleness manager implementation for the
// given idle timeout.  It begins in idle mode.
//
// enforcer is called to enter and exit idle mode. A timeout of zero means no
// idle timer is ever scheduled (see resetIdleTimerLocked), so the channel
// never moves back to idle because of inactivity.
func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
	return &Manager{
		enforcer:         enforcer,
		timeout:          timeout,
		actuallyIdle:     true,
		activeCallsCount: -math.MaxInt32,
	}
}

// resetIdleTimerLocked resets the idle timer to the given duration.  Called
// when exiting idle mode or when the timer fires and we need to reset it.
//
// It is a no-op if the manager is closed, if idleness is disabled (timeout of
// zero), or if the channel is currently idle. The caller must hold idleMu.
func (m *Manager) resetIdleTimerLocked(d time.Duration) {
	if m.isClosed() || m.timeout == 0 || m.actuallyIdle {
		return
	}

	// Stop any previously scheduled timer and create a fresh one. The return
	// value from Stop() is deliberately ignored: this method is only ever
	// called from the timer callback or when exiting idle mode.
	if m.timer != nil {
		m.timer.Stop()
	}
	m.timer = timeAfterFunc(d, m.handleIdleTimeout)
}

// resetIdleTimer acquires idleMu and resets the idle timer to fire after d.
// It must not be called with idleMu already held.
func (m *Manager) resetIdleTimer(d time.Duration) {
	m.idleMu.Lock()
	defer m.idleMu.Unlock()
	m.resetIdleTimerLocked(d)
}

// handleIdleTimeout is the timer callback that is invoked upon expiry of the
// configured idle timeout. The channel is considered inactive if there are no
// ongoing calls and no RPC activity since the last time the timer fired.
func (m *Manager) handleIdleTimeout() {
	if m.isClosed() {
		return
	}

	// There are ongoing calls. Check again after a full idle timeout.
	if atomic.LoadInt32(&m.activeCallsCount) > 0 {
		m.resetIdleTimer(m.timeout)
		return
	}

	// There has been activity on the channel since we last got here. Reset the
	// timer and return.
	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
		// Set the timer to fire after a duration of idle timeout, calculated
		// from the time the most recent RPC completed, i.e. at
		// lastCallEndTime + timeout.
		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)
		return
	}

	// Now that we've checked that there has been no activity, attempt to enter
	// idle mode, which is very likely to succeed.
	if m.tryEnterIdleMode() {
		// Successfully entered idle mode. No timer needed until we exit idle.
		return
	}

	// Failed to enter idle mode due to a concurrent RPC that kept the channel
	// active. tryEnterIdleMode() has already undone its changes to the calls
	// count, so just reset the timer to try again later.
	m.resetIdleTimer(m.timeout)
}

// tryEnterIdleMode instructs the channel to enter idle mode.  But before
// that, it performs a last minute check to ensure that no new RPC has come in,
// making the channel active.
//
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with ExitIdleMode.
func (m *Manager) tryEnterIdleMode() bool {
	// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
	// that the channel is either in idle mode or is trying to get there.
	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
		// This CAS operation can fail if an RPC started after we checked for
		// activity in the timer handler, or one was ongoing from before the
		// last time the timer fired, or if a test is attempting to enter idle
		// mode without checking.  In all cases, abort going into idle mode.
		return false
	}
	// N.B. if we fail to enter idle mode after this, we must re-add
	// math.MaxInt32 to m.activeCallsCount.

	m.idleMu.Lock()
	defer m.idleMu.Unlock()

	if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
		// We raced and lost to a new RPC. Very rare, but stop entering idle.
		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
		return false
	}
	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
		// A very short RPC could have come in (and also finished) after we
		// checked for calls count and activity in handleIdleTimeout(), but
		// before the CAS operation. So, we need to check for activity again.
		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
		return false
	}

	// No new RPCs have come in since we set the active calls count value to
	// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
	// unconditionally now.
	m.enforcer.EnterIdleMode()
	m.actuallyIdle = true
	return true
}

// EnterIdleModeForTesting attempts to move the channel to idle mode by calling
// tryEnterIdleMode, without first checking for recent activity the way the
// timer callback does. The result of the attempt is discarded. As the name
// indicates, it is meant to be used from tests.
func (m *Manager) EnterIdleModeForTesting() {
	m.tryEnterIdleMode()
}

// OnCallBegin is invoked at the start of every RPC.
//
// It returns nil if the manager is closed or the RPC may proceed. If the
// channel has to exit idle mode first and that fails, the error from
// ExitIdleMode is returned and the RPC is not counted as active.
func (m *Manager) OnCallBegin() error {
	if m.isClosed() {
		return nil
	}

	if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
		// Channel is not idle now. Set the activity bit and allow the call.
		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
		return nil
	}

	// Channel is either in idle mode or is in the process of moving to idle
	// mode. Attempt to exit idle mode to allow this RPC.
	if err := m.ExitIdleMode(); err != nil {
		// Undo the increment to calls count, and return an error causing the
		// RPC to fail.
		atomic.AddInt32(&m.activeCallsCount, -1)
		return err
	}

	atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
	return nil
}

// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
// internal state.
//
// It returns nil without calling the enforcer if m is closed or is not in idle
// mode. An error from the enforcer is wrapped and returned, in which case m
// remains in idle mode.
func (m *Manager) ExitIdleMode() error {
	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
	m.idleMu.Lock()
	defer m.idleMu.Unlock()

	if m.isClosed() || !m.actuallyIdle {
		// This can happen in three scenarios (besides m being closed):
		// - tryEnterIdleMode() set the calls count to -math.MaxInt32, but
		//   before it could grab the lock, an RPC came in and OnCallBegin()
		//   noticed that the calls count is negative.
		// - Channel is in idle mode, and multiple new RPCs come in at the same
		//   time, all of them notice a negative calls count in OnCallBegin and get
		//   here. The first one to get the lock gets the channel to exit idle.
		// - Channel is not in idle mode, and the user calls Connect which calls
		//   m.ExitIdleMode.
		//
		// In any case, there is nothing to do here.
		return nil
	}

	if err := m.enforcer.ExitIdleMode(); err != nil {
		return fmt.Errorf("failed to exit idle mode: %w", err)
	}

	// Undo the idle entry process. This also respects any new RPC attempts.
	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
	m.actuallyIdle = false

	// Start a new timer to fire after the configured idle timeout.
	m.resetIdleTimerLocked(m.timeout)
	return nil
}

// OnCallEnd is invoked at the end of every RPC.
//
// It is a no-op if the manager is closed.
func (m *Manager) OnCallEnd() {
	if m.isClosed() {
		return
	}

	// Record the time at which the most recent call finished.
	atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())

	// Decrement the active calls count. This count can temporarily go negative
	// when the timer callback is in the process of moving the channel to idle
	// mode, but one or more RPCs come in and complete before the timer callback
	// can get done with the process of moving to idle mode.
	atomic.AddInt32(&m.activeCallsCount, -1)
}

// isClosed reports whether Close has been called on m.
func (m *Manager) isClosed() bool {
	return atomic.LoadInt32(&m.closed) == 1
}

// Close marks m as closed and stops the pending idle timer, if any.
//
// The closed flag is set before idleMu is acquired, so a concurrent timer
// reset that acquires the lock afterwards sees the flag and does not schedule
// a new timer.
func (m *Manager) Close() {
	atomic.StoreInt32(&m.closed, 1)

	m.idleMu.Lock()
	if m.timer != nil {
		m.timer.Stop()
		m.timer = nil
	}
	m.idleMu.Unlock()
}