1
16
17 package flowcontrol
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "strings"
24 "sync"
25 "testing"
26 "time"
27
28 "github.com/prometheus/common/expfmt"
29 "github.com/prometheus/common/model"
30
31 flowcontrol "k8s.io/api/flowcontrol/v1"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/util/wait"
34 clientset "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/rest"
36 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
37 "k8s.io/kubernetes/test/integration/framework"
38 "k8s.io/kubernetes/test/utils/ktesting"
39 "k8s.io/utils/ptr"
40 )
41
42 const (
43 nominalConcurrencyMetricsName = "apiserver_flowcontrol_nominal_limit_seats"
44 dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
45 rejectedRequestCountMetricsName = "apiserver_flowcontrol_rejected_requests_total"
46 labelPriorityLevel = "priority_level"
47 timeout = time.Second * 10
48 )
49
50 func setup(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int) (context.Context, *rest.Config, framework.TearDownFunc) {
51 tCtx := ktesting.Init(t)
52
53 _, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
54 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
55
56 opts.Authorization.Modes = []string{"AlwaysAllow"}
57 opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight
58 opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight
59 },
60 })
61
62 newTeardown := func() {
63 tCtx.Cancel("tearing down apiserver")
64 tearDownFn()
65 }
66 return tCtx, kubeConfig, newTeardown
67 }
68
69 func TestPriorityLevelIsolation(t *testing.T) {
70 ctx, kubeConfig, closeFn := setup(t, 1, 1)
71 defer closeFn()
72
73 loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
74 noxu1Client := getClientFor(kubeConfig, "noxu1")
75 noxu2Client := getClientFor(kubeConfig, "noxu2")
76
77 queueLength := 50
78 concurrencyShares := 1
79
80 priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
81 loopbackClient, "noxu1", concurrencyShares, queueLength)
82 if err != nil {
83 t.Error(err)
84 }
85 priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
86 loopbackClient, "noxu2", concurrencyShares, queueLength)
87 if err != nil {
88 t.Error(err)
89 }
90
91 nominalConcurrency, err := getNominalConcurrencyOfPriorityLevel(loopbackClient)
92 if err != nil {
93 t.Error(err)
94 }
95
96 if 1 != nominalConcurrency[priorityLevelNoxu1.Name] {
97 t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu1.Name], 1)
98 }
99 if 1 != nominalConcurrency[priorityLevelNoxu2.Name] {
100 t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu2.Name], 1)
101 }
102
103 stopCh := make(chan struct{})
104 wg := sync.WaitGroup{}
105 defer func() {
106 close(stopCh)
107 wg.Wait()
108 }()
109
110
111 wg.Add(concurrencyShares + queueLength)
112 streamRequests(concurrencyShares+queueLength, func() {
113 _, err := noxu1Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
114 if err != nil {
115 t.Error(err)
116 }
117 }, &wg, stopCh)
118
119 wg.Add(3)
120 streamRequests(3, func() {
121 _, err := noxu2Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
122 if err != nil {
123 t.Error(err)
124 }
125 }, &wg, stopCh)
126
127 time.Sleep(time.Second * 10)
128
129 allDispatchedReqCounts, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
130 if err != nil {
131 t.Error(err)
132 }
133
134 noxu1RequestCount := allDispatchedReqCounts[priorityLevelNoxu1.Name]
135 noxu2RequestCount := allDispatchedReqCounts[priorityLevelNoxu2.Name]
136
137 if rejectedReqCounts[priorityLevelNoxu1.Name] > 0 {
138 t.Errorf(`%v requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name])
139 }
140 if rejectedReqCounts[priorityLevelNoxu2.Name] > 0 {
141 t.Errorf(`%v requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name])
142 }
143
144
145
146 if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount {
147 t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
148 }
149 }
150
151 func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
152 config := rest.CopyConfig(loopbackConfig)
153 config.Impersonate = rest.ImpersonationConfig{
154 UserName: username,
155 }
156 return clientset.NewForConfigOrDie(config)
157 }
158
159 func getMetrics(c clientset.Interface) (string, error) {
160 resp, err := c.CoreV1().
161 RESTClient().
162 Get().
163 RequestURI("/metrics").
164 DoRaw(context.Background())
165 if err != nil {
166 return "", err
167 }
168 return string(resp), err
169 }
170
171 func getNominalConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
172 resp, err := getMetrics(c)
173 if err != nil {
174 return nil, err
175 }
176
177 dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
178 decoder := expfmt.SampleDecoder{
179 Dec: dec,
180 Opts: &expfmt.DecodeOptions{},
181 }
182
183 concurrency := make(map[string]int)
184 for {
185 var v model.Vector
186 if err := decoder.Decode(&v); err != nil {
187 if err == io.EOF {
188
189 return concurrency, nil
190 }
191 return nil, fmt.Errorf("failed decoding metrics: %v", err)
192 }
193 for _, metric := range v {
194 switch name := string(metric.Metric[model.MetricNameLabel]); name {
195 case nominalConcurrencyMetricsName:
196 concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
197 }
198 }
199 }
200 }
201
202 func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, map[string]int, error) {
203 resp, err := getMetrics(c)
204 if err != nil {
205 return nil, nil, err
206 }
207
208 dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
209 decoder := expfmt.SampleDecoder{
210 Dec: dec,
211 Opts: &expfmt.DecodeOptions{},
212 }
213
214 allReqCounts := make(map[string]int)
215 rejectReqCounts := make(map[string]int)
216 for {
217 var v model.Vector
218 if err := decoder.Decode(&v); err != nil {
219 if err == io.EOF {
220
221 return allReqCounts, rejectReqCounts, nil
222 }
223 return nil, nil, fmt.Errorf("failed decoding metrics: %v", err)
224 }
225 for _, metric := range v {
226 switch name := string(metric.Metric[model.MetricNameLabel]); name {
227 case dispatchedRequestCountMetricsName:
228 allReqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
229 case rejectedRequestCountMetricsName:
230 rejectReqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
231 }
232 }
233 }
234 }
235
236 func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrol.PriorityLevelConfiguration, *flowcontrol.FlowSchema, error) {
237 i0 := int32(0)
238 pl, err := c.FlowcontrolV1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrol.PriorityLevelConfiguration{
239 ObjectMeta: metav1.ObjectMeta{
240 Name: username,
241 },
242 Spec: flowcontrol.PriorityLevelConfigurationSpec{
243 Type: flowcontrol.PriorityLevelEnablementLimited,
244 Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
245 NominalConcurrencyShares: ptr.To(int32(concurrencyShares)),
246 BorrowingLimitPercent: &i0,
247 LimitResponse: flowcontrol.LimitResponse{
248 Type: flowcontrol.LimitResponseTypeQueue,
249 Queuing: &flowcontrol.QueuingConfiguration{
250 Queues: 100,
251 HandSize: 1,
252 QueueLengthLimit: int32(queuelength),
253 },
254 },
255 },
256 },
257 }, metav1.CreateOptions{})
258 if err != nil {
259 return nil, nil, err
260 }
261 fs, err := c.FlowcontrolV1().FlowSchemas().Create(context.TODO(), &flowcontrol.FlowSchema{
262 ObjectMeta: metav1.ObjectMeta{
263 Name: username,
264 },
265 Spec: flowcontrol.FlowSchemaSpec{
266 DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
267 Type: flowcontrol.FlowDistinguisherMethodByUserType,
268 },
269 MatchingPrecedence: 1000,
270 PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
271 Name: username,
272 },
273 Rules: []flowcontrol.PolicyRulesWithSubjects{
274 {
275 ResourceRules: []flowcontrol.ResourcePolicyRule{
276 {
277 Verbs: []string{flowcontrol.VerbAll},
278 APIGroups: []string{flowcontrol.APIGroupAll},
279 Resources: []string{flowcontrol.ResourceAll},
280 Namespaces: []string{flowcontrol.NamespaceEvery},
281 ClusterScope: true,
282 },
283 },
284 Subjects: []flowcontrol.Subject{
285 {
286 Kind: flowcontrol.SubjectKindUser,
287 User: &flowcontrol.UserSubject{
288 Name: username,
289 },
290 },
291 },
292 },
293 },
294 },
295 }, metav1.CreateOptions{})
296 if err != nil {
297 return nil, nil, err
298 }
299
300 return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) {
301 fs, err := c.FlowcontrolV1().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{})
302 if err != nil {
303 return false, err
304 }
305 for _, condition := range fs.Status.Conditions {
306 if condition.Type == flowcontrol.FlowSchemaConditionDangling {
307 if condition.Status == flowcontrol.ConditionFalse {
308 return true, nil
309 }
310 }
311 }
312 return false, nil
313 })
314 }
315
316 func streamRequests(parallel int, request func(), wg *sync.WaitGroup, stopCh <-chan struct{}) {
317 for i := 0; i < parallel; i++ {
318 go func() {
319 defer wg.Done()
320 for {
321 select {
322 case <-stopCh:
323 return
324 default:
325 request()
326 }
327 }
328 }()
329 }
330 }
331
View as plain text