...

Source file src/k8s.io/kubernetes/test/integration/scheduler_perf/util.go

Documentation: k8s.io/kubernetes/test/integration/scheduler_perf

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package benchmark
    18  
    19  import (
    20  	"bytes"
    21  	"encoding/json"
    22  	"flag"
    23  	"fmt"
    24  	"math"
    25  	"os"
    26  	"path"
    27  	"sort"
    28  	"strings"
    29  	"time"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/client-go/informers"
    36  	coreinformers "k8s.io/client-go/informers/core/v1"
    37  	restclient "k8s.io/client-go/rest"
    38  	"k8s.io/component-base/featuregate"
    39  	"k8s.io/component-base/metrics/legacyregistry"
    40  	"k8s.io/component-base/metrics/testutil"
    41  	"k8s.io/klog/v2"
    42  	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
    43  	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    44  	"k8s.io/kubernetes/pkg/features"
    45  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    46  	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
    47  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    48  	"k8s.io/kubernetes/test/integration/framework"
    49  	"k8s.io/kubernetes/test/integration/util"
    50  	testutils "k8s.io/kubernetes/test/utils"
    51  	"k8s.io/kubernetes/test/utils/ktesting"
    52  )
    53  
    54  const (
    55  	dateFormat               = "2006-01-02T15:04:05Z"
    56  	testNamespace            = "sched-test"
    57  	setupNamespace           = "sched-setup"
    58  	throughputSampleInterval = time.Second
    59  )
    60  
    61  var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")
    62  
    63  func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
    64  	gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
    65  	cfg := config.KubeSchedulerConfiguration{}
    66  	_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode(nil, &gvk, &cfg)
    67  	if err != nil {
    68  		return nil, err
    69  	}
    70  	return &cfg, nil
    71  }
    72  
    73  // mustSetupCluster starts the following components:
    74  // - k8s api server
    75  // - scheduler
    76  // - some of the kube-controller-manager controllers
    77  //
    78  // It returns regular and dynamic clients, and destroyFunc which should be used to
    79  // remove resources after finished.
    80  // Notes on rate limiter:
    81  //   - client rate limit is set to 5000.
    82  func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
    83  	// Run API server with minimimal logging by default. Can be raised with -v.
    84  	framework.MinVerbosity = 0
    85  
    86  	// No alpha APIs (overrides api/all=true in https://github.com/kubernetes/kubernetes/blob/d647d19f6aef811bace300eec96a67644ff303d4/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/testing/testserver.go#L136),
    87  	// except for DRA API group when needed.
    88  	runtimeConfig := []string{"api/alpha=false"}
    89  	if enabledFeatures[features.DynamicResourceAllocation] {
    90  		runtimeConfig = append(runtimeConfig, "resource.k8s.io/v1alpha2=true")
    91  	}
    92  	customFlags := []string{
    93  		// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
    94  		"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition,Priority",
    95  		"--runtime-config=" + strings.Join(runtimeConfig, ","),
    96  	}
    97  	server, err := apiservertesting.StartTestServer(tCtx, apiservertesting.NewDefaultTestServerOptions(), customFlags, framework.SharedEtcd())
    98  	if err != nil {
    99  		tCtx.Fatalf("start apiserver: %v", err)
   100  	}
   101  	tCtx.Cleanup(server.TearDownFn)
   102  
   103  	// Cleanup will be in reverse order: first the clients get cancelled,
   104  	// then the apiserver is torn down via the automatic cancelation of
   105  	// tCtx.
   106  
   107  	// TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to
   108  	// support this when there is any testcase that depends on such configuration.
   109  	cfg := restclient.CopyConfig(server.ClientConfig)
   110  	cfg.QPS = 5000.0
   111  	cfg.Burst = 5000
   112  
   113  	// use default component config if config here is nil
   114  	if config == nil {
   115  		var err error
   116  		config, err = newDefaultComponentConfig()
   117  		if err != nil {
   118  			tCtx.Fatalf("Error creating default component config: %v", err)
   119  		}
   120  	}
   121  
   122  	tCtx = ktesting.WithRESTConfig(tCtx, cfg)
   123  
   124  	// Not all config options will be effective but only those mostly related with scheduler performance will
   125  	// be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
   126  	_, informerFactory := util.StartScheduler(tCtx, tCtx.Client(), cfg, config, outOfTreePluginRegistry)
   127  	util.StartFakePVController(tCtx, tCtx.Client(), informerFactory)
   128  	runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory)
   129  	runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory)
   130  
   131  	runResourceClaimController := func() {}
   132  	if enabledFeatures[features.DynamicResourceAllocation] {
   133  		// Testing of DRA with inline resource claims depends on this
   134  		// controller for creating and removing ResourceClaims.
   135  		runResourceClaimController = util.CreateResourceClaimController(tCtx, tCtx, tCtx.Client(), informerFactory)
   136  	}
   137  
   138  	informerFactory.Start(tCtx.Done())
   139  	informerFactory.WaitForCacheSync(tCtx.Done())
   140  	go runGC()
   141  	go runNS()
   142  	go runResourceClaimController()
   143  
   144  	return informerFactory, tCtx
   145  }
   146  
   147  // Returns the list of scheduled and unscheduled pods in the specified namespaces.
   148  // Note that no namespaces specified matches all namespaces.
   149  func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, error) {
   150  	pods, err := podInformer.Lister().List(labels.Everything())
   151  	if err != nil {
   152  		return nil, nil, err
   153  	}
   154  
   155  	s := sets.New(namespaces...)
   156  	scheduled := make([]*v1.Pod, 0, len(pods))
   157  	unscheduled := make([]*v1.Pod, 0, len(pods))
   158  	for i := range pods {
   159  		pod := pods[i]
   160  		if len(s) == 0 || s.Has(pod.Namespace) {
   161  			if len(pod.Spec.NodeName) > 0 {
   162  				scheduled = append(scheduled, pod)
   163  			} else {
   164  				unscheduled = append(unscheduled, pod)
   165  			}
   166  		}
   167  	}
   168  	return scheduled, unscheduled, nil
   169  }
   170  
   171  // DataItem is the data point.
   172  type DataItem struct {
   173  	// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
   174  	// that all data items with the same label combination should have the same buckets.
   175  	Data map[string]float64 `json:"data"`
   176  	// Unit is the data unit. Notice that all data items with the same label combination
   177  	// should have the same unit.
   178  	Unit string `json:"unit"`
   179  	// Labels is the labels of the data item.
   180  	Labels map[string]string `json:"labels,omitempty"`
   181  }
   182  
   183  // DataItems is the data point set. It is the struct that perf dashboard expects.
   184  type DataItems struct {
   185  	Version   string     `json:"version"`
   186  	DataItems []DataItem `json:"dataItems"`
   187  }
   188  
   189  // makeBasePod creates a Pod object to be used as a template.
   190  func makeBasePod() *v1.Pod {
   191  	basePod := &v1.Pod{
   192  		ObjectMeta: metav1.ObjectMeta{
   193  			GenerateName: "pod-",
   194  		},
   195  		Spec: testutils.MakePodSpec(),
   196  	}
   197  	return basePod
   198  }
   199  
   200  func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
   201  	// perfdash expects all data items to have the same set of labels.  It
   202  	// then renders drop-down buttons for each label with all values found
   203  	// for each label. If we were to store data items that don't have a
   204  	// certain label, then perfdash will never show those data items
   205  	// because it will only show data items that have the currently
   206  	// selected label value. To avoid that, we collect all labels used
   207  	// anywhere and then add missing labels with "not applicable" as value.
   208  	labels := sets.New[string]()
   209  	for _, item := range dataItems.DataItems {
   210  		for label := range item.Labels {
   211  			labels.Insert(label)
   212  		}
   213  	}
   214  	for _, item := range dataItems.DataItems {
   215  		for label := range labels {
   216  			if _, ok := item.Labels[label]; !ok {
   217  				item.Labels[label] = "not applicable"
   218  			}
   219  		}
   220  	}
   221  
   222  	b, err := json.Marshal(dataItems)
   223  	if err != nil {
   224  		return err
   225  	}
   226  
   227  	destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
   228  	if *dataItemsDir != "" {
   229  		// Ensure the "dataItemsDir" path to be valid.
   230  		if err := os.MkdirAll(*dataItemsDir, 0750); err != nil {
   231  			return fmt.Errorf("dataItemsDir path %v does not exist and cannot be created: %v", *dataItemsDir, err)
   232  		}
   233  		destFile = path.Join(*dataItemsDir, destFile)
   234  	}
   235  	formatted := &bytes.Buffer{}
   236  	if err := json.Indent(formatted, b, "", "  "); err != nil {
   237  		return fmt.Errorf("indenting error: %v", err)
   238  	}
   239  	return os.WriteFile(destFile, formatted.Bytes(), 0644)
   240  }
   241  
   242  type labelValues struct {
   243  	label  string
   244  	values []string
   245  }
   246  
   247  // metricsCollectorConfig is the config to be marshalled to YAML config file.
   248  // NOTE: The mapping here means only one filter is supported, either value in the list of `values` is able to be collected.
   249  type metricsCollectorConfig struct {
   250  	Metrics map[string]*labelValues
   251  }
   252  
   253  // metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint.
   254  // Currently only Histogram metrics are supported.
   255  type metricsCollector struct {
   256  	*metricsCollectorConfig
   257  	labels map[string]string
   258  }
   259  
   260  func newMetricsCollector(config *metricsCollectorConfig, labels map[string]string) *metricsCollector {
   261  	return &metricsCollector{
   262  		metricsCollectorConfig: config,
   263  		labels:                 labels,
   264  	}
   265  }
   266  
   267  func (*metricsCollector) run(tCtx ktesting.TContext) {
   268  	// metricCollector doesn't need to start before the tests, so nothing to do here.
   269  }
   270  
   271  func (pc *metricsCollector) collect() []DataItem {
   272  	var dataItems []DataItem
   273  	for metric, labelVals := range pc.Metrics {
   274  		// no filter is specified, aggregate all the metrics within the same metricFamily.
   275  		if labelVals == nil {
   276  			dataItem := collectHistogramVec(metric, pc.labels, nil)
   277  			if dataItem != nil {
   278  				dataItems = append(dataItems, *dataItem)
   279  			}
   280  		} else {
   281  			// fetch the metric from metricFamily which match each of the lvMap.
   282  			for _, value := range labelVals.values {
   283  				lvMap := map[string]string{labelVals.label: value}
   284  				dataItem := collectHistogramVec(metric, pc.labels, lvMap)
   285  				if dataItem != nil {
   286  					dataItems = append(dataItems, *dataItem)
   287  				}
   288  			}
   289  		}
   290  	}
   291  	return dataItems
   292  }
   293  
   294  func collectHistogramVec(metric string, labels map[string]string, lvMap map[string]string) *DataItem {
   295  	vec, err := testutil.GetHistogramVecFromGatherer(legacyregistry.DefaultGatherer, metric, lvMap)
   296  	if err != nil {
   297  		klog.Error(err)
   298  		return nil
   299  	}
   300  
   301  	if err := vec.Validate(); err != nil {
   302  		klog.ErrorS(err, "the validation for HistogramVec is failed. The data for this metric won't be stored in a benchmark result file", "metric", metric, "labels", labels)
   303  		return nil
   304  	}
   305  
   306  	if vec.GetAggregatedSampleCount() == 0 {
   307  		klog.InfoS("It is expected that this metric wasn't recorded. The data for this metric won't be stored in a benchmark result file", "metric", metric, "labels", labels)
   308  		return nil
   309  	}
   310  
   311  	q50 := vec.Quantile(0.50)
   312  	q90 := vec.Quantile(0.90)
   313  	q95 := vec.Quantile(0.95)
   314  	q99 := vec.Quantile(0.99)
   315  	avg := vec.Average()
   316  
   317  	msFactor := float64(time.Second) / float64(time.Millisecond)
   318  
   319  	// Copy labels and add "Metric" label for this metric.
   320  	labelMap := map[string]string{"Metric": metric}
   321  	for k, v := range labels {
   322  		labelMap[k] = v
   323  	}
   324  	for k, v := range lvMap {
   325  		labelMap[k] = v
   326  	}
   327  	return &DataItem{
   328  		Labels: labelMap,
   329  		Data: map[string]float64{
   330  			"Perc50":  q50 * msFactor,
   331  			"Perc90":  q90 * msFactor,
   332  			"Perc95":  q95 * msFactor,
   333  			"Perc99":  q99 * msFactor,
   334  			"Average": avg * msFactor,
   335  		},
   336  		Unit: "ms",
   337  	}
   338  }
   339  
   340  type throughputCollector struct {
   341  	podInformer           coreinformers.PodInformer
   342  	schedulingThroughputs []float64
   343  	labels                map[string]string
   344  	namespaces            []string
   345  	errorMargin           float64
   346  }
   347  
   348  func newThroughputCollector(tb ktesting.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
   349  	return &throughputCollector{
   350  		podInformer: podInformer,
   351  		labels:      labels,
   352  		namespaces:  namespaces,
   353  		errorMargin: errorMargin,
   354  	}
   355  }
   356  
   357  func (tc *throughputCollector) run(tCtx ktesting.TContext) {
   358  	podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
   359  	if err != nil {
   360  		klog.Fatalf("%v", err)
   361  	}
   362  	lastScheduledCount := len(podsScheduled)
   363  	ticker := time.NewTicker(throughputSampleInterval)
   364  	defer ticker.Stop()
   365  	lastSampleTime := time.Now()
   366  	started := false
   367  	skipped := 0
   368  
   369  	for {
   370  		select {
   371  		case <-tCtx.Done():
   372  			return
   373  		case <-ticker.C:
   374  			now := time.Now()
   375  			podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
   376  			if err != nil {
   377  				klog.Fatalf("%v", err)
   378  			}
   379  
   380  			scheduled := len(podsScheduled)
   381  			// Only do sampling if number of scheduled pods is greater than zero.
   382  			if scheduled == 0 {
   383  				continue
   384  			}
   385  			if !started {
   386  				started = true
   387  				// Skip the initial sample. It's likely to be an outlier because
   388  				// sampling and creating pods get started independently.
   389  				lastScheduledCount = scheduled
   390  				lastSampleTime = now
   391  				continue
   392  			}
   393  
   394  			newScheduled := scheduled - lastScheduledCount
   395  			if newScheduled == 0 {
   396  				// Throughput would be zero for the interval.
   397  				// Instead of recording 0 pods/s, keep waiting
   398  				// until we see at least one additional pod
   399  				// being scheduled.
   400  				skipped++
   401  				continue
   402  			}
   403  
   404  			// This should be roughly equal to
   405  			// throughputSampleInterval * (skipped + 1), but we
   406  			// don't count on that because the goroutine might not
   407  			// be scheduled immediately when the timer
   408  			// triggers. Instead we track the actual time stamps.
   409  			duration := now.Sub(lastSampleTime)
   410  			durationInSeconds := duration.Seconds()
   411  			throughput := float64(newScheduled) / durationInSeconds
   412  			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
   413  			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
   414  			if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin {
   415  				// This might affect the result, report it.
   416  				tCtx.Errorf("ERROR: Expected throuput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
   417  			}
   418  
   419  			// To keep percentiles accurate, we have to record multiple samples with the same
   420  			// throughput value if we skipped some intervals.
   421  			for i := 0; i <= skipped; i++ {
   422  				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
   423  			}
   424  			lastScheduledCount = scheduled
   425  			klog.Infof("%d pods scheduled", lastScheduledCount)
   426  			skipped = 0
   427  			lastSampleTime = now
   428  		}
   429  	}
   430  }
   431  
   432  func (tc *throughputCollector) collect() []DataItem {
   433  	throughputSummary := DataItem{Labels: tc.labels}
   434  	if length := len(tc.schedulingThroughputs); length > 0 {
   435  		sort.Float64s(tc.schedulingThroughputs)
   436  		sum := 0.0
   437  		for i := range tc.schedulingThroughputs {
   438  			sum += tc.schedulingThroughputs[i]
   439  		}
   440  
   441  		throughputSummary.Labels["Metric"] = "SchedulingThroughput"
   442  		throughputSummary.Data = map[string]float64{
   443  			"Average": sum / float64(length),
   444  			"Perc50":  tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
   445  			"Perc90":  tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
   446  			"Perc95":  tc.schedulingThroughputs[int(math.Ceil(float64(length*95)/100))-1],
   447  			"Perc99":  tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
   448  		}
   449  		throughputSummary.Unit = "pods/s"
   450  	}
   451  
   452  	return []DataItem{throughputSummary}
   453  }
   454  

View as plain text