1 /* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 // Package adaptive provides functionality for adaptive client-side throttling. 20 package adaptive 21 22 import ( 23 "sync" 24 "time" 25 26 "google.golang.org/grpc/internal/grpcrand" 27 ) 28 29 // For overriding in unittests. 30 var ( 31 timeNowFunc = func() time.Time { return time.Now() } 32 randFunc = func() float64 { return grpcrand.Float64() } 33 ) 34 35 const ( 36 defaultDuration = 30 * time.Second 37 defaultBins = 100 38 defaultRatioForAccepts = 2.0 39 defaultRequestsPadding = 8.0 40 ) 41 42 // Throttler implements a client-side throttling recommendation system. All 43 // methods are safe for concurrent use by multiple goroutines. 44 // 45 // The throttler has the following knobs for which we will use defaults for 46 // now. If there is a need to make them configurable at a later point in time, 47 // support for the same will be added. 48 // - Duration: amount of recent history that will be taken into account for 49 // making client-side throttling decisions. A default of 30 seconds is used. 50 // - Bins: number of bins to be used for bucketing historical data. A default 51 // of 100 is used. 52 // - RatioForAccepts: ratio by which accepts are multiplied, typically a value 53 // slightly larger than 1.0. This is used to make the throttler behave as if 54 // the backend had accepted more requests than it actually has, which lets us 55 // err on the side of sending to the backend more requests than we think it 56 // will accept for the sake of speeding up the propagation of state. A 57 // default of 2.0 is used. 58 // - RequestsPadding: is used to decrease the (client-side) throttling 59 // probability in the low QPS regime (to speed up propagation of state), as 60 // well as to safeguard against hitting a client-side throttling probability 61 // of 100%. The weight of this value decreases as the number of requests in 62 // recent history grows. A default of 8 is used. 63 // 64 // The adaptive throttler attempts to estimate the probability that a request 65 // will be throttled using recent history. Server requests (both throttled and 66 // accepted) are registered with the throttler (via the RegisterBackendResponse 67 // method), which then recommends client-side throttling (via the 68 // ShouldThrottle method) with probability given by: 69 // (requests - RatioForAccepts * accepts) / (requests + RequestsPadding) 70 type Throttler struct { 71 ratioForAccepts float64 72 requestsPadding float64 73 74 // Number of total accepts and throttles in the lookback period. 75 mu sync.Mutex 76 accepts *lookback 77 throttles *lookback 78 } 79 80 // New initializes a new adaptive throttler with the default values. 81 func New() *Throttler { 82 return newWithArgs(defaultDuration, defaultBins, defaultRatioForAccepts, defaultRequestsPadding) 83 } 84 85 // newWithArgs initializes a new adaptive throttler with the provided values. 86 // Used only in unittests. 87 func newWithArgs(duration time.Duration, bins int64, ratioForAccepts, requestsPadding float64) *Throttler { 88 return &Throttler{ 89 ratioForAccepts: ratioForAccepts, 90 requestsPadding: requestsPadding, 91 accepts: newLookback(bins, duration), 92 throttles: newLookback(bins, duration), 93 } 94 } 95 96 // ShouldThrottle returns a probabilistic estimate of whether the server would 97 // throttle the next request. This should be called for every request before 98 // allowing it to hit the network. If the returned value is true, the request 99 // should be aborted immediately (as if it had been throttled by the server). 100 func (t *Throttler) ShouldThrottle() bool { 101 randomProbability := randFunc() 102 now := timeNowFunc() 103 104 t.mu.Lock() 105 defer t.mu.Unlock() 106 107 accepts, throttles := float64(t.accepts.sum(now)), float64(t.throttles.sum(now)) 108 requests := accepts + throttles 109 throttleProbability := (requests - t.ratioForAccepts*accepts) / (requests + t.requestsPadding) 110 if throttleProbability <= randomProbability { 111 return false 112 } 113 114 t.throttles.add(now, 1) 115 return true 116 } 117 118 // RegisterBackendResponse registers a response received from the backend for a 119 // request allowed by ShouldThrottle. This should be called for every response 120 // received from the backend (i.e., once for each request for which 121 // ShouldThrottle returned false). 122 func (t *Throttler) RegisterBackendResponse(throttled bool) { 123 now := timeNowFunc() 124 125 t.mu.Lock() 126 if throttled { 127 t.throttles.add(now, 1) 128 } else { 129 t.accepts.add(now, 1) 130 } 131 t.mu.Unlock() 132 } 133