    17  package apimachinery
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  	"net/http"
    26  	"sync"
    27  	"sync/atomic"
    28  	"time"
    30  	"github.com/onsi/ginkgo/v2"
    31  	"github.com/onsi/gomega"
    32  	"github.com/prometheus/common/expfmt"
    33  	"github.com/prometheus/common/model"
    35  	flowcontrol "k8s.io/api/flowcontrol/v1"
    36  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/types"
    39  	utilrand "k8s.io/apimachinery/pkg/util/rand"
    40  	"k8s.io/apimachinery/pkg/util/wait"
    41  	"k8s.io/apimachinery/pkg/watch"
    42  	"k8s.io/apiserver/pkg/util/apihelpers"
    43  	clientset "k8s.io/client-go/kubernetes"
    44  	"k8s.io/client-go/rest"
    45  	clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
    46  	"k8s.io/client-go/util/retry"
    47  	"k8s.io/kubernetes/test/e2e/framework"
    48  	admissionapi "k8s.io/pod-security-admission/api"
    49  	"k8s.io/utils/ptr"
    50  )
const (
	// nominalConcurrencyLimitMetricName is the name of the API Priority and
	// Fairness metric family that carries a priority level's nominal
	// concurrency limit (in seats).
	nominalConcurrencyLimitMetricName = "apiserver_flowcontrol_nominal_limit_seats"
	// priorityLevelLabelName is the metric label whose value is the name of a
	// priority level.
	priorityLevelLabelName            = "priority_level"
)
    57  var (
    58  	errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
    59  )
    61  var _ = SIGDescribe("API priority and fairness", func() {
    62  	f := framework.NewDefaultFramework("apf")
    63  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    65  	ginkgo.It("should ensure that requests can be classified by adding FlowSchema and PriorityLevelConfiguration", func(ctx context.Context) {
    66  		testingFlowSchemaName := "e2e-testing-flowschema"
    67  		testingPriorityLevelName := "e2e-testing-prioritylevel"
    68  		matchingUsername := "noxu"
    69  		nonMatchingUsername := "foo"
    71  		ginkgo.By("creating a testing PriorityLevelConfiguration object")
    72  		createdPriorityLevel := createPriorityLevel(ctx, f, testingPriorityLevelName, 1)
    74  		ginkgo.By("creating a testing FlowSchema object")
    75  		createdFlowSchema := createFlowSchema(ctx, f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
    77  		ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
    78  		waitForSteadyState(ctx, f, testingFlowSchemaName, testingPriorityLevelName)
    80  		var response *http.Response
    81  		ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
    82  		response = makeRequest(f, matchingUsername)
    83  		if plUIDWant, plUIDGot := string(createdPriorityLevel.UID), getPriorityLevelUID(response); plUIDWant != plUIDGot {
    84  			framework.Failf("expected PriorityLevelConfiguration UID in the response header: %s, but got: %s, response header: %#v", plUIDWant, plUIDGot, response.Header)
    85  		}
    86  		if fsUIDWant, fsUIDGot := string(createdFlowSchema.UID), getFlowSchemaUID(response); fsUIDWant != fsUIDGot {
    87  			framework.Failf("expected FlowSchema UID in the response header: %s, but got: %s, response header: %#v", fsUIDWant, fsUIDGot, response.Header)
    88  		}
    90  		ginkgo.By("response headers should contain non-empty UID of FlowSchema and PriorityLevelConfiguration for a non-matching user")
    91  		response = makeRequest(f, nonMatchingUsername)
    92  		if plUIDGot := getPriorityLevelUID(response); plUIDGot == "" {
    93  			framework.Failf("expected a non-empty PriorityLevelConfiguration UID in the response header, but got: %s, response header: %#v", plUIDGot, response.Header)
    94  		}
    95  		if fsUIDGot := getFlowSchemaUID(response); fsUIDGot == "" {
    96  			framework.Failf("expected a non-empty FlowSchema UID in the response header but got: %s, response header: %#v", fsUIDGot, response.Header)
    97  		}
    98  	})
   100  	// This test creates two flow schemas and a corresponding priority level for
   101  	// each flow schema. One flow schema has a higher match precedence. With two
   102  	// clients making requests at different rates, we test to make sure that the
   103  	// higher QPS client cannot drown out the other one despite having higher
   104  	// priority.
   105  	ginkgo.It("should ensure that requests can't be drowned out (priority)", func(ctx context.Context) {
   106  		// See https://github.com/kubernetes/kubernetes/issues/96710
   107  		ginkgo.Skip("skipping test until flakiness is resolved")
   109  		flowSchemaNamePrefix := "e2e-testing-flowschema-" + f.UniqueName
   110  		priorityLevelNamePrefix := "e2e-testing-prioritylevel-" + f.UniqueName
   111  		loadDuration := 10 * time.Second
   112  		highQPSClientName := "highqps-" + f.UniqueName
   113  		lowQPSClientName := "lowqps-" + f.UniqueName
   115  		type client struct {
   116  			username                    string
   117  			qps                         float64
   118  			priorityLevelName           string  //lint:ignore U1000 field is actually used
   119  			concurrencyMultiplier       float64 //lint:ignore U1000 field is actually used
   120  			concurrency                 int32
   121  			flowSchemaName              string //lint:ignore U1000 field is actually used
   122  			matchingPrecedence          int32  //lint:ignore U1000 field is actually used
   123  			completedRequests           int32
   124  			expectedCompletedPercentage float64 //lint:ignore U1000 field is actually used
   125  		}
   126  		clients := []client{
   127  			// "highqps" refers to a client that creates requests at a much higher
   128  			// QPS than its counter-part and well above its concurrency share limit.
   129  			// In contrast, "lowqps" stays under its concurrency shares.
   130  			// Additionally, the "highqps" client also has a higher matching
   131  			// precedence for its flow schema.
   132  			{username: highQPSClientName, qps: 90, concurrencyMultiplier: 2.0, matchingPrecedence: 999, expectedCompletedPercentage: 0.90},
   133  			{username: lowQPSClientName, qps: 4, concurrencyMultiplier: 0.5, matchingPrecedence: 1000, expectedCompletedPercentage: 0.90},
   134  		}
   136  		ginkgo.By("creating test priority levels and flow schemas")
   137  		for i := range clients {
   138  			clients[i].priorityLevelName = fmt.Sprintf("%s-%s", priorityLevelNamePrefix, clients[i].username)
   139  			framework.Logf("creating PriorityLevel %q", clients[i].priorityLevelName)
   140  			createPriorityLevel(ctx, f, clients[i].priorityLevelName, 1)
   142  			clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username)
   143  			framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
   144  			createFlowSchema(ctx, f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
   146  			ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
   147  			waitForSteadyState(ctx, f, clients[i].flowSchemaName, clients[i].priorityLevelName)
   148  		}
   150  		ginkgo.By("getting request concurrency from metrics")
   151  		for i := range clients {
   152  			realConcurrency, err := getPriorityLevelNominalConcurrency(ctx, f.ClientSet, clients[i].priorityLevelName)
   153  			framework.ExpectNoError(err)
   154  			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
   155  			if clients[i].concurrency < 1 {
   156  				clients[i].concurrency = 1
   157  			}
   158  			framework.Logf("request concurrency for %q will be %d (that is %d times client multiplier)", clients[i].username, clients[i].concurrency, realConcurrency)
   159  		}
   161  		ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
   162  		var wg sync.WaitGroup
   163  		for i := range clients {
   164  			wg.Add(1)
   165  			go func(c *client) {
   166  				defer wg.Done()
   167  				framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
   168  				c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
   169  			}(&clients[i])
   170  		}
   171  		wg.Wait()
   173  		ginkgo.By("checking completed requests with expected values")
   174  		for _, client := range clients {
   175  			// Each client should have 95% of its ideal number of completed requests.
   176  			maxCompletedRequests := float64(client.concurrency) * client.qps * loadDuration.Seconds()
   177  			fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
   178  			framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
   179  			if fractionCompleted < client.expectedCompletedPercentage {
   180  				framework.Failf("client %q: got %.1f%% completed requests, want at least %.1f%%", client.username, 100*fractionCompleted, 100*client.expectedCompletedPercentage)
   181  			}
   182  		}
   183  	})
   185  	// This test has two clients (different usernames) making requests at
   186  	// different rates. Both clients' requests get mapped to the same flow schema
   187  	// and priority level. We expect APF's "ByUser" flow distinguisher to isolate
   188  	// the two clients and not allow one client to drown out the other despite
   189  	// having a higher QPS.
   190  	ginkgo.It("should ensure that requests can't be drowned out (fairness)", func(ctx context.Context) {
   191  		// See https://github.com/kubernetes/kubernetes/issues/96710
   192  		ginkgo.Skip("skipping test until flakiness is resolved")
   194  		priorityLevelName := "e2e-testing-prioritylevel-" + f.UniqueName
   195  		flowSchemaName := "e2e-testing-flowschema-" + f.UniqueName
   196  		loadDuration := 10 * time.Second
   198  		framework.Logf("creating PriorityLevel %q", priorityLevelName)
   199  		createPriorityLevel(ctx, f, priorityLevelName, 1)
   201  		highQPSClientName := "highqps-" + f.UniqueName
   202  		lowQPSClientName := "lowqps-" + f.UniqueName
   203  		framework.Logf("creating FlowSchema %q", flowSchemaName)
   204  		createFlowSchema(ctx, f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
   206  		ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
   207  		waitForSteadyState(ctx, f, flowSchemaName, priorityLevelName)
   209  		type client struct {
   210  			username                    string
   211  			qps                         float64
   212  			concurrencyMultiplier       float64 //lint:ignore U1000 field is actually used
   213  			concurrency                 int32
   214  			completedRequests           int32
   215  			expectedCompletedPercentage float64 //lint:ignore U1000 field is actually used
   216  		}
   217  		clients := []client{
   218  			{username: highQPSClientName, qps: 90, concurrencyMultiplier: 2.0, expectedCompletedPercentage: 0.90},
   219  			{username: lowQPSClientName, qps: 4, concurrencyMultiplier: 0.5, expectedCompletedPercentage: 0.90},
   220  		}
   222  		framework.Logf("getting real concurrency")
   223  		realConcurrency, err := getPriorityLevelNominalConcurrency(ctx, f.ClientSet, priorityLevelName)
   224  		framework.ExpectNoError(err)
   225  		for i := range clients {
   226  			clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
   227  			if clients[i].concurrency < 1 {
   228  				clients[i].concurrency = 1
   229  			}
   230  			framework.Logf("request concurrency for %q will be %d", clients[i].username, clients[i].concurrency)
   231  		}
   233  		ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
   234  		var wg sync.WaitGroup
   235  		for i := range clients {
   236  			wg.Add(1)
   237  			go func(c *client) {
   238  				defer wg.Done()
   239  				framework.Logf("starting uniform QPS load for %q: concurrency=%d, qps=%.1f", c.username, c.concurrency, c.qps)
   240  				c.completedRequests = uniformQPSLoadConcurrent(f, c.username, c.concurrency, c.qps, loadDuration)
   241  			}(&clients[i])
   242  		}
   243  		wg.Wait()
   245  		ginkgo.By("checking completed requests with expected values")
   246  		for _, client := range clients {
   247  			// Each client should have 95% of its ideal number of completed requests.
   248  			maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second)
   249  			fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
   250  			framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
   251  			if fractionCompleted < client.expectedCompletedPercentage {
   252  				framework.Failf("client %q: got %.1f%% completed requests, want at least %.1f%%", client.username, 100*fractionCompleted, 100*client.expectedCompletedPercentage)
   253  			}
   254  		}
   255  	})
   257  	/*
   258  	   Release: v1.29
   259  	   Testname: Priority and Fairness FlowSchema API
   260  	   Description:
   261  	   The flowcontrol.apiserver.k8s.io API group MUST exist in the
   262  	     /apis discovery document.
   263  	   The flowcontrol.apiserver.k8s.io/v1 API group/version MUST exist
   264  	     in the /apis/flowcontrol.apiserver.k8s.io discovery document.
   265  	   The flowschemas and flowschemas/status resources MUST exist
   266  	     in the /apis/flowcontrol.apiserver.k8s.io/v1 discovery document.
   267  	   The flowschema resource must support create, get, list, watch,
   268  	     update, patch, delete, and deletecollection.
   269  	*/
   270  	framework.ConformanceIt("should support FlowSchema API operations", func(ctx context.Context) {
   271  		fsVersion := "v1"
   272  		ginkgo.By("getting /apis")
   273  		{
   274  			discoveryGroups, err := f.ClientSet.Discovery().ServerGroups()
   275  			framework.ExpectNoError(err)
   276  			found := false
   277  			for _, group := range discoveryGroups.Groups {
   278  				if group.Name == flowcontrol.GroupName {
   279  					for _, version := range group.Versions {
   280  						if version.Version == fsVersion {
   281  							found = true
   282  							break
   283  						}
   284  					}
   285  				}
   286  			}
   287  			if !found {
   288  				framework.Failf("expected flowcontrol API group/version, got %#v", discoveryGroups.Groups)
   289  			}
   290  		}
   292  		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io")
   293  		{
   294  			group := &metav1.APIGroup{}
   295  			err := f.ClientSet.Discovery().RESTClient().Get().AbsPath("/apis/flowcontrol.apiserver.k8s.io").Do(ctx).Into(group)
   296  			framework.ExpectNoError(err)
   297  			found := false
   298  			for _, version := range group.Versions {
   299  				if version.Version == fsVersion {
   300  					found = true
   301  					break
   302  				}
   303  			}
   304  			if !found {
   305  				framework.Failf("expected flowschemas API version, got %#v", group.Versions)
   306  			}
   307  		}
   309  		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io/" + fsVersion)
   310  		{
   311  			resources, err := f.ClientSet.Discovery().ServerResourcesForGroupVersion(flowcontrol.SchemeGroupVersion.String())
   312  			framework.ExpectNoError(err)
   313  			foundFS, foundFSStatus := false, false
   314  			for _, resource := range resources.APIResources {
   315  				switch resource.Name {
   316  				case "flowschemas":
   317  					foundFS = true
   318  				case "flowschemas/status":
   319  					foundFSStatus = true
   320  				}
   321  			}
   322  			if !foundFS {
   323  				framework.Failf("expected flowschemas, got %#v", resources.APIResources)
   324  			}
   325  			if !foundFSStatus {
   326  				framework.Failf("expected flowschemas/status, got %#v", resources.APIResources)
   327  			}
   328  		}
   330  		client := f.ClientSet.FlowcontrolV1().FlowSchemas()
   331  		labelKey, labelValue := "example-e2e-fs-label", utilrand.String(8)
   332  		label := fmt.Sprintf("%s=%s", labelKey, labelValue)
   334  		template := &flowcontrol.FlowSchema{
   335  			ObjectMeta: metav1.ObjectMeta{
   336  				GenerateName: "e2e-example-fs-",
   337  				Labels: map[string]string{
   338  					labelKey: labelValue,
   339  				},
   340  			},
   341  			Spec: flowcontrol.FlowSchemaSpec{
   342  				MatchingPrecedence: 10000,
   343  				PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
   344  					Name: "global-default",
   345  				},
   346  				DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
   347  					Type: flowcontrol.FlowDistinguisherMethodByUserType,
   348  				},
   349  				Rules: []flowcontrol.PolicyRulesWithSubjects{
   350  					{
   351  						Subjects: []flowcontrol.Subject{
   352  							{
   353  								Kind: flowcontrol.SubjectKindUser,
   354  								User: &flowcontrol.UserSubject{
   355  									Name: "example-e2e-non-existent-user",
   356  								},
   357  							},
   358  						},
   359  						NonResourceRules: []flowcontrol.NonResourcePolicyRule{
   360  							{
   361  								Verbs:           []string{flowcontrol.VerbAll},
   362  								NonResourceURLs: []string{flowcontrol.NonResourceAll},
   363  							},
   364  						},
   365  					},
   366  				},
   367  			},
   368  		}
   370  		ginkgo.DeferCleanup(func(ctx context.Context) {
   371  			err := client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
   372  			framework.ExpectNoError(err)
   373  		})
   375  		ginkgo.By("creating")
   376  		_, err := client.Create(ctx, template, metav1.CreateOptions{})
   377  		framework.ExpectNoError(err)
   378  		_, err = client.Create(ctx, template, metav1.CreateOptions{})
   379  		framework.ExpectNoError(err)
   380  		fsCreated, err := client.Create(ctx, template, metav1.CreateOptions{})
   381  		framework.ExpectNoError(err)
   383  		ginkgo.By("getting")
   384  		fsRead, err := client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
   385  		framework.ExpectNoError(err)
   386  		gomega.Expect(fsRead.UID).To(gomega.Equal(fsCreated.UID))
   388  		ginkgo.By("listing")
   389  		list, err := client.List(ctx, metav1.ListOptions{LabelSelector: label})
   390  		framework.ExpectNoError(err)
   391  		gomega.Expect(list.Items).To(gomega.HaveLen(3), "filtered list should have 3 items")
   393  		ginkgo.By("watching")
   394  		framework.Logf("starting watch")
   395  		fsWatch, err := client.Watch(ctx, metav1.ListOptions{ResourceVersion: list.ResourceVersion, LabelSelector: label})
   396  		framework.ExpectNoError(err)
   398  		ginkgo.By("patching")
   399  		patchBytes := []byte(`{"metadata":{"annotations":{"patched":"true"}},"spec":{"matchingPrecedence":9999}}`)
   400  		fsPatched, err := client.Patch(ctx, fsCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
   401  		framework.ExpectNoError(err)
   402  		gomega.Expect(fsPatched.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
   403  		gomega.Expect(fsPatched.Spec.MatchingPrecedence).To(gomega.Equal(int32(9999)), "patched object should have the applied spec")
   405  		ginkgo.By("updating")
   406  		var fsUpdated *flowcontrol.FlowSchema
   407  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   408  			fs, err := client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
   409  			framework.ExpectNoError(err)
   411  			fsToUpdate := fs.DeepCopy()
   412  			fsToUpdate.Annotations["updated"] = "true"
   413  			fsToUpdate.Spec.MatchingPrecedence = int32(9000)
   415  			fsUpdated, err = client.Update(ctx, fsToUpdate, metav1.UpdateOptions{})
   416  			return err
   417  		})
   418  		framework.ExpectNoError(err, "failed to update flowschema %q", fsCreated.Name)
   419  		gomega.Expect(fsUpdated.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
   420  		gomega.Expect(fsUpdated.Spec.MatchingPrecedence).To(gomega.Equal(int32(9000)), "updated object should have the applied spec")
   422  		framework.Logf("waiting for watch events with expected annotations")
   423  		for sawAnnotation := false; !sawAnnotation; {
   424  			select {
   425  			case evt, ok := <-fsWatch.ResultChan():
   426  				if !ok {
   427  					framework.Fail("watch channel should not close")
   428  				}
   429  				gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
   430  				fsWatched, isFS := evt.Object.(*flowcontrol.FlowSchema)
   431  				if !isFS {
   432  					framework.Failf("expected an object of type: %T, but got %T", &flowcontrol.FlowSchema{}, evt.Object)
   433  				}
   434  				if fsWatched.Annotations["patched"] == "true" {
   435  					sawAnnotation = true
   436  					fsWatch.Stop()
   437  				} else {
   438  					framework.Logf("missing expected annotations, waiting: %#v", fsWatched.Annotations)
   439  				}
   440  			case <-time.After(wait.ForeverTestTimeout):
   441  				framework.Fail("timed out waiting for watch event")
   442  			}
   443  		}
   445  		ginkgo.By("getting /status")
   446  		resource := flowcontrol.SchemeGroupVersion.WithResource("flowschemas")
   447  		fsStatusRead, err := f.DynamicClient.Resource(resource).Get(ctx, fsCreated.Name, metav1.GetOptions{}, "status")
   448  		framework.ExpectNoError(err)
   449  		gomega.Expect(fsStatusRead.GetObjectKind().GroupVersionKind()).To(gomega.Equal(flowcontrol.SchemeGroupVersion.WithKind("FlowSchema")))
   450  		gomega.Expect(fsStatusRead.GetUID()).To(gomega.Equal(fsCreated.UID))
   452  		ginkgo.By("patching /status")
   453  		patchBytes = []byte(`{"status":{"conditions":[{"type":"PatchStatusFailed","status":"False","reason":"e2e"}]}}`)
   454  		fsStatusPatched, err := client.Patch(ctx, fsCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
   455  		framework.ExpectNoError(err)
   456  		condition := apihelpers.GetFlowSchemaConditionByType(fsStatusPatched, flowcontrol.FlowSchemaConditionType("PatchStatusFailed"))
   457  		gomega.Expect(condition).NotTo(gomega.BeNil())
   459  		ginkgo.By("updating /status")
   460  		var fsStatusUpdated *flowcontrol.FlowSchema
   461  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   462  			fs, err := client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
   463  			framework.ExpectNoError(err)
   465  			fsStatusToUpdate := fs.DeepCopy()
   466  			fsStatusToUpdate.Status.Conditions = append(fsStatusToUpdate.Status.Conditions, flowcontrol.FlowSchemaCondition{
   467  				Type:    "StatusUpdateFailed",
   468  				Status:  flowcontrol.ConditionFalse,
   469  				Reason:  "E2E",
   470  				Message: "Set from an e2e test",
   471  			})
   472  			fsStatusUpdated, err = client.UpdateStatus(ctx, fsStatusToUpdate, metav1.UpdateOptions{})
   473  			return err
   474  		})
   475  		framework.ExpectNoError(err, "failed to update status of flowschema %q", fsCreated.Name)
   476  		condition = apihelpers.GetFlowSchemaConditionByType(fsStatusUpdated, flowcontrol.FlowSchemaConditionType("StatusUpdateFailed"))
   477  		gomega.Expect(condition).NotTo(gomega.BeNil())
   479  		ginkgo.By("deleting")
   480  		err = client.Delete(ctx, fsCreated.Name, metav1.DeleteOptions{})
   481  		framework.ExpectNoError(err)
   482  		_, err = client.Get(ctx, fsCreated.Name, metav1.GetOptions{})
   483  		if !apierrors.IsNotFound(err) {
   484  			framework.Failf("expected 404, got %#v", err)
   485  		}
   487  		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
   488  		framework.ExpectNoError(err)
   489  		gomega.Expect(list.Items).To(gomega.HaveLen(2), "filtered list should have 2 items")
   491  		ginkgo.By("deleting a collection")
   492  		err = client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
   493  		framework.ExpectNoError(err)
   495  		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
   496  		framework.ExpectNoError(err)
   497  		gomega.Expect(list.Items).To(gomega.BeEmpty(), "filtered list should have 0 items")
   498  	})
   500  	/*
   501  	   Release: v1.29
   502  	   Testname: Priority and Fairness PriorityLevelConfiguration API
   503  	   Description:
   504  	   The flowcontrol.apiserver.k8s.io API group MUST exist in the
   505  	     /apis discovery document.
   506  	   The flowcontrol.apiserver.k8s.io/v1 API group/version MUST exist
   507  	     in the /apis/flowcontrol.apiserver.k8s.io discovery document.
   508  	   The prioritylevelconfiguration and prioritylevelconfiguration/status
   509  	     resources MUST exist in the
   510  	     /apis/flowcontrol.apiserver.k8s.io/v1 discovery document.
   511  	   The prioritylevelconfiguration resource must support create, get,
   512  	     list, watch, update, patch, delete, and deletecollection.
   513  	*/
   514  	framework.ConformanceIt("should support PriorityLevelConfiguration API operations", func(ctx context.Context) {
   515  		plVersion := "v1"
   516  		ginkgo.By("getting /apis")
   517  		{
   518  			discoveryGroups, err := f.ClientSet.Discovery().ServerGroups()
   519  			framework.ExpectNoError(err)
   520  			found := false
   521  			for _, group := range discoveryGroups.Groups {
   522  				if group.Name == flowcontrol.GroupName {
   523  					for _, version := range group.Versions {
   524  						if version.Version == plVersion {
   525  							found = true
   526  							break
   527  						}
   528  					}
   529  				}
   530  			}
   531  			if !found {
   532  				framework.Failf("expected flowcontrol API group/version, got %#v", discoveryGroups.Groups)
   533  			}
   534  		}
   536  		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io")
   537  		{
   538  			group := &metav1.APIGroup{}
   539  			err := f.ClientSet.Discovery().RESTClient().Get().AbsPath("/apis/flowcontrol.apiserver.k8s.io").Do(ctx).Into(group)
   540  			framework.ExpectNoError(err)
   541  			found := false
   542  			for _, version := range group.Versions {
   543  				if version.Version == plVersion {
   544  					found = true
   545  					break
   546  				}
   547  			}
   548  			if !found {
   549  				framework.Failf("expected flowcontrol API version, got %#v", group.Versions)
   550  			}
   551  		}
   553  		ginkgo.By("getting /apis/flowcontrol.apiserver.k8s.io/" + plVersion)
   554  		{
   555  			resources, err := f.ClientSet.Discovery().ServerResourcesForGroupVersion(flowcontrol.SchemeGroupVersion.String())
   556  			framework.ExpectNoError(err)
   557  			foundPL, foundPLStatus := false, false
   558  			for _, resource := range resources.APIResources {
   559  				switch resource.Name {
   560  				case "prioritylevelconfigurations":
   561  					foundPL = true
   562  				case "prioritylevelconfigurations/status":
   563  					foundPLStatus = true
   564  				}
   565  			}
   566  			if !foundPL {
   567  				framework.Failf("expected prioritylevelconfigurations, got %#v", resources.APIResources)
   568  			}
   569  			if !foundPLStatus {
   570  				framework.Failf("expected prioritylevelconfigurations/status, got %#v", resources.APIResources)
   571  			}
   572  		}
   574  		client := f.ClientSet.FlowcontrolV1().PriorityLevelConfigurations()
   575  		labelKey, labelValue := "example-e2e-pl-label", utilrand.String(8)
   576  		label := fmt.Sprintf("%s=%s", labelKey, labelValue)
   578  		template := &flowcontrol.PriorityLevelConfiguration{
   579  			ObjectMeta: metav1.ObjectMeta{
   580  				GenerateName: "e2e-example-pl-",
   581  				Labels: map[string]string{
   582  					labelKey: labelValue,
   583  				},
   584  			},
   585  			Spec: flowcontrol.PriorityLevelConfigurationSpec{
   586  				Type: flowcontrol.PriorityLevelEnablementLimited,
   587  				Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
   588  					NominalConcurrencyShares: ptr.To(int32(2)),
   589  					LimitResponse: flowcontrol.LimitResponse{
   590  						Type: flowcontrol.LimitResponseTypeReject,
   591  					},
   592  				},
   593  			},
   594  		}
   596  		ginkgo.DeferCleanup(func(ctx context.Context) {
   597  			err := client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
   598  			framework.ExpectNoError(err)
   599  		})
   601  		ginkgo.By("creating")
   602  		_, err := client.Create(ctx, template, metav1.CreateOptions{})
   603  		framework.ExpectNoError(err)
   604  		_, err = client.Create(ctx, template, metav1.CreateOptions{})
   605  		framework.ExpectNoError(err)
   606  		plCreated, err := client.Create(ctx, template, metav1.CreateOptions{})
   607  		framework.ExpectNoError(err)
   609  		ginkgo.By("getting")
   610  		plRead, err := client.Get(ctx, plCreated.Name, metav1.GetOptions{})
   611  		framework.ExpectNoError(err)
   612  		gomega.Expect(plRead.UID).To(gomega.Equal(plCreated.UID))
   614  		ginkgo.By("listing")
   615  		list, err := client.List(ctx, metav1.ListOptions{LabelSelector: label})
   616  		framework.ExpectNoError(err)
   617  		gomega.Expect(list.Items).To(gomega.HaveLen(3), "filtered list should have 3 items")
   619  		ginkgo.By("watching")
   620  		framework.Logf("starting watch")
   621  		plWatch, err := client.Watch(ctx, metav1.ListOptions{ResourceVersion: list.ResourceVersion, LabelSelector: label})
   622  		framework.ExpectNoError(err)
   624  		ginkgo.By("patching")
   625  		patchBytes := []byte(`{"metadata":{"annotations":{"patched":"true"}},"spec":{"limited":{"nominalConcurrencyShares":4}}}`)
   626  		plPatched, err := client.Patch(ctx, plCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
   627  		framework.ExpectNoError(err)
   628  		gomega.Expect(plPatched.Annotations).To(gomega.HaveKeyWithValue("patched", "true"), "patched object should have the applied annotation")
   629  		gomega.Expect(plPatched.Spec.Limited.NominalConcurrencyShares).To(gomega.Equal(ptr.To(int32(4))), "patched object should have the applied spec")
   631  		ginkgo.By("updating")
   632  		var plUpdated *flowcontrol.PriorityLevelConfiguration
   633  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   634  			pl, err := client.Get(ctx, plCreated.Name, metav1.GetOptions{})
   635  			framework.ExpectNoError(err)
   637  			plToUpdate := pl.DeepCopy()
   638  			plToUpdate.Annotations["updated"] = "true"
   639  			plToUpdate.Spec.Limited.NominalConcurrencyShares = ptr.To(int32(6))
   641  			plUpdated, err = client.Update(ctx, plToUpdate, metav1.UpdateOptions{})
   642  			return err
   643  		})
   644  		framework.ExpectNoError(err, "failed to update prioritylevelconfiguration %q", plCreated.Name)
   645  		gomega.Expect(plUpdated.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated object should have the applied annotation")
   646  		gomega.Expect(plUpdated.Spec.Limited.NominalConcurrencyShares).To(gomega.Equal(ptr.To(int32(6))), "updated object should have the applied spec")
   648  		framework.Logf("waiting for watch events with expected annotations")
   649  		for sawAnnotation := false; !sawAnnotation; {
   650  			select {
   651  			case evt, ok := <-plWatch.ResultChan():
   652  				if !ok {
   653  					framework.Fail("watch channel should not close")
   654  				}
   655  				gomega.Expect(evt.Type).To(gomega.Equal(watch.Modified))
   656  				plWatched, isPL := evt.Object.(*flowcontrol.PriorityLevelConfiguration)
   657  				if !isPL {
   658  					framework.Failf("expected an object of type: %T, but got %T", &flowcontrol.PriorityLevelConfiguration{}, evt.Object)
   659  				}
   660  				if plWatched.Annotations["patched"] == "true" {
   661  					sawAnnotation = true
   662  					plWatch.Stop()
   663  				} else {
   664  					framework.Logf("missing expected annotations, waiting: %#v", plWatched.Annotations)
   665  				}
   666  			case <-time.After(wait.ForeverTestTimeout):
   667  				framework.Fail("timed out waiting for watch event")
   668  			}
   669  		}
   671  		ginkgo.By("getting /status")
   672  		resource := flowcontrol.SchemeGroupVersion.WithResource("prioritylevelconfigurations")
   673  		plStatusRead, err := f.DynamicClient.Resource(resource).Get(ctx, plCreated.Name, metav1.GetOptions{}, "status")
   674  		framework.ExpectNoError(err)
   675  		gomega.Expect(plStatusRead.GetObjectKind().GroupVersionKind()).To(gomega.Equal(flowcontrol.SchemeGroupVersion.WithKind("PriorityLevelConfiguration")))
   676  		gomega.Expect(plStatusRead.GetUID()).To(gomega.Equal(plCreated.UID))
   678  		ginkgo.By("patching /status")
   679  		patchBytes = []byte(`{"status":{"conditions":[{"type":"PatchStatusFailed","status":"False","reason":"e2e"}]}}`)
   680  		plStatusPatched, err := client.Patch(ctx, plCreated.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
   681  		framework.ExpectNoError(err)
   682  		condition := apihelpers.GetPriorityLevelConfigurationConditionByType(plStatusPatched, flowcontrol.PriorityLevelConfigurationConditionType("PatchStatusFailed"))
   683  		gomega.Expect(condition).NotTo(gomega.BeNil())
   685  		ginkgo.By("updating /status")
   686  		var plStatusUpdated *flowcontrol.PriorityLevelConfiguration
   687  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   688  			pl, err := client.Get(ctx, plCreated.Name, metav1.GetOptions{})
   689  			framework.ExpectNoError(err)
   691  			plStatusToUpdate := pl.DeepCopy()
   692  			plStatusToUpdate.Status.Conditions = append(plStatusToUpdate.Status.Conditions, flowcontrol.PriorityLevelConfigurationCondition{
   693  				Type:    "StatusUpdateFailed",
   694  				Status:  flowcontrol.ConditionFalse,
   695  				Reason:  "E2E",
   696  				Message: "Set from an e2e test",
   697  			})
   698  			plStatusUpdated, err = client.UpdateStatus(ctx, plStatusToUpdate, metav1.UpdateOptions{})
   699  			return err
   700  		})
   701  		framework.ExpectNoError(err, "failed to update status of prioritylevelconfiguration %q", plCreated.Name)
   702  		condition = apihelpers.GetPriorityLevelConfigurationConditionByType(plStatusUpdated, flowcontrol.PriorityLevelConfigurationConditionType("StatusUpdateFailed"))
   703  		gomega.Expect(condition).NotTo(gomega.BeNil())
   705  		ginkgo.By("deleting")
   706  		err = client.Delete(ctx, plCreated.Name, metav1.DeleteOptions{})
   707  		framework.ExpectNoError(err)
   708  		_, err = client.Get(ctx, plCreated.Name, metav1.GetOptions{})
   709  		if !apierrors.IsNotFound(err) {
   710  			framework.Failf("expected 404, got %#v", err)
   711  		}
   713  		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
   714  		framework.ExpectNoError(err)
   715  		gomega.Expect(list.Items).To(gomega.HaveLen(2), "filtered list should have 2 items")
   717  		ginkgo.By("deleting a collection")
   718  		err = client.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: label})
   719  		framework.ExpectNoError(err)
   721  		list, err = client.List(ctx, metav1.ListOptions{LabelSelector: label})
   722  		framework.ExpectNoError(err)
   723  		gomega.Expect(list.Items).To(gomega.BeEmpty(), "filtered list should have 0 items")
   724  	})
   725  })
   727  // createPriorityLevel creates a priority level with the provided assured
   728  // concurrency share.
   729  func createPriorityLevel(ctx context.Context, f *framework.Framework, priorityLevelName string, nominalConcurrencyShares int32) *flowcontrol.PriorityLevelConfiguration {
   730  	createdPriorityLevel, err := f.ClientSet.FlowcontrolV1().PriorityLevelConfigurations().Create(
   731  		ctx,
   732  		&flowcontrol.PriorityLevelConfiguration{
   733  			ObjectMeta: metav1.ObjectMeta{
   734  				Name: priorityLevelName,
   735  			},
   736  			Spec: flowcontrol.PriorityLevelConfigurationSpec{
   737  				Type: flowcontrol.PriorityLevelEnablementLimited,
   738  				Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
   739  					NominalConcurrencyShares: ptr.To(nominalConcurrencyShares),
   740  					LimitResponse: flowcontrol.LimitResponse{
   741  						Type: flowcontrol.LimitResponseTypeReject,
   742  					},
   743  				},
   744  			},
   745  		},
   746  		metav1.CreateOptions{})
   747  	framework.ExpectNoError(err)
   748  	ginkgo.DeferCleanup(f.ClientSet.FlowcontrolV1().PriorityLevelConfigurations().Delete, priorityLevelName, metav1.DeleteOptions{})
   749  	return createdPriorityLevel
   750  }
   752  func getPriorityLevelNominalConcurrency(ctx context.Context, c clientset.Interface, priorityLevelName string) (int32, error) {
   753  	req := c.CoreV1().RESTClient().Get().AbsPath("/metrics")
   754  	resp, err := req.DoRaw(ctx)
   755  	if err != nil {
   756  		return 0, fmt.Errorf("error requesting metrics; request=%#+v, request.URL()=%s: %w", req, req.URL(), err)
   757  	}
   758  	sampleDecoder := expfmt.SampleDecoder{
   759  		Dec:  expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
   760  		Opts: &expfmt.DecodeOptions{},
   761  	}
   762  	for {
   763  		var v model.Vector
   764  		err := sampleDecoder.Decode(&v)
   765  		if err != nil {
   766  			if err == io.EOF {
   767  				break
   768  			}
   769  			return 0, err
   770  		}
   771  		for _, metric := range v {
   772  			if string(metric.Metric[model.MetricNameLabel]) != nominalConcurrencyLimitMetricName {
   773  				continue
   774  			}
   775  			if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
   776  				continue
   777  			}
   778  			return int32(metric.Value), nil
   779  		}
   780  	}
   781  	return 0, errPriorityLevelNotFound
   782  }
   784  // createFlowSchema creates a flow schema referring to a particular priority
   785  // level and matching the username provided.
   786  func createFlowSchema(ctx context.Context, f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsernames []string) *flowcontrol.FlowSchema {
   787  	var subjects []flowcontrol.Subject
   788  	for _, matchingUsername := range matchingUsernames {
   789  		subjects = append(subjects, flowcontrol.Subject{
   790  			Kind: flowcontrol.SubjectKindUser,
   791  			User: &flowcontrol.UserSubject{
   792  				Name: matchingUsername,
   793  			},
   794  		})
   795  	}
   797  	createdFlowSchema, err := f.ClientSet.FlowcontrolV1().FlowSchemas().Create(
   798  		ctx,
   799  		&flowcontrol.FlowSchema{
   800  			ObjectMeta: metav1.ObjectMeta{
   801  				Name: flowSchemaName,
   802  			},
   803  			Spec: flowcontrol.FlowSchemaSpec{
   804  				MatchingPrecedence: matchingPrecedence,
   805  				PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
   806  					Name: priorityLevelName,
   807  				},
   808  				DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
   809  					Type: flowcontrol.FlowDistinguisherMethodByUserType,
   810  				},
   811  				Rules: []flowcontrol.PolicyRulesWithSubjects{
   812  					{
   813  						Subjects: subjects,
   814  						NonResourceRules: []flowcontrol.NonResourcePolicyRule{
   815  							{
   816  								Verbs:           []string{flowcontrol.VerbAll},
   817  								NonResourceURLs: []string{flowcontrol.NonResourceAll},
   818  							},
   819  						},
   820  					},
   821  				},
   822  			},
   823  		},
   824  		metav1.CreateOptions{})
   825  	framework.ExpectNoError(err)
   826  	ginkgo.DeferCleanup(f.ClientSet.FlowcontrolV1().FlowSchemas().Delete, flowSchemaName, metav1.DeleteOptions{})
   827  	return createdFlowSchema
   828  }
   830  // waitForSteadyState repeatedly polls the API server to check if the newly
   831  // created flow schema and priority level have been seen by the APF controller
   832  // by checking: (1) the dangling priority level reference condition in the flow
   833  // schema status, and (2) metrics. The function times out after 30 seconds.
   834  func waitForSteadyState(ctx context.Context, f *framework.Framework, flowSchemaName string, priorityLevelName string) {
   835  	framework.ExpectNoError(wait.PollWithContext(ctx, time.Second, 30*time.Second, func(ctx context.Context) (bool, error) {
   836  		fs, err := f.ClientSet.FlowcontrolV1().FlowSchemas().Get(ctx, flowSchemaName, metav1.GetOptions{})
   837  		if err != nil {
   838  			return false, err
   839  		}
   840  		condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
   841  		if condition == nil || condition.Status != flowcontrol.ConditionFalse {
   842  			// The absence of the dangling status object implies that the APF
   843  			// controller isn't done with syncing the flow schema object. And, of
   844  			// course, the condition being anything but false means that steady state
   845  			// hasn't been achieved.
   846  			return false, nil
   847  		}
   848  		_, err = getPriorityLevelNominalConcurrency(ctx, f.ClientSet, priorityLevelName)
   849  		if err != nil {
   850  			if err == errPriorityLevelNotFound {
   851  				return false, nil
   852  			}
   853  			return false, err
   854  		}
   855  		return true, nil
   856  	}))
   857  }
   859  // makeRequests creates a request to the API server and returns the response.
   860  func makeRequest(f *framework.Framework, username string) *http.Response {
   861  	config := f.ClientConfig()
   862  	config.Impersonate.UserName = username
   863  	config.RateLimiter = clientsideflowcontrol.NewFakeAlwaysRateLimiter()
   864  	config.Impersonate.Groups = []string{"system:authenticated"}
   865  	roundTripper, err := rest.TransportFor(config)
   866  	framework.ExpectNoError(err)
   868  	req, err := http.NewRequest(http.MethodGet, f.ClientSet.CoreV1().RESTClient().Get().AbsPath("version").URL().String(), nil)
   869  	framework.ExpectNoError(err)
   871  	response, err := roundTripper.RoundTrip(req)
   872  	framework.ExpectNoError(err)
   873  	return response
   874  }
   876  func getPriorityLevelUID(response *http.Response) string {
   877  	return response.Header.Get(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID)
   878  }
   880  func getFlowSchemaUID(response *http.Response) string {
   881  	return response.Header.Get(flowcontrol.ResponseHeaderMatchedFlowSchemaUID)
   882  }
   884  // uniformQPSLoadSingle loads the API server with requests at a uniform <qps>
   885  // for <loadDuration> time. The number of successfully completed requests is
   886  // returned.
   887  func uniformQPSLoadSingle(f *framework.Framework, username string, qps float64, loadDuration time.Duration) int32 {
   888  	var completed int32
   889  	var wg sync.WaitGroup
   890  	ticker := time.NewTicker(time.Duration(float64(time.Second) / qps))
   891  	defer ticker.Stop()
   892  	timer := time.NewTimer(loadDuration)
   893  	for {
   894  		select {
   895  		case <-ticker.C:
   896  			wg.Add(1)
   897  			// Each request will have a non-zero latency. In addition, there may be
   898  			// multiple concurrent requests in-flight. As a result, a request may
   899  			// take longer than the time between two different consecutive ticks
   900  			// regardless of whether a requests is accepted or rejected. For example,
   901  			// in cases with clients making requests far above their concurrency
   902  			// share, with little time between consecutive requests, due to limited
   903  			// concurrency, newer requests will be enqueued until older ones
   904  			// complete. Hence the synchronisation with sync.WaitGroup.
   905  			go func() {
   906  				defer wg.Done()
   907  				makeRequest(f, username)
   908  				atomic.AddInt32(&completed, 1)
   909  			}()
   910  		case <-timer.C:
   911  			// Still in-flight requests should not contribute to the completed count.
   912  			totalCompleted := atomic.LoadInt32(&completed)
   913  			wg.Wait() // do not leak goroutines
   914  			return totalCompleted
   915  		}
   916  	}
   917  }
   919  // uniformQPSLoadConcurrent loads the API server with a <concurrency> number of
   920  // clients impersonating to be <username>, each creating requests at a uniform
   921  // rate defined by <qps>. The sum of number of successfully completed requests
   922  // across all concurrent clients is returned.
   923  func uniformQPSLoadConcurrent(f *framework.Framework, username string, concurrency int32, qps float64, loadDuration time.Duration) int32 {
   924  	var completed int32
   925  	var wg sync.WaitGroup
   926  	wg.Add(int(concurrency))
   927  	for i := int32(0); i < concurrency; i++ {
   928  		go func() {
   929  			defer wg.Done()
   930  			atomic.AddInt32(&completed, uniformQPSLoadSingle(f, username, qps, loadDuration))
   931  		}()
   932  	}
   933  	wg.Wait()
   934  	return completed
   935  }

