package apimachinery

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"

	flowcontrol "k8s.io/api/flowcontrol/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilrand "k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/util/apihelpers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
	"k8s.io/client-go/util/retry"
	"k8s.io/kubernetes/test/e2e/framework"
	admissionapi "k8s.io/pod-security-admission/api"
	"k8s.io/utils/ptr"
)

const (
	nominalConcurrencyLimitMetricName = "apiserver_flowcontrol_nominal_limit_seats"
	priorityLevelLabelName            = "priority_level"
)

var (
	errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
)

var _ = SIGDescribe("API priority and fairness", func() {
	f := framework.NewDefaultFramework("apf")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	ginkgo.It("should ensure that requests can be classified by adding FlowSchema and PriorityLevelConfiguration", func(ctx context.Context) {
		testingFlowSchemaName := "e2e-testing-flowschema"
		testingPriorityLevelName := "e2e-testing-prioritylevel"
		matchingUsername := "noxu"
		nonMatchingUsername := "foo"

		ginkgo.By("creating a testing PriorityLevelConfiguration object")
		createdPriorityLevel := createPriorityLevel(ctx, f, testingPriorityLevelName, 1)

		ginkgo.By("creating a testing FlowSchema object")
		createdFlowSchema := createFlowSchema(ctx, f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})

		ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
		waitForSteadyState(ctx, f, testingFlowSchemaName, testingPriorityLevelName)

		var response *http.Response
		ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
		response = makeRequest(f, matchingUsername)
		if plUIDWant, plUIDGot := string(createdPriorityLevel.UID), getPriorityLevelUID(response); plUIDWant != plUIDGot {
			framework.Failf("expected PriorityLevelConfiguration UID in the response header: %s, but got: %s, response header: %#v", plUIDWant, plUIDGot, response.Header)
		}
		if fsUIDWant, fsUIDGot := string(createdFlowSchema.UID), getFlowSchemaUID(response); fsUIDWant != fsUIDGot {
			framework.Failf("expected FlowSchema UID in the response header: %s, but got: %s, response header: %#v", fsUIDWant, fsUIDGot, response.Header)
		}

		ginkgo.By("response headers should contain a non-empty UID of the FlowSchema and PriorityLevelConfiguration for a non-matching user")
		response = makeRequest(f, nonMatchingUsername)
		if plUIDGot := getPriorityLevelUID(response); plUIDGot == "" {
			framework.Failf("expected a non-empty PriorityLevelConfiguration UID in the response header, but got: %s, response header: %#v", plUIDGot, response.Header)
		}
		if fsUIDGot := getFlowSchemaUID(response); fsUIDGot == "" {
			framework.Failf("expected a non-empty FlowSchema UID in the response header, but got: %s, response header: %#v", fsUIDGot, response.Header)
		}
	})

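	// This test runs two clients, each with its own flow schema and priority
	// level. One client sends requests well above what its priority level can
	// absorb while the other stays within its share; because the clients are
	// isolated in separate priority levels, each should still complete at
	// least its expected fraction of requests.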
	ginkgo.It("should ensure that requests can't be drowned out (priority)", func(ctx context.Context) {

		ginkgo.Skip("skipping test until flakiness is resolved")

		flowSchemaNamePrefix := "e2e-testing-flowschema-" + f.UniqueName
		priorityLevelNamePrefix := "e2e-testing-prioritylevel-" + f.UniqueName
		loadDuration := 10 * time.Second
		highQPSClientName := "highqps-" + f.UniqueName
		lowQPSClientName := "lowqps-" + f.UniqueName

		type client struct {
			username                    string
			qps                         float64
			priorityLevelName           string
			concurrencyMultiplier       float64
			concurrency                 int32
			flowSchemaName              string
			matchingPrecedence          int32
			completedRequests           int32
			expectedCompletedPercentage float64
		}
		clients := []client{
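			// The "highqps" client sends requests at a QPS far above its
			// priority level's capacity, while "lowqps" stays within its
			// share. The "highqps" client also gets a numerically lower
			// matchingPrecedence, i.e. its flow schema matches first.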
			{username: highQPSClientName, qps: 90, concurrencyMultiplier: 2.0, matchingPrecedence: 999, expectedCompletedPercentage: 0.90},
			{username: lowQPSClientName, qps: 4, concurrencyMultiplier: 0.5, matchingPrecedence: 1000, expectedCompletedPercentage: 0.90},
		}

		ginkgo.By("creating test priority levels and flow schemas")
		for i := range clients {
			clients[i].priorityLevelName = fmt.Sprintf("%s-%s", priorityLevelNamePrefix, clients[i].username)
			framework.Logf("creating PriorityLevel %q", clients[i].priorityLevelName)
			createPriorityLevel(ctx, f, clients[i].priorityLevelName, 1)

			clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username)
			framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
			createFlowSchema(ctx, f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})

			ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
			waitForSteadyState(ctx, f, clients[i].flowSchemaName, clients[i].priorityLevelName)
		}

		ginkgo.By("getting request concurrency from metrics")
		for i := range clients {
			realConcurrency, err := getPriorityLevelNominalConcurrency(ctx, f.ClientSet, clients[i].priorityLevelName)
			framework.ExpectNoError(err)
			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
			if clients[i].concurrency < 1 {
				clients[i].concurrency = 1
			}
			framework.Logf("request concurrency for %q will be %d (nominal concurrency %d times the client's multiplier)", clients[i].username, clients[i].concurrency, realConcurrency)
		}

		ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
		var wg sync.WaitGroup
		for i := range clients {
			wg.Add(1)
			go func(c *client) {
				defer wg.Done()
				framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
				c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
			}(&clients[i])
		}
		wg.Wait()

		ginkgo.By("checking completed requests against expected values")
		for _, client := range clients {
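			// Each client must complete at least expectedCompletedPercentage
			// of the absolute ceiling (concurrency * QPS * duration) it could
			// have achieved.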
			maxCompletedRequests := float64(client.concurrency) * client.qps * loadDuration.Seconds()
			fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
			framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
			if fractionCompleted < client.expectedCompletedPercentage {
				framework.Failf("client %q: got %.1f%% completed requests, want at least %.1f%%", client.username, 100*fractionCompleted, 100*client.expectedCompletedPercentage)
			}
		}
	})

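	// This test runs two clients that share a single flow schema and priority
	// level. One client floods the server well above the level's capacity
	// while the other stays within its share; fairness within the priority
	// level should keep the well-behaved client from being drowned out.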
	ginkgo.It("should ensure that requests can't be drowned out (fairness)", func(ctx context.Context) {

		ginkgo.Skip("skipping test until flakiness is resolved")

		priorityLevelName := "e2e-testing-prioritylevel-" + f.UniqueName
		flowSchemaName := "e2e-testing-flowschema-" + f.UniqueName
		loadDuration := 10 * time.Second

		framework.Logf("creating PriorityLevel %q", priorityLevelName)
		createPriorityLevel(ctx, f, priorityLevelName, 1)

		highQPSClientName := "highqps-" + f.UniqueName
		lowQPSClientName := "lowqps-" + f.UniqueName
		framework.Logf("creating FlowSchema %q", flowSchemaName)
		createFlowSchema(ctx, f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})

		ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
		waitForSteadyState(ctx, f, flowSchemaName, priorityLevelName)

		type client struct {
			username                    string
			qps                         float64
			concurrencyMultiplier       float64
			concurrency                 int32
			completedRequests           int32
			expectedCompletedPercentage float64
		}
		clients := []client{
			{username: highQPSClientName, qps: 90, concurrencyMultiplier: 2.0, expectedCompletedPercentage: 0.90},
			{username: lowQPSClientName, qps: 4, concurrencyMultiplier: 0.5, expectedCompletedPercentage: 0.90},
		}

		framework.Logf("getting real concurrency")
		realConcurrency, err := getPriorityLevelNominalConcurrency(ctx, f.ClientSet, priorityLevelName)
		framework.ExpectNoError(err)
		for i := range clients {
			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
			if clients[i].concurrency < 1 {
				clients[i].concurrency = 1
			}
			framework.Logf("request concurrency for %q will be %d", clients[i].username, clients[i].concurrency)
		}

		ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
		var wg sync.WaitGroup
		for i := range clients {
			wg.Add(1)
			go func(c *client) {
				defer wg.Done()
				framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
				c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
			}(&clients[i])
		}
		wg.Wait()

		ginkgo.By("checking completed requests against expected values")
		for _, client := range clients {
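			// Each client must complete at least expectedCompletedPercentage
			// of the absolute ceiling (concurrency * QPS * duration) it could
			// have achieved.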
			maxCompletedRequests := float64(client.concurrency) * client.qps * loadDuration.Seconds()
			fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
			framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
			if fractionCompleted < client.expectedCompletedPercentage {
				framework.Failf("client %q: got %.1f%% completed requests, want at least %.1f%%", client.username, 100*fractionCompleted, 100*client.expectedCompletedPercentage)
			}
		}
	})

	framework.ConformanceIt("should support FlowSchema API operations", func(ctx context.Context) {
		fsVersion := "v1"
		ginkgo.By("getting /apis")
		{
			discoveryGroups, err := f.ClientSet.Discovery().ServerGroups()
			framework.ExpectNoError(err)
			found := false
			for _, group := range discoveryGroups.Groups {
				if group.Name == flowcontrol.GroupName {
					for _, version := range group.Versions {
						if version.Version == fsVersion {
							found = true
							break
						}
					}
				}
			}
			if !found {
				framework.Failf("expected flowcontrol API group/version, got %#v", discoveryGroups.Groups)
			}
		}

		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io")
		{
			group := &metav1.APIGroup{}
			err := f.ClientSet.Discovery().RESTClient().Get().AbsPath("/apis/flowcontrol.apiserver.k8s.io").Do(ctx).Into(group)
			framework.ExpectNoError(err)
			found := false
			for _, version := range group.Versions {
				if version.Version == fsVersion {
					found = true
					break
				}
			}
			if !found {
				framework.Failf("expected flowschemas API version, got %#v", group.Versions)
			}
		}

		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io/" + fsVersion)
		{
			resources, err := f.ClientSet.Discovery().ServerResourcesForGroupVersion(flowcontrol.SchemeGroupVersion.String())
			framework.ExpectNoError(err)
			foundFS, foundFSStatus := false, false
			for _, resource := range resources.APIResources {
				switch resource.Name {
				case "flowschemas":
					foundFS = true
				case "flowschemas/status":
					foundFSStatus = true
				}
			}
			if !foundFS {
				framework.Failf("expected flowschemas, got %#v", resources.APIResources)
			}
			if !foundFSStatus {
				framework.Failf("expected flowschemas/status, got %#v", resources.APIResources)
			}
		}

		client := f.ClientSet.FlowcontrolV1().FlowSchemas()
		labelKey, labelValue := "example-e2e-fs-label", utilrand.String(8)
		label := fmt.Sprintf("%s=%s", labelKey, labelValue)

		template := &flowcontrol.FlowSchema{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "e2e-example-fs-",
				Labels: map[string]string{
					labelKey: labelValue,
				},
			},
			Spec: flowcontrol.FlowSchemaSpec{
				MatchingPrecedence: 10000,
				PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
					Name: "global-default",
				},
				DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
					Type: flowcontrol.FlowDistinguisherMethodByUserType,
				},
				Rules: []flowcontrol.PolicyRulesWithSubjects{
					{
						Subjects: []flowcontrol.Subject{
							{
								Kind: flowcontrol.SubjectKindUser,
								User: &flowcontrol.UserSubject{
									Name: "example-e2e-non-existent-user",
								},
							},
						},
						NonResourceRules: []flowcontrol.NonResourcePolicyRule{
							{
								Verbs:           []string{flowcontrol.VerbAll},
								NonResourceURLs: []string{flowcontrol.NonResourceAll},
							},
						},
					},
				},
			},
		}

		ginkgo.DeferCleanup(func(ctx context.Context) {
			err := client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
			framework.ExpectNoError(err)
		})

		ginkgo.By("creating")
		_, err := client.Create(ctx, template, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		_, err = client.Create(ctx, template, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		fsCreated, err := client.Create(ctx, template, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("getting")
		fsRead, err := client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		gomega.Expect(fsRead.UID).To(gomega.Equal(fsCreated.UID))

		ginkgo.By("listing")
		list, err := client.List(ctx, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.HaveLen(3), "filtered list should have 3 items")

		ginkgo.By("watching")
		framework.Logf("starting watch")
		fsWatch, err := client.Watch(ctx, metav1.ListOptions{ResourceVersion: list.ResourceVersion, LabelSelector: label})
		framework.ExpectNoError(err)

		ginkgo.By("patching")
		patchBytes := []byte(`{"metadata":{"annotations":{"patched":"true"}},"spec":{"matchingPrecedence":9999}}`)
		fsPatched, err := client.Patch(ctx, fsCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
		framework.ExpectNoError(err)
		gomega.Expect(fsPatched.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
		gomega.Expect(fsPatched.Spec.MatchingPrecedence).To(gomega.Equal(int32(9999)), "patched object should have the applied spec")

		ginkgo.By("updating")
		var fsUpdated *flowcontrol.FlowSchema
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			fs, err := client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			fsToUpdate := fs.DeepCopy()
			fsToUpdate.Annotations["updated"] = "true"
			fsToUpdate.Spec.MatchingPrecedence = int32(9000)

			fsUpdated, err = client.Update(ctx, fsToUpdate, metav1.UpdateOptions{})
			return err
		})
		framework.ExpectNoError(err, "failed to update flowschema %q", fsCreated.Name)
		gomega.Expect(fsUpdated.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
		gomega.Expect(fsUpdated.Spec.MatchingPrecedence).To(gomega.Equal(int32(9000)), "updated object should have the applied spec")

		framework.Logf("waiting for watch events with expected annotations")
		for sawAnnotation := false; !sawAnnotation; {
			select {
			case evt, ok := <-fsWatch.ResultChan():
				if !ok {
					framework.Fail("watch channel should not close")
				}
				gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
				fsWatched, isFS := evt.Object.(*flowcontrol.FlowSchema)
				if !isFS {
					framework.Failf("expected an object of type: %T, but got %T", &flowcontrol.FlowSchema{}, evt.Object)
				}
				if fsWatched.Annotations["patched"] == "true" {
					sawAnnotation = true
					fsWatch.Stop()
				} else {
					framework.Logf("missing expected annotations, waiting: %#v", fsWatched.Annotations)
				}
			case <-time.After(wait.ForeverTestTimeout):
				framework.Fail("timed out waiting for watch event")
			}
		}

		ginkgo.By("getting /status")
		resource := flowcontrol.SchemeGroupVersion.WithResource("flowschemas")
		fsStatusRead, err := f.DynamicClient.Resource(resource).Get(ctx, fsCreated.Name, metav1.GetOptions{}, "status")
		framework.ExpectNoError(err)
		gomega.Expect(fsStatusRead.GetObjectKind().GroupVersionKind()).To(gomega.Equal(flowcontrol.SchemeGroupVersion.WithKind("FlowSchema")))
		gomega.Expect(fsStatusRead.GetUID()).To(gomega.Equal(fsCreated.UID))

		ginkgo.By("patching /status")
		patchBytes = []byte(`{"status":{"conditions":[{"type":"PatchStatusFailed","status":"False","reason":"e2e"}]}}`)
		fsStatusPatched, err := client.Patch(ctx, fsCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
		framework.ExpectNoError(err)
		condition := apihelpers.GetFlowSchemaConditionByType(fsStatusPatched, flowcontrol.FlowSchemaConditionType("PatchStatusFailed"))
		gomega.Expect(condition).NotTo(gomega.BeNil())

		ginkgo.By("updating /status")
		var fsStatusUpdated *flowcontrol.FlowSchema
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			fs, err := client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			fsStatusToUpdate := fs.DeepCopy()
			fsStatusToUpdate.Status.Conditions = append(fsStatusToUpdate.Status.Conditions, flowcontrol.FlowSchemaCondition{
				Type:    "StatusUpdateFailed",
				Status:  flowcontrol.ConditionFalse,
				Reason:  "E2E",
				Message: "Set from an e2e test",
			})
			fsStatusUpdated, err = client.UpdateStatus(ctx, fsStatusToUpdate, metav1.UpdateOptions{})
			return err
		})
		framework.ExpectNoError(err, "failed to update status of flowschema %q", fsCreated.Name)
		condition = apihelpers.GetFlowSchemaConditionByType(fsStatusUpdated, flowcontrol.FlowSchemaConditionType("StatusUpdateFailed"))
		gomega.Expect(condition).NotTo(gomega.BeNil())

		ginkgo.By("deleting")
		err = client.Delete(ctx, fsCreated.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)
		_, err = client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
		if !apierrors.IsNotFound(err) {
			framework.Failf("expected 404, got %#v", err)
		}

		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.HaveLen(2), "filtered list should have 2 items")

		ginkgo.By("deleting a collection")
		err = client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)

		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.BeEmpty(), "filtered list should have 0 items")
	})

	framework.ConformanceIt("should support PriorityLevelConfiguration API operations", func(ctx context.Context) {
		plVersion := "v1"
		ginkgo.By("getting /apis")
		{
			discoveryGroups, err := f.ClientSet.Discovery().ServerGroups()
			framework.ExpectNoError(err)
			found := false
			for _, group := range discoveryGroups.Groups {
				if group.Name == flowcontrol.GroupName {
					for _, version := range group.Versions {
						if version.Version == plVersion {
							found = true
							break
						}
					}
				}
			}
			if !found {
				framework.Failf("expected flowcontrol API group/version, got %#v", discoveryGroups.Groups)
			}
		}

		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io")
		{
			group := &metav1.APIGroup{}
			err := f.ClientSet.Discovery().RESTClient().Get().AbsPath("/apis/flowcontrol.apiserver.k8s.io").Do(ctx).Into(group)
			framework.ExpectNoError(err)
			found := false
			for _, version := range group.Versions {
				if version.Version == plVersion {
					found = true
					break
				}
			}
			if !found {
				framework.Failf("expected flowcontrol API version, got %#v", group.Versions)
			}
		}

		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io/" + plVersion)
		{
			resources, err := f.ClientSet.Discovery().ServerResourcesForGroupVersion(flowcontrol.SchemeGroupVersion.String())
			framework.ExpectNoError(err)
			foundPL, foundPLStatus := false, false
			for _, resource := range resources.APIResources {
				switch resource.Name {
				case "prioritylevelconfigurations":
					foundPL = true
				case "prioritylevelconfigurations/status":
					foundPLStatus = true
				}
			}
			if !foundPL {
				framework.Failf("expected prioritylevelconfigurations, got %#v", resources.APIResources)
			}
			if !foundPLStatus {
				framework.Failf("expected prioritylevelconfigurations/status, got %#v", resources.APIResources)
			}
		}

		client := f.ClientSet.FlowcontrolV1().PriorityLevelConfigurations()
		labelKey, labelValue := "example-e2e-pl-label", utilrand.String(8)
		label := fmt.Sprintf("%s=%s", labelKey, labelValue)

		template := &flowcontrol.PriorityLevelConfiguration{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "e2e-example-pl-",
				Labels: map[string]string{
					labelKey: labelValue,
				},
			},
			Spec: flowcontrol.PriorityLevelConfigurationSpec{
				Type: flowcontrol.PriorityLevelEnablementLimited,
				Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
					NominalConcurrencyShares: ptr.To(int32(2)),
					LimitResponse: flowcontrol.LimitResponse{
						Type: flowcontrol.LimitResponseTypeReject,
					},
				},
			},
		}

		ginkgo.DeferCleanup(func(ctx context.Context) {
			err := client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
			framework.ExpectNoError(err)
		})

		ginkgo.By("creating")
		_, err := client.Create(ctx, template, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		_, err = client.Create(ctx, template, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		plCreated, err := client.Create(ctx, template, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("getting")
		plRead, err := client.Get(ctx, plCreated.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		gomega.Expect(plRead.UID).To(gomega.Equal(plCreated.UID))

		ginkgo.By("listing")
		list, err := client.List(ctx, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.HaveLen(3), "filtered list should have 3 items")

		ginkgo.By("watching")
		framework.Logf("starting watch")
		plWatch, err := client.Watch(ctx, metav1.ListOptions{ResourceVersion: list.ResourceVersion, LabelSelector: label})
		framework.ExpectNoError(err)

		ginkgo.By("patching")
		patchBytes := []byte(`{"metadata":{"annotations":{"patched":"true"}},"spec":{"limited":{"nominalConcurrencyShares":4}}}`)
		plPatched, err := client.Patch(ctx, plCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
		framework.ExpectNoError(err)
		gomega.Expect(plPatched.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
		gomega.Expect(plPatched.Spec.Limited.NominalConcurrencyShares).To(gomega.Equal(ptr.To(int32(4))), "patched object should have the applied spec")

		ginkgo.By("updating")
		var plUpdated *flowcontrol.PriorityLevelConfiguration
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			pl, err := client.Get(ctx, plCreated.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			plToUpdate := pl.DeepCopy()
			plToUpdate.Annotations["updated"] = "true"
			plToUpdate.Spec.Limited.NominalConcurrencyShares = ptr.To(int32(6))

			plUpdated, err = client.Update(ctx, plToUpdate, metav1.UpdateOptions{})
			return err
		})
		framework.ExpectNoError(err, "failed to update prioritylevelconfiguration %q", plCreated.Name)
		gomega.Expect(plUpdated.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
		gomega.Expect(plUpdated.Spec.Limited.NominalConcurrencyShares).To(gomega.Equal(ptr.To(int32(6))), "updated object should have the applied spec")

		framework.Logf("waiting for watch events with expected annotations")
		for sawAnnotation := false; !sawAnnotation; {
			select {
			case evt, ok := <-plWatch.ResultChan():
				if !ok {
					framework.Fail("watch channel should not close")
				}
				gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
				plWatched, isPL := evt.Object.(*flowcontrol.PriorityLevelConfiguration)
				if !isPL {
					framework.Failf("expected an object of type: %T, but got %T", &flowcontrol.PriorityLevelConfiguration{}, evt.Object)
				}
				if plWatched.Annotations["patched"] == "true" {
					sawAnnotation = true
					plWatch.Stop()
				} else {
					framework.Logf("missing expected annotations, waiting: %#v", plWatched.Annotations)
				}
			case <-time.After(wait.ForeverTestTimeout):
				framework.Fail("timed out waiting for watch event")
			}
		}

		ginkgo.By("getting /status")
		resource := flowcontrol.SchemeGroupVersion.WithResource("prioritylevelconfigurations")
		plStatusRead, err := f.DynamicClient.Resource(resource).Get(ctx, plCreated.Name, metav1.GetOptions{}, "status")
		framework.ExpectNoError(err)
		gomega.Expect(plStatusRead.GetObjectKind().GroupVersionKind()).To(gomega.Equal(flowcontrol.SchemeGroupVersion.WithKind("PriorityLevelConfiguration")))
		gomega.Expect(plStatusRead.GetUID()).To(gomega.Equal(plCreated.UID))

		ginkgo.By("patching /status")
		patchBytes = []byte(`{"status":{"conditions":[{"type":"PatchStatusFailed","status":"False","reason":"e2e"}]}}`)
		plStatusPatched, err := client.Patch(ctx, plCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
		framework.ExpectNoError(err)
		condition := apihelpers.GetPriorityLevelConfigurationConditionByType(plStatusPatched, flowcontrol.PriorityLevelConfigurationConditionType("PatchStatusFailed"))
		gomega.Expect(condition).NotTo(gomega.BeNil())

		ginkgo.By("updating /status")
		var plStatusUpdated *flowcontrol.PriorityLevelConfiguration
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			pl, err := client.Get(ctx, plCreated.Name, metav1.GetOptions{})
			framework.ExpectNoError(err)

			plStatusToUpdate := pl.DeepCopy()
			plStatusToUpdate.Status.Conditions = append(plStatusToUpdate.Status.Conditions, flowcontrol.PriorityLevelConfigurationCondition{
				Type:    "StatusUpdateFailed",
				Status:  flowcontrol.ConditionFalse,
				Reason:  "E2E",
				Message: "Set from an e2e test",
			})
			plStatusUpdated, err = client.UpdateStatus(ctx, plStatusToUpdate, metav1.UpdateOptions{})
			return err
		})
		framework.ExpectNoError(err, "failed to update status of prioritylevelconfiguration %q", plCreated.Name)
		condition = apihelpers.GetPriorityLevelConfigurationConditionByType(plStatusUpdated, flowcontrol.PriorityLevelConfigurationConditionType("StatusUpdateFailed"))
		gomega.Expect(condition).NotTo(gomega.BeNil())

		ginkgo.By("deleting")
		err = client.Delete(ctx, plCreated.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)
		_, err = client.Get(ctx, plCreated.Name, metav1.GetOptions{})
		if !apierrors.IsNotFound(err) {
			framework.Failf("expected 404, got %#v", err)
		}

		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.HaveLen(2), "filtered list should have 2 items")

		ginkgo.By("deleting a collection")
		err = client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)

		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
		framework.ExpectNoError(err)
		gomega.Expect(list.Items).To(gomega.BeEmpty(), "filtered list should have 0 items")
	})
})

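// createPriorityLevel creates a PriorityLevelConfiguration with the given
// nominal concurrency shares and a Reject limit response, and registers a
// cleanup hook that deletes it when the test ends.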
func createPriorityLevel(ctx context.Context, f *framework.Framework, priorityLevelName string, nominalConcurrencyShares int32) *flowcontrol.PriorityLevelConfiguration {
	createdPriorityLevel, err := f.ClientSet.FlowcontrolV1().PriorityLevelConfigurations().Create(
		ctx,
		&flowcontrol.PriorityLevelConfiguration{
			ObjectMeta: metav1.ObjectMeta{
				Name: priorityLevelName,
			},
			Spec: flowcontrol.PriorityLevelConfigurationSpec{
				Type: flowcontrol.PriorityLevelEnablementLimited,
				Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
					NominalConcurrencyShares: ptr.To(nominalConcurrencyShares),
					LimitResponse: flowcontrol.LimitResponse{
						Type: flowcontrol.LimitResponseTypeReject,
					},
				},
			},
		},
		metav1.CreateOptions{})
	framework.ExpectNoError(err)
	ginkgo.DeferCleanup(f.ClientSet.FlowcontrolV1().PriorityLevelConfigurations().Delete, priorityLevelName, metav1.DeleteOptions{})
	return createdPriorityLevel
}

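// getPriorityLevelNominalConcurrency scrapes the apiserver's /metrics endpoint
// and returns the value of the apiserver_flowcontrol_nominal_limit_seats
// sample whose priority_level label matches the given name, or
// errPriorityLevelNotFound if no such sample exists.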
func getPriorityLevelNominalConcurrency(ctx context.Context, c clientset.Interface, priorityLevelName string) (int32, error) {
	req := c.CoreV1().RESTClient().Get().AbsPath("/metrics")
	resp, err := req.DoRaw(ctx)
	if err != nil {
		return 0, fmt.Errorf("error requesting metrics; request=%#+v, request.URL()=%s: %w", req, req.URL(), err)
	}
	sampleDecoder := expfmt.SampleDecoder{
		Dec:  expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
		Opts: &expfmt.DecodeOptions{},
	}
	for {
		var v model.Vector
		err := sampleDecoder.Decode(&v)
		if err != nil {
			if err == io.EOF {
				break
			}
			return 0, err
		}
		for _, metric := range v {
			if string(metric.Metric[model.MetricNameLabel]) != nominalConcurrencyLimitMetricName {
				continue
			}
			if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
				continue
			}
			return int32(metric.Value), nil
		}
	}
	return 0, errPriorityLevelNotFound
}

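// createFlowSchema creates a flow schema referring to a particular priority
// level and matching all non-resource requests from the given usernames, and
// registers a cleanup hook that deletes it when the test ends.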
func createFlowSchema(ctx context.Context, f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsernames []string) *flowcontrol.FlowSchema {
	var subjects []flowcontrol.Subject
	for _, matchingUsername := range matchingUsernames {
		subjects = append(subjects, flowcontrol.Subject{
			Kind: flowcontrol.SubjectKindUser,
			User: &flowcontrol.UserSubject{
				Name: matchingUsername,
			},
		})
	}

	createdFlowSchema, err := f.ClientSet.FlowcontrolV1().FlowSchemas().Create(
		ctx,
		&flowcontrol.FlowSchema{
			ObjectMeta: metav1.ObjectMeta{
				Name: flowSchemaName,
			},
			Spec: flowcontrol.FlowSchemaSpec{
				MatchingPrecedence: matchingPrecedence,
				PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
					Name: priorityLevelName,
				},
				DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
					Type: flowcontrol.FlowDistinguisherMethodByUserType,
				},
				Rules: []flowcontrol.PolicyRulesWithSubjects{
					{
						Subjects: subjects,
						NonResourceRules: []flowcontrol.NonResourcePolicyRule{
							{
								Verbs:           []string{flowcontrol.VerbAll},
								NonResourceURLs: []string{flowcontrol.NonResourceAll},
							},
						},
					},
				},
			},
		},
		metav1.CreateOptions{})
	framework.ExpectNoError(err)
	ginkgo.DeferCleanup(f.ClientSet.FlowcontrolV1().FlowSchemas().Delete, flowSchemaName, metav1.DeleteOptions{})
	return createdFlowSchema
}

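// waitForSteadyState polls for up to 30 seconds until both (1) the flow
// schema's Dangling condition is False, meaning the API Priority and Fairness
// controller has observed the flow schema and resolved its priority level
// reference, and (2) the priority level's nominal concurrency shows up in
// /metrics, meaning the controller has also processed the priority level.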
func waitForSteadyState(ctx context.Context, f *framework.Framework, flowSchemaName string, priorityLevelName string) {
	framework.ExpectNoError(wait.PollWithContext(ctx, time.Second, 30*time.Second, func(ctx context.Context) (bool, error) {
		fs, err := f.ClientSet.FlowcontrolV1().FlowSchemas().Get(ctx, flowSchemaName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
		if condition == nil || condition.Status != flowcontrol.ConditionFalse {
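			// A missing Dangling condition means the APF controller has not
			// yet synced this flow schema; any status other than False means
			// the referenced priority level has not been resolved. Keep
			// polling in either case.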
			return false, nil
		}
		_, err = getPriorityLevelNominalConcurrency(ctx, f.ClientSet, priorityLevelName)
		if err != nil {
			if err == errPriorityLevelNotFound {
				return false, nil
			}
			return false, err
		}
		return true, nil
	}))
}

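// makeRequest issues a single GET /version request to the API server while
// impersonating the given username, bypassing client-side rate limiting, and
// returns the raw HTTP response so that callers can inspect the APF response
// headers.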
func makeRequest(f *framework.Framework, username string) *http.Response {
	config := f.ClientConfig()
	config.Impersonate.UserName = username
	config.RateLimiter = clientsideflowcontrol.NewFakeAlwaysRateLimiter()
	config.Impersonate.Groups = []string{"system:authenticated"}
	roundTripper, err := rest.TransportFor(config)
	framework.ExpectNoError(err)

	req, err := http.NewRequest(http.MethodGet, f.ClientSet.CoreV1().RESTClient().Get().AbsPath("version").URL().String(), nil)
	framework.ExpectNoError(err)

	response, err := roundTripper.RoundTrip(req)
	framework.ExpectNoError(err)
	return response
}

func getPriorityLevelUID(response *http.Response) string {
	return response.Header.Get(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID)
}

func getFlowSchemaUID(response *http.Response) string {
	return response.Header.Get(flowcontrol.ResponseHeaderMatchedFlowSchemaUID)
}

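// uniformQPSLoadSingle loads the API server as a single client impersonating
// username, issuing requests at a uniform rate of qps for loadDuration, and
// returns the number of requests that completed before the duration elapsed.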
func uniformQPSLoadSingle(f *framework.Framework, username string, qps float64, loadDuration time.Duration) int32 {
	var completed int32
	var wg sync.WaitGroup
	ticker := time.NewTicker(time.Duration(float64(time.Second) / qps))
	defer ticker.Stop()
	timer := time.NewTimer(loadDuration)
	for {
		select {
		case <-ticker.C:
			wg.Add(1)
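			// Fire each request in its own goroutine: a request's latency can
			// exceed the tick interval, and issuing it synchronously would
			// stall the ticker loop and drive the effective QPS below the
			// target rate.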
			go func() {
				defer wg.Done()
				makeRequest(f, username)
				atomic.AddInt32(&completed, 1)
			}()
		case <-timer.C:
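			// The load duration is over: snapshot the count first so that
			// requests finishing after the deadline are not counted, then
			// wait for the in-flight goroutines so none are leaked.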
			totalCompleted := atomic.LoadInt32(&completed)
			wg.Wait()
			return totalCompleted
		}
	}
}

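// uniformQPSLoadConcurrent runs concurrency instances of uniformQPSLoadSingle
// in parallel, all impersonating the same username, and returns the total
// number of requests completed across all of them.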
func uniformQPSLoadConcurrent(f *framework.Framework, username string, concurrency int32, qps float64, loadDuration time.Duration) int32 {
	var completed int32
	var wg sync.WaitGroup
	wg.Add(int(concurrency))
	for i := int32(0); i < concurrency; i++ {
		go func() {
			defer wg.Done()
			atomic.AddInt32(&completed, uniformQPSLoadSingle(f, username, qps, loadDuration))
		}()
	}
	wg.Wait()
	return completed
}