
Source file src/k8s.io/kubernetes/test/integration/apiserver/flowcontrol/concurrency_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver/flowcontrol

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package flowcontrol
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"io"
    23  	"strings"
    24  	"sync"
    25  	"testing"
    26  	"time"
    28  	"github.com/prometheus/common/expfmt"
    29  	"github.com/prometheus/common/model"
    31  	flowcontrol "k8s.io/api/flowcontrol/v1"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/rest"
    36  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    37  	"k8s.io/kubernetes/test/integration/framework"
    38  	"k8s.io/kubernetes/test/utils/ktesting"
    39  	"k8s.io/utils/ptr"
    40  )
    42  const (
    43  	nominalConcurrencyMetricsName     = "apiserver_flowcontrol_nominal_limit_seats"
    44  	dispatchedRequestCountMetricsName = "apiserver_flowcontrol_dispatched_requests_total"
    45  	rejectedRequestCountMetricsName   = "apiserver_flowcontrol_rejected_requests_total"
    46  	labelPriorityLevel                = "priority_level"
    47  	timeout                           = time.Second * 10
    48  )
    50  func setup(t testing.TB, maxReadonlyRequestsInFlight, maxMutatingRequestsInFlight int) (context.Context, *rest.Config, framework.TearDownFunc) {
    51  	tCtx := ktesting.Init(t)
    53  	_, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
    54  		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
    55  			// Ensure all clients are allowed to send requests.
    56  			opts.Authorization.Modes = []string{"AlwaysAllow"}
    57  			opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight
    58  			opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = maxMutatingRequestsInFlight
    59  		},
    60  	})
    62  	newTeardown := func() {
    63  		tCtx.Cancel("tearing down apiserver")
    64  		tearDownFn()
    65  	}
    66  	return tCtx, kubeConfig, newTeardown
    67  }
    69  func TestPriorityLevelIsolation(t *testing.T) {
    70  	ctx, kubeConfig, closeFn := setup(t, 1, 1)
    71  	defer closeFn()
    73  	loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
    74  	noxu1Client := getClientFor(kubeConfig, "noxu1")
    75  	noxu2Client := getClientFor(kubeConfig, "noxu2")
    77  	queueLength := 50
    78  	concurrencyShares := 1
    80  	priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
    81  		loopbackClient, "noxu1", concurrencyShares, queueLength)
    82  	if err != nil {
    83  		t.Error(err)
    84  	}
    85  	priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
    86  		loopbackClient, "noxu2", concurrencyShares, queueLength)
    87  	if err != nil {
    88  		t.Error(err)
    89  	}
    91  	nominalConcurrency, err := getNominalConcurrencyOfPriorityLevel(loopbackClient)
    92  	if err != nil {
    93  		t.Error(err)
    94  	}
    96  	if 1 != nominalConcurrency[priorityLevelNoxu1.Name] {
    97  		t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu1.Name], 1)
    98  	}
    99  	if 1 != nominalConcurrency[priorityLevelNoxu2.Name] {
   100  		t.Errorf("unexpected shared concurrency %v instead of %v", nominalConcurrency[priorityLevelNoxu2.Name], 1)
   101  	}
   103  	stopCh := make(chan struct{})
   104  	wg := sync.WaitGroup{}
   105  	defer func() {
   106  		close(stopCh)
   107  		wg.Wait()
   108  	}()
   110  	// "elephant"
   111  	wg.Add(concurrencyShares + queueLength)
   112  	streamRequests(concurrencyShares+queueLength, func() {
   113  		_, err := noxu1Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
   114  		if err != nil {
   115  			t.Error(err)
   116  		}
   117  	}, &wg, stopCh)
   118  	// "mouse"
   119  	wg.Add(3)
   120  	streamRequests(3, func() {
   121  		_, err := noxu2Client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
   122  		if err != nil {
   123  			t.Error(err)
   124  		}
   125  	}, &wg, stopCh)
   127  	time.Sleep(time.Second * 10) // running in background for a while
   129  	allDispatchedReqCounts, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
   130  	if err != nil {
   131  		t.Error(err)
   132  	}
   134  	noxu1RequestCount := allDispatchedReqCounts[priorityLevelNoxu1.Name]
   135  	noxu2RequestCount := allDispatchedReqCounts[priorityLevelNoxu2.Name]
   137  	if rejectedReqCounts[priorityLevelNoxu1.Name] > 0 {
   138  		t.Errorf(`%v requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name])
   139  	}
   140  	if rejectedReqCounts[priorityLevelNoxu2.Name] > 0 {
   141  		t.Errorf(`%v requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[priorityLevelNoxu2.Name])
   142  	}
   144  	// Theoretically, the actual expected value of request counts upon the two priority-level should be
   145  	// the equal. We're deliberately lax to make flakes super rare.
   146  	if (noxu1RequestCount/2) > noxu2RequestCount || (noxu2RequestCount/2) > noxu1RequestCount {
   147  		t.Errorf("imbalanced requests made by noxu1/2: (%d:%d)", noxu1RequestCount, noxu2RequestCount)
   148  	}
   149  }
   151  func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
   152  	config := rest.CopyConfig(loopbackConfig)
   153  	config.Impersonate = rest.ImpersonationConfig{
   154  		UserName: username,
   155  	}
   156  	return clientset.NewForConfigOrDie(config)
   157  }
   159  func getMetrics(c clientset.Interface) (string, error) {
   160  	resp, err := c.CoreV1().
   161  		RESTClient().
   162  		Get().
   163  		RequestURI("/metrics").
   164  		DoRaw(context.Background())
   165  	if err != nil {
   166  		return "", err
   167  	}
   168  	return string(resp), err
   169  }
   171  func getNominalConcurrencyOfPriorityLevel(c clientset.Interface) (map[string]int, error) {
   172  	resp, err := getMetrics(c)
   173  	if err != nil {
   174  		return nil, err
   175  	}
   177  	dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
   178  	decoder := expfmt.SampleDecoder{
   179  		Dec:  dec,
   180  		Opts: &expfmt.DecodeOptions{},
   181  	}
   183  	concurrency := make(map[string]int)
   184  	for {
   185  		var v model.Vector
   186  		if err := decoder.Decode(&v); err != nil {
   187  			if err == io.EOF {
   188  				// Expected loop termination condition.
   189  				return concurrency, nil
   190  			}
   191  			return nil, fmt.Errorf("failed decoding metrics: %v", err)
   192  		}
   193  		for _, metric := range v {
   194  			switch name := string(metric.Metric[model.MetricNameLabel]); name {
   195  			case nominalConcurrencyMetricsName:
   196  				concurrency[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
   197  			}
   198  		}
   199  	}
   200  }
   202  func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, map[string]int, error) {
   203  	resp, err := getMetrics(c)
   204  	if err != nil {
   205  		return nil, nil, err
   206  	}
   208  	dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
   209  	decoder := expfmt.SampleDecoder{
   210  		Dec:  dec,
   211  		Opts: &expfmt.DecodeOptions{},
   212  	}
   214  	allReqCounts := make(map[string]int)
   215  	rejectReqCounts := make(map[string]int)
   216  	for {
   217  		var v model.Vector
   218  		if err := decoder.Decode(&v); err != nil {
   219  			if err == io.EOF {
   220  				// Expected loop termination condition.
   221  				return allReqCounts, rejectReqCounts, nil
   222  			}
   223  			return nil, nil, fmt.Errorf("failed decoding metrics: %v", err)
   224  		}
   225  		for _, metric := range v {
   226  			switch name := string(metric.Metric[model.MetricNameLabel]); name {
   227  			case dispatchedRequestCountMetricsName:
   228  				allReqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
   229  			case rejectedRequestCountMetricsName:
   230  				rejectReqCounts[string(metric.Metric[labelPriorityLevel])] = int(metric.Value)
   231  			}
   232  		}
   233  	}
   234  }
   236  func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrol.PriorityLevelConfiguration, *flowcontrol.FlowSchema, error) {
   237  	i0 := int32(0)
   238  	pl, err := c.FlowcontrolV1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrol.PriorityLevelConfiguration{
   239  		ObjectMeta: metav1.ObjectMeta{
   240  			Name: username,
   241  		},
   242  		Spec: flowcontrol.PriorityLevelConfigurationSpec{
   243  			Type: flowcontrol.PriorityLevelEnablementLimited,
   244  			Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
   245  				NominalConcurrencyShares: ptr.To(int32(concurrencyShares)),
   246  				BorrowingLimitPercent:    &i0,
   247  				LimitResponse: flowcontrol.LimitResponse{
   248  					Type: flowcontrol.LimitResponseTypeQueue,
   249  					Queuing: &flowcontrol.QueuingConfiguration{
   250  						Queues:           100,
   251  						HandSize:         1,
   252  						QueueLengthLimit: int32(queuelength),
   253  					},
   254  				},
   255  			},
   256  		},
   257  	}, metav1.CreateOptions{})
   258  	if err != nil {
   259  		return nil, nil, err
   260  	}
   261  	fs, err := c.FlowcontrolV1().FlowSchemas().Create(context.TODO(), &flowcontrol.FlowSchema{
   262  		ObjectMeta: metav1.ObjectMeta{
   263  			Name: username,
   264  		},
   265  		Spec: flowcontrol.FlowSchemaSpec{
   266  			DistinguisherMethod: &flowcontrol.FlowDistinguisherMethod{
   267  				Type: flowcontrol.FlowDistinguisherMethodByUserType,
   268  			},
   269  			MatchingPrecedence: 1000,
   270  			PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
   271  				Name: username,
   272  			},
   273  			Rules: []flowcontrol.PolicyRulesWithSubjects{
   274  				{
   275  					ResourceRules: []flowcontrol.ResourcePolicyRule{
   276  						{
   277  							Verbs:        []string{flowcontrol.VerbAll},
   278  							APIGroups:    []string{flowcontrol.APIGroupAll},
   279  							Resources:    []string{flowcontrol.ResourceAll},
   280  							Namespaces:   []string{flowcontrol.NamespaceEvery},
   281  							ClusterScope: true,
   282  						},
   283  					},
   284  					Subjects: []flowcontrol.Subject{
   285  						{
   286  							Kind: flowcontrol.SubjectKindUser,
   287  							User: &flowcontrol.UserSubject{
   288  								Name: username,
   289  							},
   290  						},
   291  					},
   292  				},
   293  			},
   294  		},
   295  	}, metav1.CreateOptions{})
   296  	if err != nil {
   297  		return nil, nil, err
   298  	}
   300  	return pl, fs, wait.Poll(time.Second, timeout, func() (bool, error) {
   301  		fs, err := c.FlowcontrolV1().FlowSchemas().Get(context.TODO(), username, metav1.GetOptions{})
   302  		if err != nil {
   303  			return false, err
   304  		}
   305  		for _, condition := range fs.Status.Conditions {
   306  			if condition.Type == flowcontrol.FlowSchemaConditionDangling {
   307  				if condition.Status == flowcontrol.ConditionFalse {
   308  					return true, nil
   309  				}
   310  			}
   311  		}
   312  		return false, nil
   313  	})
   314  }
   316  func streamRequests(parallel int, request func(), wg *sync.WaitGroup, stopCh <-chan struct{}) {
   317  	for i := 0; i < parallel; i++ {
   318  		go func() {
   319  			defer wg.Done()
   320  			for {
   321  				select {
   322  				case <-stopCh:
   323  					return
   324  				default:
   325  					request()
   326  				}
   327  			}
   328  		}()
   329  	}
   330  }

View as plain text