
Source file src/k8s.io/kubernetes/test/integration/apiserver/flowcontrol/concurrency_util_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver/flowcontrol

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package flowcontrol
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"io"
    23  	"math"
    24  	"strings"
    25  	"sync"
    26  	"testing"
    27  	"time"
    29  	"github.com/prometheus/common/expfmt"
    30  	"github.com/prometheus/common/model"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apiserver/pkg/authorization/authorizer"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    36  	"k8s.io/kubernetes/pkg/controlplane"
    37  	"k8s.io/kubernetes/test/integration/framework"
    38  	"k8s.io/kubernetes/test/utils/ktesting"
    39  )
    41  const (
    42  	nominalConcurrencyLimitMetricsName = "apiserver_flowcontrol_nominal_limit_seats"
    43  	requestExecutionSecondsSumName     = "apiserver_flowcontrol_request_execution_seconds_sum"
    44  	requestExecutionSecondsCountName   = "apiserver_flowcontrol_request_execution_seconds_count"
    45  	priorityLevelSeatUtilSumName       = "apiserver_flowcontrol_priority_level_seat_utilization_sum"
    46  	priorityLevelSeatUtilCountName     = "apiserver_flowcontrol_priority_level_seat_utilization_count"
    47  	fakeworkDuration                   = 200 * time.Millisecond
    48  	testWarmUpTime                     = 2 * time.Second
    49  	testTime                           = 10 * time.Second
    50  )
    52  type SumAndCount struct {
    53  	Sum   float64
    54  	Count int
    55  }
    57  type plMetrics struct {
    58  	execSeconds    SumAndCount
    59  	seatUtil       SumAndCount
    60  	availableSeats int
    61  }
    63  // metricSnapshot maps from a priority level label to
    64  // a plMetrics struct containing APF metrics of interest
    65  type metricSnapshot map[string]plMetrics
    67  // Client request latency measurement
    68  type clientLatencyMeasurement struct {
    69  	SumAndCount
    70  	SumSq float64 // latency sum of squares
    71  	Mu    sync.Mutex
    72  }
    74  func (clm *clientLatencyMeasurement) reset() {
    75  	clm.Mu.Lock()
    76  	defer clm.Mu.Unlock()
    77  	clm.Sum = 0
    78  	clm.Count = 0
    79  	clm.SumSq = 0
    80  }
    82  func (clm *clientLatencyMeasurement) update(duration float64) {
    83  	clm.Mu.Lock()
    84  	defer clm.Mu.Unlock()
    85  	clm.Count += 1
    86  	clm.Sum += duration
    87  	clm.SumSq += duration * duration
    88  }
    90  func (clm *clientLatencyMeasurement) getStats() clientLatencyStats {
    91  	clm.Mu.Lock()
    92  	defer clm.Mu.Unlock()
    93  	mean := clm.Sum / float64(clm.Count)
    94  	ss := clm.SumSq - mean*clm.Sum // reduced from ss := sumsq - 2*mean*sum + float64(count)*mean*mean
    95  	// Set ss to 0 if negative value is resulted from floating point calculations
    96  	if ss < 0 {
    97  		ss = 0
    98  	}
    99  	stdDev := math.Sqrt(ss / float64(clm.Count))
   100  	cv := stdDev / mean
   101  	return clientLatencyStats{mean: mean, stdDev: stdDev, cv: cv}
   102  }
   104  type clientLatencyStats struct {
   105  	mean   float64 // latency average
   106  	stdDev float64 // latency population standard deviation
   107  	cv     float64 // latency coefficient of variation
   108  }
   110  type plMetricAvg struct {
   111  	reqExecution float64 // average request execution time
   112  	seatUtil     float64 // average seat utilization
   113  }
   115  func intervalMetricAvg(snapshot0, snapshot1 metricSnapshot, plLabel string) plMetricAvg {
   116  	plmT0 := snapshot0[plLabel]
   117  	plmT1 := snapshot1[plLabel]
   118  	return plMetricAvg{
   119  		reqExecution: (plmT1.execSeconds.Sum - plmT0.execSeconds.Sum) / float64(plmT1.execSeconds.Count-plmT0.execSeconds.Count),
   120  		seatUtil:     (plmT1.seatUtil.Sum - plmT0.seatUtil.Sum) / float64(plmT1.seatUtil.Count-plmT0.seatUtil.Count),
   121  	}
   122  }
   124  type noxuDelayingAuthorizer struct {
   125  	Authorizer authorizer.Authorizer
   126  }
   128  func (d *noxuDelayingAuthorizer) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
   129  	if a.GetUser().GetName() == "noxu1" || a.GetUser().GetName() == "noxu2" {
   130  		time.Sleep(fakeworkDuration) // simulate fake work with sleep
   131  	}
   132  	return d.Authorizer.Authorize(ctx, a)
   133  }
   135  // TestConcurrencyIsolation tests the concurrency isolation between priority levels.
   136  // The test defines two priority levels for this purpose, and corresponding flow schemas.
   137  // To one priority level, this test sends many more concurrent requests than the configuration
   138  // allows to execute at once, while sending fewer than allowed to the other priority level.
   139  // The primary check is that the low flow gets all the seats it wants, but is modulated by
   140  // recognizing that there are uncontrolled overheads in the system.
   141  //
   142  // This test differs from TestPriorityLevelIsolation since TestPriorityLevelIsolation checks throughput instead
   143  // of concurrency. In order to mitigate the effects of system noise, a delaying authorizer is used to artificially
   144  // increase request execution time to make the system noise relatively insignificant.
   145  // Secondarily, this test also checks the observed seat utilizations against values derived from expecting that
   146  // the throughput observed by the client equals the execution throughput observed by the server.
   147  func TestConcurrencyIsolation(t *testing.T) {
   148  	tCtx := ktesting.Init(t)
   149  	_, kubeConfig, closeFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
   150  		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
   151  			// Ensure all clients are allowed to send requests.
   152  			opts.Authorization.Modes = []string{"AlwaysAllow"}
   153  			opts.GenericServerRunOptions.MaxRequestsInFlight = 10
   154  			opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = 10
   155  		},
   156  		ModifyServerConfig: func(config *controlplane.Config) {
   157  			// Wrap default authorizer with one that delays requests from noxu clients
   158  			config.GenericConfig.Authorization.Authorizer = &noxuDelayingAuthorizer{config.GenericConfig.Authorization.Authorizer}
   159  		},
   160  	})
   161  	defer closeFn()
   163  	loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
   164  	noxu1Client := getClientFor(kubeConfig, "noxu1")
   165  	noxu2Client := getClientFor(kubeConfig, "noxu2")
   167  	queueLength := 50
   168  	concurrencyShares := 100
   170  	plNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
   171  		loopbackClient, "noxu1", concurrencyShares, queueLength)
   172  	if err != nil {
   173  		t.Error(err)
   174  	}
   175  	plNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(
   176  		loopbackClient, "noxu2", concurrencyShares, queueLength)
   177  	if err != nil {
   178  		t.Error(err)
   179  	}
   181  	stopCh := make(chan struct{})
   182  	wg := sync.WaitGroup{}
   184  	// "elephant"
   185  	noxu1NumGoroutines := 5 + queueLength
   186  	var noxu1LatMeasure clientLatencyMeasurement
   187  	wg.Add(noxu1NumGoroutines)
   188  	streamRequests(noxu1NumGoroutines, func() {
   189  		start := time.Now()
   190  		_, err := noxu1Client.CoreV1().Namespaces().Get(tCtx, "default", metav1.GetOptions{})
   191  		duration := time.Since(start).Seconds()
   192  		noxu1LatMeasure.update(duration)
   193  		if err != nil {
   194  			t.Error(err)
   195  		}
   196  	}, &wg, stopCh)
   197  	// "mouse"
   198  	noxu2NumGoroutines := 3
   199  	var noxu2LatMeasure clientLatencyMeasurement
   200  	wg.Add(noxu2NumGoroutines)
   201  	streamRequests(noxu2NumGoroutines, func() {
   202  		start := time.Now()
   203  		_, err := noxu2Client.CoreV1().Namespaces().Get(tCtx, "default", metav1.GetOptions{})
   204  		duration := time.Since(start).Seconds()
   205  		noxu2LatMeasure.update(duration)
   206  		if err != nil {
   207  			t.Error(err)
   208  		}
   209  	}, &wg, stopCh)
   211  	// Warm up
   212  	time.Sleep(testWarmUpTime)
   214  	noxu1LatMeasure.reset()
   215  	noxu2LatMeasure.reset()
   216  	snapshot0, err := getRequestMetricsSnapshot(loopbackClient)
   217  	if err != nil {
   218  		t.Error(err)
   219  	}
   220  	time.Sleep(testTime) // after warming up, the test enters a steady state
   221  	snapshot1, err := getRequestMetricsSnapshot(loopbackClient)
   222  	if err != nil {
   223  		t.Error(err)
   224  	}
   225  	close(stopCh)
   227  	// Check the assumptions of the test
   228  	noxu1T0 := snapshot0[plNoxu1.Name]
   229  	noxu1T1 := snapshot1[plNoxu1.Name]
   230  	noxu2T0 := snapshot0[plNoxu2.Name]
   231  	noxu2T1 := snapshot1[plNoxu2.Name]
   232  	if noxu1T0.seatUtil.Count >= noxu1T1.seatUtil.Count || noxu2T0.seatUtil.Count >= noxu2T1.seatUtil.Count {
   233  		t.Errorf("SeatUtilCount check failed: noxu1 t0 count %d, t1 count %d; noxu2 t0 count %d, t1 count %d",
   234  			noxu1T0.seatUtil.Count, noxu1T1.seatUtil.Count, noxu2T0.seatUtil.Count, noxu2T1.seatUtil.Count)
   235  	}
   236  	t.Logf("noxu1 priority level concurrency limit: %d", noxu1T0.availableSeats)
   237  	t.Logf("noxu2 priority level concurrency limit: %d", noxu2T0.availableSeats)
   238  	if (noxu1T0.availableSeats != noxu1T1.availableSeats) || (noxu2T0.availableSeats != noxu2T1.availableSeats) {
   239  		t.Errorf("The number of available seats changed: noxu1 (%d, %d) noxu2 (%d, %d)",
   240  			noxu1T0.availableSeats, noxu1T1.availableSeats, noxu2T0.availableSeats, noxu2T1.availableSeats)
   241  	}
   242  	if (noxu1T0.availableSeats <= 4) || (noxu2T0.availableSeats <= 4) {
   243  		t.Errorf("The number of available seats for test client priority levels are too small: (%d, %d). Expecting a number > 4",
   244  			noxu1T0.availableSeats, noxu2T0.availableSeats)
   245  	}
   246  	// No requests should be rejected under normal situations
   247  	_, rejectedReqCounts, err := getRequestCountOfPriorityLevel(loopbackClient)
   248  	if err != nil {
   249  		t.Error(err)
   250  	}
   251  	if rejectedReqCounts[plNoxu1.Name] > 0 {
   252  		t.Errorf(`%d requests from the "elephant" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu1.Name])
   253  	}
   254  	if rejectedReqCounts[plNoxu2.Name] > 0 {
   255  		t.Errorf(`%d requests from the "mouse" stream were rejected unexpectedly`, rejectedReqCounts[plNoxu2.Name])
   256  	}
   258  	// Calculate APF server side metric averages during the test interval
   259  	noxu1Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu1.Name)
   260  	noxu2Avg := intervalMetricAvg(snapshot0, snapshot1, plNoxu2.Name)
   261  	t.Logf("\nnoxu1 avg request execution time %v\nnoxu2 avg request execution time %v", noxu1Avg.reqExecution, noxu2Avg.reqExecution)
   262  	t.Logf("\nnoxu1 avg seat utilization %v\nnoxu2 avg seat utilization %v", noxu1Avg.seatUtil, noxu2Avg.seatUtil)
   264  	// Wait till the client goroutines finish before computing the client side request latency statistics
   265  	wg.Wait()
   266  	noxu1LatStats := noxu1LatMeasure.getStats()
   267  	noxu2LatStats := noxu2LatMeasure.getStats()
   268  	t.Logf("noxu1 client request count %d duration mean %v stddev %v cv %v", noxu1LatMeasure.Count, noxu1LatStats.mean, noxu1LatStats.stdDev, noxu1LatStats.cv)
   269  	t.Logf("noxu2 client request count %d duration mean %v stddev %v cv %v", noxu2LatMeasure.Count, noxu2LatStats.mean, noxu2LatStats.stdDev, noxu2LatStats.cv)
   271  	// Calculate server-side observed concurrency
   272  	noxu1ObservedConcurrency := noxu1Avg.seatUtil * float64(noxu1T0.availableSeats)
   273  	noxu2ObservedConcurrency := noxu2Avg.seatUtil * float64(noxu2T0.availableSeats)
   274  	// Expected concurrency is derived from equal throughput assumption on both the client-side and the server-side
   275  	noxu1ExpectedConcurrency := float64(noxu1NumGoroutines) * noxu1Avg.reqExecution / noxu1LatStats.mean
   276  	noxu2ExpectedConcurrency := float64(noxu2NumGoroutines) * noxu2Avg.reqExecution / noxu2LatStats.mean
   277  	t.Logf("Concurrency of noxu1:noxu2 - expected (%v:%v), observed (%v:%v)", noxu1ExpectedConcurrency, noxu2ExpectedConcurrency, noxu1ObservedConcurrency, noxu2ObservedConcurrency)
   279  	// There are uncontrolled overheads that introduce noise into the system. The coefficient of variation (CV), that is,
   280  	// standard deviation divided by mean, for a class of traffic is a characterization of all the noise that applied to
   281  	// that class. We found that noxu1 generally had a much bigger CV than noxu2. This makes sense, because noxu1 probes
   282  	// more behavior --- the waiting in queues. So we take the minimum of the two as an indicator of the relative amount
   283  	// of noise that comes from all the other behavior. Currently, we use 2 times the experienced coefficient of variation
   284  	// as the margin of error.
   285  	margin := 2 * math.Min(noxu1LatStats.cv, noxu2LatStats.cv)
   286  	t.Logf("Error margin is %v", margin)
   288  	isConcurrencyExpected := func(name string, observed float64, expected float64) bool {
   289  		relativeErr := math.Abs(expected-observed) / expected
   290  		t.Logf("%v relative error is %v", name, relativeErr)
   291  		return relativeErr <= margin
   292  	}
   293  	if !isConcurrencyExpected(plNoxu1.Name, noxu1ObservedConcurrency, noxu1ExpectedConcurrency) {
   294  		t.Errorf("Concurrency observed by noxu1 is off. Expected: %v, observed: %v", noxu1ExpectedConcurrency, noxu1ObservedConcurrency)
   295  	}
   296  	if !isConcurrencyExpected(plNoxu2.Name, noxu2ObservedConcurrency, noxu2ExpectedConcurrency) {
   297  		t.Errorf("Concurrency observed by noxu2 is off. Expected: %v, observed: %v", noxu2ExpectedConcurrency, noxu2ObservedConcurrency)
   298  	}
   300  	// Check the server-side APF seat utilization measurements
   301  	if math.Abs(1-noxu1Avg.seatUtil) > 0.05 {
   302  		t.Errorf("noxu1Avg.seatUtil=%v is too far from expected=1.0", noxu1Avg.seatUtil)
   303  	}
   304  	noxu2ExpectedSeatUtil := float64(noxu2NumGoroutines) / float64(noxu2T0.availableSeats)
   305  	if math.Abs(noxu2ExpectedSeatUtil-noxu2Avg.seatUtil) > 0.05 {
   306  		t.Errorf("noxu2Avg.seatUtil=%v is too far from expected=%v", noxu2Avg.seatUtil, noxu2ExpectedSeatUtil)
   307  	}
   308  }
   310  func getRequestMetricsSnapshot(c clientset.Interface) (metricSnapshot, error) {
   312  	resp, err := getMetrics(c)
   313  	if err != nil {
   314  		return nil, err
   315  	}
   317  	dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText)
   318  	decoder := expfmt.SampleDecoder{
   319  		Dec:  dec,
   320  		Opts: &expfmt.DecodeOptions{},
   321  	}
   323  	snapshot := metricSnapshot{}
   325  	for {
   326  		var v model.Vector
   327  		if err := decoder.Decode(&v); err != nil {
   328  			if err == io.EOF {
   329  				// Expected loop termination condition.
   330  				return snapshot, nil
   331  			}
   332  			return nil, fmt.Errorf("failed decoding metrics: %v", err)
   333  		}
   334  		for _, metric := range v {
   335  			plLabel := string(metric.Metric[labelPriorityLevel])
   336  			entry := plMetrics{}
   337  			if v, ok := snapshot[plLabel]; ok {
   338  				entry = v
   339  			}
   340  			switch name := string(metric.Metric[model.MetricNameLabel]); name {
   341  			case requestExecutionSecondsSumName:
   342  				entry.execSeconds.Sum = float64(metric.Value)
   343  			case requestExecutionSecondsCountName:
   344  				entry.execSeconds.Count = int(metric.Value)
   345  			case priorityLevelSeatUtilSumName:
   346  				entry.seatUtil.Sum = float64(metric.Value)
   347  			case priorityLevelSeatUtilCountName:
   348  				entry.seatUtil.Count = int(metric.Value)
   349  			case nominalConcurrencyLimitMetricsName:
   350  				entry.availableSeats = int(metric.Value)
   351  			}
   352  			snapshot[plLabel] = entry
   353  		}
   354  	}
   355  }

View as plain text