package flowcontrol

import (
	"context"
	"fmt"
	"io"
	"math"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apiserver/pkg/authorization/authorizer"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/test/integration/framework"
	"k8s.io/kubernetes/test/utils/ktesting"
)

const (
	nominalConcurrencyLimitMetricsName = "apiserver_flowcontrol_nominal_limit_seats"
	requestExecutionSecondsSumName     = "apiserver_flowcontrol_request_execution_seconds_sum"
	requestExecutionSecondsCountName   = "apiserver_flowcontrol_request_execution_seconds_count"
	priorityLevelSeatUtilSumName       = "apiserver_flowcontrol_priority_level_seat_utilization_sum"
	priorityLevelSeatUtilCountName     = "apiserver_flowcontrol_priority_level_seat_utilization_count"
	fakeworkDuration                   = 200 * time.Millisecond
	testWarmUpTime                     = 2 * time.Second
	testTime                           = 10 * time.Second
)

type SumAndCount struct {
	Sum   float64
	Count int
}

type plMetrics struct {
	execSeconds    SumAndCount
	seatUtil       SumAndCount
	availableSeats int
}

// metricSnapshot maps a priority level name to the APF metrics of interest
// scraped for that priority level.
type metricSnapshot map[string]plMetrics

// clientLatencyMeasurement accumulates client-side request latencies so that
// mean, standard deviation, and coefficient of variation can be derived later.
type clientLatencyMeasurement struct {
	SumAndCount
	SumSq float64 // sum of squared latencies
	Mu    sync.Mutex
}

func (clm *clientLatencyMeasurement) reset() {
	clm.Mu.Lock()
	defer clm.Mu.Unlock()
	clm.Sum = 0
	clm.Count = 0
	clm.SumSq = 0
}

func (clm *clientLatencyMeasurement) update(duration float64) {
	clm.Mu.Lock()
	defer clm.Mu.Unlock()
	clm.Count += 1
	clm.Sum += duration
	clm.SumSq += duration * duration
}

func (clm *clientLatencyMeasurement) getStats() clientLatencyStats {
	clm.Mu.Lock()
	defer clm.Mu.Unlock()
	mean := clm.Sum / float64(clm.Count)
	// Sum of squared deviations, using sum(x^2) - mean*sum(x) == sum((x-mean)^2).
	ss := clm.SumSq - mean*clm.Sum
	// Floating point rounding can make ss slightly negative; clamp it at zero.
	if ss < 0 {
		ss = 0
	}
	stdDev := math.Sqrt(ss / float64(clm.Count))
	cv := stdDev / mean
	return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv}
}

type clientLatencyStats struct {
	mean   float64 // latency average
	stdDev float64 // latency standard deviation
	cv     float64 // latency coefficient of variation (stdDev/mean)
}

type plMetricAvg struct {
	reqExecution float64 // average request execution time in seconds
	seatUtil     float64 // average seat utilization
}

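// intervalMetricAvg computes, for the given priority level, the average request
// execution time and average seat utilization over the interval between two
// metric snapshots, using the differences of the Prometheus sums and counts.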
func intervalMetricAvg(snapshot0, snapshot1 metricSnapshot, plLabel string) plMetricAvg {
	plmT0 := snapshot0[plLabel]
	plmT1 := snapshot1[plLabel]
	return plMetricAvg{
		reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count),
		seatUtil:     (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count),
	}
}

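// noxuDelayingAuthorizer wraps another authorizer and adds a fixed
// fakeworkDuration delay to requests from the test users noxu1 and noxu2,
// giving their requests a roughly constant amount of server-side work.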
type noxuDelayingAuthorizer struct {
	Authorizer authorizer.Authorizer
}

func (d *noxuDelayingAuthorizer) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
	if a.GetUser().GetName() == "noxu1" || a.GetUser().GetName() == "noxu2" {
		time.Sleep(fakeworkDuration)
	}
	return d.Authorizer.Authorize(ctx, a)
}
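
// TestConcurrencyIsolation checks that API Priority and Fairness keeps the
// concurrency of two priority levels isolated from each other. An "elephant"
// client (noxu1) floods its priority level with more concurrent requests than
// it has seats, while a "mouse" client (noxu2) sends only a few concurrent
// requests to its own priority level. After a warm-up period, the test takes
// two metric snapshots testTime apart and compares, for each priority level,
// the server-side observed concurrency (seat utilization times nominal seats)
// against the client-side expected concurrency derived from request latencies.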
func TestConcurrencyIsolation(t *testing.T) {
	tCtx := ktesting.Init(t)
	_, kubeConfig, closeFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
			// Ensure all clients are allowed to send requests.
			opts.Authorization.Modes = []string{"AlwaysAllow"}
			opts.GenericServerRunOptions.MaxRequestsInFlight = 10
			opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = 10
		},
		ModifyServerConfig: func(config *controlplane.Config) {
			// Wrap the authorizer so that requests from the test users incur the fakeworkDuration delay.
			config.GenericConfig.Authorization.Authorizer = &noxuDelayingAuthorizer{config.GenericConfig.Authorization.Authorizer}
		},
	})
	defer closeFn()

	loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
	noxu1Client := getClientFor(kubeConfig, "noxu1")
	noxu2Client := getClientFor(kubeConfig, "noxu2")

	queueLength := 50
	concurrencyShares := 100

	plNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
		loopbackClient, "noxu1", concurrencyShares, queueLength)
	if err != nil {
		t.Error(err)
	}
	plNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
		loopbackClient, "noxu2", concurrencyShares, queueLength)
	if err != nil {
		t.Error(err)
	}

	stopCh := make(chan struct{})
	wg := sync.WaitGroup{}

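	// "elephant": keep noxu1's priority level saturated with more concurrent requests than it has seats.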
	noxu1NumGoroutines := 5 + queueLength
	var noxu1LatMeasure clientLatencyMeasurement
	wg.Add(noxu1NumGoroutines)
	streamRequests(noxu1NumGoroutines, func() {
		start := time.Now()
		_, err := noxu1Client.CoreV1().Namespaces().Get(tCtx, "default", metav1.GetOptions{})
		duration := time.Since(start).Seconds()
		noxu1LatMeasure.update(duration)
		if err != nil {
			t.Error(err)
		}
	}, &wg, stopCh)

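	// "mouse": send only a few concurrent requests to noxu2's priority level, well below its seat count.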
	noxu2NumGoroutines := 3
	var noxu2LatMeasure clientLatencyMeasurement
	wg.Add(noxu2NumGoroutines)
	streamRequests(noxu2NumGoroutines, func() {
		start := time.Now()
		_, err := noxu2Client.CoreV1().Namespaces().Get(tCtx, "default", metav1.GetOptions{})
		duration := time.Since(start).Seconds()
		noxu2LatMeasure.update(duration)
		if err != nil {
			t.Error(err)
		}
	}, &wg, stopCh)

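	// Let the system warm up, then measure over a window bounded by two metric snapshots taken testTime apart.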
	time.Sleep(testWarmUpTime)

	noxu1LatMeasure.reset()
	noxu2LatMeasure.reset()
	snapshot0, err := getRequestMetricsSnapshot(loopbackClient)
	if err != nil {
		t.Error(err)
	}
	time.Sleep(testTime)
	snapshot1, err := getRequestMetricsSnapshot(loopbackClient)
	if err != nil {
		t.Error(err)
	}
	close(stopCh)

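	// Check the assumptions of the test: the seat utilization samples advanced
	// during the window, and each priority level kept a constant, reasonably
	// large nominal concurrency limit.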
	noxu1T0 := snapshot0[plNoxu1.Name]
	noxu1T1 := snapshot1[plNoxu1.Name]
	noxu2T0 := snapshot0[plNoxu2.Name]
	noxu2T1 := snapshot1[plNoxu2.Name]
	if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count {
		t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d",
			noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count)
	}
	t.Logf("noxu1 priority level concurrency limit: %d", noxu1T0.availableSeats)
	t.Logf("noxu2 priority level concurrency limit: %d", noxu2T0.availableSeats)
	if (noxu1T0.availableSeats != noxu1T1.availableSeats) || (noxu2T0.availableSeats != noxu2T1.availableSeats) {
		t.Errorf("The number of available seats changed: noxu1 (%d, %d) noxu2 (%d, %d)",
			noxu1T0.availableSeats, noxu1T1.availableSeats, noxu2T0.availableSeats, noxu2T1.availableSeats)
	}
	if (noxu1T0.availableSeats <= 4) || (noxu2T0.availableSeats <= 4) {
		t.Errorf("The number of available seats for the test client priority levels is too small: (%d, %d). Expecting a number > 4",
			noxu1T0.availableSeats, noxu2T0.availableSeats)
	}

	_, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
	if err != nil {
		t.Error(err)
	}
	if rejectedReqCounts[plNoxu1.Name] > 0 {
		t.Errorf(`%d requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu1.Name])
	}
	if rejectedReqCounts[plNoxu2.Name] > 0 {
		t.Errorf(`%d requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu2.Name])
	}

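	// Log the per-priority-level averages over the measuring window.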
	noxu1Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu1.Name)
	noxu2Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu2.Name)
	t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution)
	t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil)

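	// Wait for the client goroutines to finish before reading the client-side latency statistics.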
	wg.Wait()
	noxu1LatStats := noxu1LatMeasure.getStats()
	noxu2LatStats := noxu2LatMeasure.getStats()
	t.Logf("noxu1 client request count %d duration mean %v stddev %v cv %v", noxu1LatMeasure.Count, noxu1LatStats.mean, noxu1LatStats.stdDev, noxu1LatStats.cv)
	t.Logf("noxu2 client request count %d duration mean %v stddev %v cv %v", noxu2LatMeasure.Count, noxu2LatStats.mean, noxu2LatStats.stdDev, noxu2LatStats.cv)

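	// Server-side observed concurrency: average seat utilization times the nominal number of seats.
	// Client-side expected concurrency follows from Little's law: each of the N goroutines completes
	// about one request per mean client latency, and each request occupies seats for its average
	// execution time, so expected concurrency = N * avgExecutionSeconds / meanClientLatency.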
	noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats)
	noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats)

	noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean
	noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean
	t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency)
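
	// Compare expected and observed concurrency with a relative error margin of
	// twice the smaller of the two clients' latency coefficients of variation.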
	margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv)
	t.Logf("Error margin is %v", margin)

	isConcurrencyExpected := func(name string, observed float64, expected float64) bool {
		relativeErr := math.Abs(expected-observed) / expected
		t.Logf("%v relative error is %v", name, relativeErr)
		return relativeErr <= margin
	}
	if !isConcurrencyExpected(plNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) {
		t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency)
	}
	if !isConcurrencyExpected(plNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) {
		t.Errorf("Concurrency observed by noxu2 is off. Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency)
	}

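	// Seat utilization checks: noxu1's priority level should be saturated (utilization ~1.0),
	// while noxu2's utilization should be close to its number of client goroutines divided by
	// its nominal seats.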
	if math.Abs(1-noxu1Avg.seatUtil) > 0.05 {
		t.Errorf("noxu1Avg.seatUtil=%v is too far from expected=1.0", noxu1Avg.seatUtil)
	}
	noxu2ExpectedSeatUtil := float64(noxu2NumGoroutines) / float64(noxu2T0.availableSeats)
	if math.Abs(noxu2ExpectedSeatUtil-noxu2Avg.seatUtil) > 0.05 {
		t.Errorf("noxu2Avg.seatUtil=%v is too far from expected=%v", noxu2Avg.seatUtil, noxu2ExpectedSeatUtil)
	}
}

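// getRequestMetricsSnapshot scrapes the apiserver metrics in the Prometheus text
// format and collects, per priority level, the request execution and seat
// utilization sums/counts plus the nominal concurrency limit.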
func getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) {
	resp, err := getMetrics(c)
	if err != nil {
		return nil, err
	}

	dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
	decoder := expfmt.SampleDecoder{
		Dec:  dec,
		Opts: &expfmt.DecodeOptions{},
	}

	snapshot := metricSnapshot{}

	for {
		var v model.Vector
		if err := decoder.Decode(&v); err != nil {
			if err == io.EOF {
				// All metric samples have been consumed.
				return snapshot, nil
			}
			return nil, fmt.Errorf("failed decoding metrics: %v", err)
		}
		for _, metric := range v {
			plLabel := string(metric.Metric[labelPriorityLevel])
			entry := plMetrics{}
			if v, ok := snapshot[plLabel]; ok {
				entry = v
			}
			switch name := string(metric.Metric[model.MetricNameLabel]); name {
			case requestExecutionSecondsSumName:
				entry.execSeconds.Sum = float64(metric.Value)
			case requestExecutionSecondsCountName:
				entry.execSeconds.Count = int(metric.Value)
			case priorityLevelSeatUtilSumName:
				entry.seatUtil.Sum = float64(metric.Value)
			case priorityLevelSeatUtilCountName:
				entry.seatUtil.Count = int(metric.Value)
			case nominalConcurrencyLimitMetricsName:
				entry.availableSeats = int(metric.Value)
			}
			snapshot[plLabel] = entry
		}
	}
}