
Source file src/k8s.io/kubernetes/test/integration/scheduler_perf/util.go

Documentation: k8s.io/kubernetes/test/integration/scheduler_perf

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package benchmark
    19  import (
    20  	"bytes"
    21  	"encoding/json"
    22  	"flag"
    23  	"fmt"
    24  	"math"
    25  	"os"
    26  	"path"
    27  	"sort"
    28  	"strings"
    29  	"time"
    31  	v1 "k8s.io/api/core/v1"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/client-go/informers"
    36  	coreinformers "k8s.io/client-go/informers/core/v1"
    37  	restclient "k8s.io/client-go/rest"
    38  	"k8s.io/component-base/featuregate"
    39  	"k8s.io/component-base/metrics/legacyregistry"
    40  	"k8s.io/component-base/metrics/testutil"
    41  	"k8s.io/klog/v2"
    42  	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
    43  	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    44  	"k8s.io/kubernetes/pkg/features"
    45  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    46  	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
    47  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    48  	"k8s.io/kubernetes/test/integration/framework"
    49  	"k8s.io/kubernetes/test/integration/util"
    50  	testutils "k8s.io/kubernetes/test/utils"
    51  	"k8s.io/kubernetes/test/utils/ktesting"
    52  )
    54  const (
    55  	dateFormat               = "2006-01-02T15:04:05Z"
    56  	testNamespace            = "sched-test"
    57  	setupNamespace           = "sched-setup"
    58  	throughputSampleInterval = time.Second
    59  )
    61  var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
    63  func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
    64  	gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
    65  	cfg := config.KubeSchedulerConfiguration{}
    66  	_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode(nil, &gvk, &cfg)
    67  	if err != nil {
    68  		return nil, err
    69  	}
    70  	return &cfg, nil
    71  }
    73  // mustSetupCluster starts the following components:
    74  // - k8s api server
    75  // - scheduler
    76  // - some of the kube-controller-manager controllers
    77  //
    78  // It returns regular and dynamic clients, and destroyFunc which should be used to
    79  // remove resources after finished.
    80  // Notes on rate limiter:
    81  //   - client rate limit is set to 5000.
    82  func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
    83  	// Run API server with minimimal logging by default. Can be raised with -v.
    84  	framework.MinVerbosity = 0
    86  	// No alpha APIs (overrides api/all=true in https://github.com/kubernetes/kubernetes/blob/d647d19f6aef811bace300eec96a67644ff303d4/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go#L136),
    87  	// except for DRA API group when needed.
    88  	runtimeConfig := []string{"api/alpha=false"}
    89  	if enabledFeatures[features.DynamicResourceAllocation] {
    90  		runtimeConfig = append(runtimeConfig, "resource.k8s.io/v1alpha2=true")
    91  	}
    92  	customFlags := []string{
    93  		// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
    94  		"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition,Priority",
    95  		"--runtime-config=" + strings.Join(runtimeConfig, ","),
    96  	}
    97  	server, err := apiservertesting.StartTestServer(tCtx, apiservertesting.NewDefaultTestServerOptions(), customFlags, framework.SharedEtcd())
    98  	if err != nil {
    99  		tCtx.Fatalf("start apiserver: %v", err)
   100  	}
   101  	tCtx.Cleanup(server.TearDownFn)
   103  	// Cleanup will be in reverse order: first the clients get cancelled,
   104  	// then the apiserver is torn down via the automatic cancelation of
   105  	// tCtx.
   107  	// TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to
   108  	// support this when there is any testcase that depends on such configuration.
   109  	cfg := restclient.CopyConfig(server.ClientConfig)
   110  	cfg.QPS = 5000.0
   111  	cfg.Burst = 5000
   113  	// use default component config if config here is nil
   114  	if config == nil {
   115  		var err error
   116  		config, err = newDefaultComponentConfig()
   117  		if err != nil {
   118  			tCtx.Fatalf("Error creating default component config: %v", err)
   119  		}
   120  	}
   122  	tCtx = ktesting.WithRESTConfig(tCtx, cfg)
   124  	// Not all config options will be effective but only those mostly related with scheduler performance will
   125  	// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
   126  	_, informerFactory := util.StartScheduler(tCtx, tCtx.Client(), cfg, config, outOfTreePluginRegistry)
   127  	util.StartFakePVController(tCtx, tCtx.Client(), informerFactory)
   128  	runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory)
   129  	runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory)
   131  	runResourceClaimController := func() {}
   132  	if enabledFeatures[features.DynamicResourceAllocation] {
   133  		// Testing of DRA with inline resource claims depends on this
   134  		// controller for creating and removing ResourceClaims.
   135  		runResourceClaimController = util.CreateResourceClaimController(tCtx, tCtx, tCtx.Client(), informerFactory)
   136  	}
   138  	informerFactory.Start(tCtx.Done())
   139  	informerFactory.WaitForCacheSync(tCtx.Done())
   140  	go runGC()
   141  	go runNS()
   142  	go runResourceClaimController()
   144  	return informerFactory, tCtx
   145  }
   147  // Returns the list of scheduled and unscheduled pods in the specified namespaces.
   148  // Note that no namespaces specified matches all namespaces.
   149  func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, error) {
   150  	pods, err := podInformer.Lister().List(labels.Everything())
   151  	if err != nil {
   152  		return nil, nil, err
   153  	}
   155  	s := sets.New(namespaces...)
   156  	scheduled := make([]*v1.Pod, 0, len(pods))
   157  	unscheduled := make([]*v1.Pod, 0, len(pods))
   158  	for i := range pods {
   159  		pod := pods[i]
   160  		if len(s) == 0 || s.Has(pod.Namespace) {
   161  			if len(pod.Spec.NodeName) > 0 {
   162  				scheduled = append(scheduled, pod)
   163  			} else {
   164  				unscheduled = append(unscheduled, pod)
   165  			}
   166  		}
   167  	}
   168  	return scheduled, unscheduled, nil
   169  }
   171  // DataItem is the data point.
   172  type DataItem struct {
   173  	// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
   174  	// that all data items with the same label combination should have the same buckets.
   175  	Data map[string]float64 `json:"data"`
   176  	// Unit is the data unit. Notice that all data items with the same label combination
   177  	// should have the same unit.
   178  	Unit string `json:"unit"`
   179  	// Labels is the labels of the data item.
   180  	Labels map[string]string `json:"labels,omitempty"`
   181  }
   183  // DataItems is the data point set. It is the struct that perf dashboard expects.
   184  type DataItems struct {
   185  	Version   string     `json:"version"`
   186  	DataItems []DataItem `json:"dataItems"`
   187  }
   189  // makeBasePod creates a Pod object to be used as a template.
   190  func makeBasePod() *v1.Pod {
   191  	basePod := &v1.Pod{
   192  		ObjectMeta: metav1.ObjectMeta{
   193  			GenerateName: "pod-",
   194  		},
   195  		Spec: testutils.MakePodSpec(),
   196  	}
   197  	return basePod
   198  }
   200  func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
   201  	// perfdash expects all data items to have the same set of labels.  It
   202  	// then renders drop-down buttons for each label with all values found
   203  	// for each label. If we were to store data items that don't have a
   204  	// certain label, then perfdash will never show those data items
   205  	// because it will only show data items that have the currently
   206  	// selected label value. To avoid that, we collect all labels used
   207  	// anywhere and then add missing labels with "not applicable" as value.
   208  	labels := sets.New[string]()
   209  	for _, item := range dataItems.DataItems {
   210  		for label := range item.Labels {
   211  			labels.Insert(label)
   212  		}
   213  	}
   214  	for _, item := range dataItems.DataItems {
   215  		for label := range labels {
   216  			if _, ok := item.Labels[label]; !ok {
   217  				item.Labels[label] = "not applicable"
   218  			}
   219  		}
   220  	}
   222  	b, err := json.Marshal(dataItems)
   223  	if err != nil {
   224  		return err
   225  	}
   227  	destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
   228  	if *dataItemsDir != "" {
   229  		// Ensure the "dataItemsDir" path to be valid.
   230  		if err := os.MkdirAll(*dataItemsDir, 0750); err != nil {
   231  			return fmt.Errorf("dataItemsDir path %v does not exist and cannot be created: %v", *dataItemsDir, err)
   232  		}
   233  		destFile = path.Join(*dataItemsDir, destFile)
   234  	}
   235  	formatted := &bytes.Buffer{}
   236  	if err := json.Indent(formatted, b, "", "  "); err != nil {
   237  		return fmt.Errorf("indenting error: %v", err)
   238  	}
   239  	return os.WriteFile(destFile, formatted.Bytes(), 0644)
   240  }
   242  type labelValues struct {
   243  	label  string
   244  	values []string
   245  }
   247  // metricsCollectorConfig is the config to be marshalled to YAML config file.
   248  // NOTE: The mapping here means only one filter is supported, either value in the list of `values` is able to be collected.
   249  type metricsCollectorConfig struct {
   250  	Metrics map[string]*labelValues
   251  }
   253  // metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
   254  // Currently only Histogram metrics are supported.
   255  type metricsCollector struct {
   256  	*metricsCollectorConfig
   257  	labels map[string]string
   258  }
   260  func newMetricsCollector(config *metricsCollectorConfig, labels map[string]string) *metricsCollector {
   261  	return &metricsCollector{
   262  		metricsCollectorConfig: config,
   263  		labels:                 labels,
   264  	}
   265  }
   267  func (*metricsCollector) run(tCtx ktesting.TContext) {
   268  	// metricCollector doesn't need to start before the tests, so nothing to do here.
   269  }
   271  func (pc *metricsCollector) collect() []DataItem {
   272  	var dataItems []DataItem
   273  	for metric, labelVals := range pc.Metrics {
   274  		// no filter is specified, aggregate all the metrics within the same metricFamily.
   275  		if labelVals == nil {
   276  			dataItem := collectHistogramVec(metric, pc.labels, nil)
   277  			if dataItem != nil {
   278  				dataItems = append(dataItems, *dataItem)
   279  			}
   280  		} else {
   281  			// fetch the metric from metricFamily which match each of the lvMap.
   282  			for _, value := range labelVals.values {
   283  				lvMap := map[string]string{labelVals.label: value}
   284  				dataItem := collectHistogramVec(metric, pc.labels, lvMap)
   285  				if dataItem != nil {
   286  					dataItems = append(dataItems, *dataItem)
   287  				}
   288  			}
   289  		}
   290  	}
   291  	return dataItems
   292  }
   294  func collectHistogramVec(metric string, labels map[string]string, lvMap map[string]string) *DataItem {
   295  	vec, err := testutil.GetHistogramVecFromGatherer(legacyregistry.DefaultGatherer, metric, lvMap)
   296  	if err != nil {
   297  		klog.Error(err)
   298  		return nil
   299  	}
   301  	if err := vec.Validate(); err != nil {
   302  		klog.ErrorS(err, "the validation for HistogramVec is failed. The data for this metric won't be stored in a benchmark result file", "metric", metric, "labels", labels)
   303  		return nil
   304  	}
   306  	if vec.GetAggregatedSampleCount() == 0 {
   307  		klog.InfoS("It is expected that this metric wasn't recorded. The data for this metric won't be stored in a benchmark result file", "metric", metric, "labels", labels)
   308  		return nil
   309  	}
   311  	q50 := vec.Quantile(0.50)
   312  	q90 := vec.Quantile(0.90)
   313  	q95 := vec.Quantile(0.95)
   314  	q99 := vec.Quantile(0.99)
   315  	avg := vec.Average()
   317  	msFactor := float64(time.Second) / float64(time.Millisecond)
   319  	// Copy labels and add "Metric" label for this metric.
   320  	labelMap := map[string]string{"Metric": metric}
   321  	for k, v := range labels {
   322  		labelMap[k] = v
   323  	}
   324  	for k, v := range lvMap {
   325  		labelMap[k] = v
   326  	}
   327  	return &DataItem{
   328  		Labels: labelMap,
   329  		Data: map[string]float64{
   330  			"Perc50":  q50 * msFactor,
   331  			"Perc90":  q90 * msFactor,
   332  			"Perc95":  q95 * msFactor,
   333  			"Perc99":  q99 * msFactor,
   334  			"Average": avg * msFactor,
   335  		},
   336  		Unit: "ms",
   337  	}
   338  }
   340  type throughputCollector struct {
   341  	podInformer           coreinformers.PodInformer
   342  	schedulingThroughputs []float64
   343  	labels                map[string]string
   344  	namespaces            []string
   345  	errorMargin           float64
   346  }
   348  func newThroughputCollector(tb ktesting.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
   349  	return &throughputCollector{
   350  		podInformer: podInformer,
   351  		labels:      labels,
   352  		namespaces:  namespaces,
   353  		errorMargin: errorMargin,
   354  	}
   355  }
   357  func (tc *throughputCollector) run(tCtx ktesting.TContext) {
   358  	podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
   359  	if err != nil {
   360  		klog.Fatalf("%v", err)
   361  	}
   362  	lastScheduledCount := len(podsScheduled)
   363  	ticker := time.NewTicker(throughputSampleInterval)
   364  	defer ticker.Stop()
   365  	lastSampleTime := time.Now()
   366  	started := false
   367  	skipped := 0
   369  	for {
   370  		select {
   371  		case <-tCtx.Done():
   372  			return
   373  		case <-ticker.C:
   374  			now := time.Now()
   375  			podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
   376  			if err != nil {
   377  				klog.Fatalf("%v", err)
   378  			}
   380  			scheduled := len(podsScheduled)
   381  			// Only do sampling if number of scheduled pods is greater than zero.
   382  			if scheduled == 0 {
   383  				continue
   384  			}
   385  			if !started {
   386  				started = true
   387  				// Skip the initial sample. It's likely to be an outlier because
   388  				// sampling and creating pods get started independently.
   389  				lastScheduledCount = scheduled
   390  				lastSampleTime = now
   391  				continue
   392  			}
   394  			newScheduled := scheduled - lastScheduledCount
   395  			if newScheduled == 0 {
   396  				// Throughput would be zero for the interval.
   397  				// Instead of recording 0 pods/s, keep waiting
   398  				// until we see at least one additional pod
   399  				// being scheduled.
   400  				skipped++
   401  				continue
   402  			}
   404  			// This should be roughly equal to
   405  			// throughputSampleInterval * (skipped + 1), but we
   406  			// don't count on that because the goroutine might not
   407  			// be scheduled immediately when the timer
   408  			// triggers. Instead we track the actual time stamps.
   409  			duration := now.Sub(lastSampleTime)
   410  			durationInSeconds := duration.Seconds()
   411  			throughput := float64(newScheduled) / durationInSeconds
   412  			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
   413  			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
   414  			if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin {
   415  				// This might affect the result, report it.
   416  				tCtx.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
   417  			}
   419  			// To keep percentiles accurate, we have to record multiple samples with the same
   420  			// throughput value if we skipped some intervals.
   421  			for i := 0; i <= skipped; i++ {
   422  				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
   423  			}
   424  			lastScheduledCount = scheduled
   425  			klog.Infof("%d pods scheduled", lastScheduledCount)
   426  			skipped = 0
   427  			lastSampleTime = now
   428  		}
   429  	}
   430  }
   432  func (tc *throughputCollector) collect() []DataItem {
   433  	throughputSummary := DataItem{Labels: tc.labels}
   434  	if length := len(tc.schedulingThroughputs); length > 0 {
   435  		sort.Float64s(tc.schedulingThroughputs)
   436  		sum := 0.0
   437  		for i := range tc.schedulingThroughputs {
   438  			sum += tc.schedulingThroughputs[i]
   439  		}
   441  		throughputSummary.Labels["Metric"] = "SchedulingThroughput"
   442  		throughputSummary.Data = map[string]float64{
   443  			"Average": sum / float64(length),
   444  			"Perc50":  tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
   445  			"Perc90":  tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
   446  			"Perc95":  tc.schedulingThroughputs[int(math.Ceil(float64(length*95)/100))-1],
   447  			"Perc99":  tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
   448  		}
   449  		throughputSummary.Unit = "pods/s"
   450  	}
   452  	return []DataItem{throughputSummary}
   453  }

