...

Source file src/k8s.io/kubernetes/test/integration/scheduler_perf/scheduler_perf.go

Documentation: k8s.io/kubernetes/test/integration/scheduler_perf

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package benchmark
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"flag"
    23  	"fmt"
    24  	"io"
    25  	"math"
    26  	"os"
    27  	"path"
    28  	"strings"
    29  	"sync"
    30  	"testing"
    31  	"time"
    32  
    33  	"github.com/google/go-cmp/cmp"
    34  
    35  	v1 "k8s.io/api/core/v1"
    36  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    37  	"k8s.io/apimachinery/pkg/api/meta"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    43  	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    44  	"k8s.io/client-go/dynamic"
    45  	"k8s.io/client-go/informers"
    46  	coreinformers "k8s.io/client-go/informers/core/v1"
    47  	clientset "k8s.io/client-go/kubernetes"
    48  	"k8s.io/client-go/restmapper"
    49  	"k8s.io/component-base/featuregate"
    50  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    51  	"k8s.io/component-base/metrics/legacyregistry"
    52  	"k8s.io/klog/v2"
    53  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    54  	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
    55  	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    56  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    57  	"k8s.io/kubernetes/pkg/scheduler/metrics"
    58  	"k8s.io/kubernetes/test/integration/framework"
    59  	testutils "k8s.io/kubernetes/test/utils"
    60  	"k8s.io/kubernetes/test/utils/ktesting"
    61  	"k8s.io/kubernetes/test/utils/ktesting/initoption"
    62  	"sigs.k8s.io/yaml"
    63  )
    64  
    65  type operationCode string
    66  
    67  const (
    68  	createAnyOpcode            operationCode = "createAny"
    69  	createNodesOpcode          operationCode = "createNodes"
    70  	createNamespacesOpcode     operationCode = "createNamespaces"
    71  	createPodsOpcode           operationCode = "createPods"
    72  	createPodSetsOpcode        operationCode = "createPodSets"
    73  	createResourceClaimsOpcode operationCode = "createResourceClaims"
    74  	createResourceDriverOpcode operationCode = "createResourceDriver"
    75  	churnOpcode                operationCode = "churn"
    76  	barrierOpcode              operationCode = "barrier"
    77  	sleepOpcode                operationCode = "sleep"
    78  )
    79  
    80  const (
    81  	// Two modes supported in "churn" operator.
    82  
    83  	// Create continuously create API objects without deleting them.
    84  	Create = "create"
    85  	// Recreate creates a number of API objects and then delete them, and repeat the iteration.
    86  	Recreate = "recreate"
    87  )
    88  
    89  const (
    90  	configFile               = "config/performance-config.yaml"
    91  	extensionPointsLabelName = "extension_point"
    92  	resultLabelName          = "result"
    93  )
    94  
    95  var (
    96  	defaultMetricsCollectorConfig = metricsCollectorConfig{
    97  		Metrics: map[string]*labelValues{
    98  			"scheduler_framework_extension_point_duration_seconds": {
    99  				label:  extensionPointsLabelName,
   100  				values: []string{"Filter", "Score"},
   101  			},
   102  			"scheduler_scheduling_attempt_duration_seconds": {
   103  				label:  resultLabelName,
   104  				values: []string{metrics.ScheduledResult, metrics.UnschedulableResult, metrics.ErrorResult},
   105  			},
   106  			"scheduler_pod_scheduling_duration_seconds":     nil,
   107  			"scheduler_pod_scheduling_sli_duration_seconds": nil,
   108  		},
   109  	}
   110  )
   111  
   112  // testCase defines a set of test cases that intends to test the performance of
   113  // similar workloads of varying sizes with shared overall settings such as
   114  // feature gates and metrics collected.
   115  type testCase struct {
   116  	// Name of the testCase.
   117  	Name string
   118  	// Feature gates to set before running the test.
   119  	// Optional
   120  	FeatureGates map[featuregate.Feature]bool
   121  	// List of metrics to collect. Defaults to
   122  	// defaultMetricsCollectorConfig if unspecified.
   123  	// Optional
   124  	MetricsCollectorConfig *metricsCollectorConfig
   125  	// Template for sequence of ops that each workload must follow. Each op will
   126  	// be executed serially one after another. Each element of the list must be
   127  	// createNodesOp, createPodsOp, or barrierOp.
   128  	WorkloadTemplate []op
   129  	// List of workloads to run under this testCase.
   130  	Workloads []*workload
   131  	// SchedulerConfigPath is the path of scheduler configuration
   132  	// Optional
   133  	SchedulerConfigPath string
   134  	// Default path to spec file describing the pods to create.
   135  	// This path can be overridden in createPodsOp by setting PodTemplatePath .
   136  	// Optional
   137  	DefaultPodTemplatePath *string
   138  	// Labels can be used to enable or disable workloads inside this test case.
   139  	Labels []string
   140  }
   141  
   142  func (tc *testCase) collectsMetrics() bool {
   143  	for _, op := range tc.WorkloadTemplate {
   144  		if op.realOp.collectsMetrics() {
   145  			return true
   146  		}
   147  	}
   148  	return false
   149  }
   150  
   151  func (tc *testCase) workloadNamesUnique() error {
   152  	workloadUniqueNames := map[string]bool{}
   153  	for _, w := range tc.Workloads {
   154  		if workloadUniqueNames[w.Name] {
   155  			return fmt.Errorf("%s: workload name %s is not unique", tc.Name, w.Name)
   156  		}
   157  		workloadUniqueNames[w.Name] = true
   158  	}
   159  	return nil
   160  }
   161  
   162  // workload is a subtest under a testCase that tests the scheduler performance
   163  // for a certain ordering of ops. The set of nodes created and pods scheduled
   164  // in a workload may be heterogeneous.
   165  type workload struct {
   166  	// Name of the workload.
   167  	Name string
   168  	// Values of parameters used in the workloadTemplate.
   169  	Params params
   170  	// Labels can be used to enable or disable a workload.
   171  	Labels []string
   172  }
   173  
   174  type params struct {
   175  	params map[string]int
   176  	// isUsed field records whether params is used or not.
   177  	isUsed map[string]bool
   178  }
   179  
   180  // UnmarshalJSON is a custom unmarshaler for params.
   181  //
   182  // from(json):
   183  //
   184  //	{
   185  //		"initNodes": 500,
   186  //		"initPods": 50
   187  //	}
   188  //
   189  // to:
   190  //
   191  //	params{
   192  //		params: map[string]int{
   193  //			"intNodes": 500,
   194  //			"initPods": 50,
   195  //		},
   196  //		isUsed: map[string]bool{}, // empty map
   197  //	}
   198  func (p *params) UnmarshalJSON(b []byte) error {
   199  	aux := map[string]int{}
   200  
   201  	if err := json.Unmarshal(b, &aux); err != nil {
   202  		return err
   203  	}
   204  
   205  	p.params = aux
   206  	p.isUsed = map[string]bool{}
   207  	return nil
   208  }
   209  
   210  // get returns param.
   211  func (p params) get(key string) (int, error) {
   212  	p.isUsed[key] = true
   213  	param, ok := p.params[key]
   214  	if ok {
   215  		return param, nil
   216  	}
   217  	return 0, fmt.Errorf("parameter %s is undefined", key)
   218  }
   219  
   220  // unusedParams returns the names of unusedParams
   221  func (w workload) unusedParams() []string {
   222  	var ret []string
   223  	for name := range w.Params.params {
   224  		if !w.Params.isUsed[name] {
   225  			ret = append(ret, name)
   226  		}
   227  	}
   228  	return ret
   229  }
   230  
   231  // op is a dummy struct which stores the real op in itself.
   232  type op struct {
   233  	realOp realOp
   234  }
   235  
   236  // UnmarshalJSON is a custom unmarshaler for the op struct since we don't know
   237  // which op we're decoding at runtime.
   238  func (op *op) UnmarshalJSON(b []byte) error {
   239  	possibleOps := []realOp{
   240  		&createAny{},
   241  		&createNodesOp{},
   242  		&createNamespacesOp{},
   243  		&createPodsOp{},
   244  		&createPodSetsOp{},
   245  		&createResourceClaimsOp{},
   246  		&createResourceDriverOp{},
   247  		&churnOp{},
   248  		&barrierOp{},
   249  		&sleepOp{},
   250  		// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
   251  	}
   252  	var firstError error
   253  	for _, possibleOp := range possibleOps {
   254  		if err := json.Unmarshal(b, possibleOp); err == nil {
   255  			if err2 := possibleOp.isValid(true); err2 == nil {
   256  				op.realOp = possibleOp
   257  				return nil
   258  			} else if firstError == nil {
   259  				// Don't return an error yet. Even though this op is invalid, it may
   260  				// still match other possible ops.
   261  				firstError = err2
   262  			}
   263  		}
   264  	}
   265  	return fmt.Errorf("cannot unmarshal %s into any known op type: %w", string(b), firstError)
   266  }
   267  
   268  // realOp is an interface that is implemented by different structs. To evaluate
   269  // the validity of ops at parse-time, a isValid function must be implemented.
   270  type realOp interface {
   271  	// isValid verifies the validity of the op args such as node/pod count. Note
   272  	// that we don't catch undefined parameters at this stage.
   273  	isValid(allowParameterization bool) error
   274  	// collectsMetrics checks if the op collects metrics.
   275  	collectsMetrics() bool
   276  	// patchParams returns a patched realOp of the same type after substituting
   277  	// parameterizable values with workload-specific values. One should implement
   278  	// this method on the value receiver base type, not a pointer receiver base
   279  	// type, even though calls will be made from with a *realOp. This is because
   280  	// callers don't want the receiver to inadvertently modify the realOp
   281  	// (instead, it's returned as a return value).
   282  	patchParams(w *workload) (realOp, error)
   283  }
   284  
   285  // runnableOp is an interface implemented by some operations. It makes it posssible
   286  // to execute the operation without having to add separate code into runWorkload.
   287  type runnableOp interface {
   288  	realOp
   289  
   290  	// requiredNamespaces returns all namespaces that runWorkload must create
   291  	// before running the operation.
   292  	requiredNamespaces() []string
   293  	// run executes the steps provided by the operation.
   294  	run(ktesting.TContext)
   295  }
   296  
   297  func isValidParameterizable(val string) bool {
   298  	return strings.HasPrefix(val, "$")
   299  }
   300  
   301  func isValidCount(allowParameterization bool, count int, countParam string) bool {
   302  	if !allowParameterization || countParam == "" {
   303  		// Ignore parameter. The value itself must be okay.
   304  		return count >= 0
   305  	}
   306  	return isValidParameterizable(countParam)
   307  }
   308  
   309  // createNodesOp defines an op where nodes are created as a part of a workload.
   310  type createNodesOp struct {
   311  	// Must be "createNodes".
   312  	Opcode operationCode
   313  	// Number of nodes to create. Parameterizable through CountParam.
   314  	Count int
   315  	// Template parameter for Count.
   316  	CountParam string
   317  	// Path to spec file describing the nodes to create.
   318  	// Optional
   319  	NodeTemplatePath *string
   320  	// At most one of the following strategies can be defined. Defaults
   321  	// to TrivialNodePrepareStrategy if unspecified.
   322  	// Optional
   323  	NodeAllocatableStrategy  *testutils.NodeAllocatableStrategy
   324  	LabelNodePrepareStrategy *testutils.LabelNodePrepareStrategy
   325  	UniqueNodeLabelStrategy  *testutils.UniqueNodeLabelStrategy
   326  }
   327  
   328  func (cno *createNodesOp) isValid(allowParameterization bool) error {
   329  	if cno.Opcode != createNodesOpcode {
   330  		return fmt.Errorf("invalid opcode %q", cno.Opcode)
   331  	}
   332  	if !isValidCount(allowParameterization, cno.Count, cno.CountParam) {
   333  		return fmt.Errorf("invalid Count=%d / CountParam=%q", cno.Count, cno.CountParam)
   334  	}
   335  	return nil
   336  }
   337  
   338  func (*createNodesOp) collectsMetrics() bool {
   339  	return false
   340  }
   341  
   342  func (cno createNodesOp) patchParams(w *workload) (realOp, error) {
   343  	if cno.CountParam != "" {
   344  		var err error
   345  		cno.Count, err = w.Params.get(cno.CountParam[1:])
   346  		if err != nil {
   347  			return nil, err
   348  		}
   349  	}
   350  	return &cno, (&cno).isValid(false)
   351  }
   352  
   353  // createNamespacesOp defines an op for creating namespaces
   354  type createNamespacesOp struct {
   355  	// Must be "createNamespaces".
   356  	Opcode operationCode
   357  	// Name prefix of the Namespace. The format is "<prefix>-<number>", where number is
   358  	// between 0 and count-1.
   359  	Prefix string
   360  	// Number of namespaces to create. Parameterizable through CountParam.
   361  	Count int
   362  	// Template parameter for Count. Takes precedence over Count if both set.
   363  	CountParam string
   364  	// Path to spec file describing the Namespaces to create.
   365  	// Optional
   366  	NamespaceTemplatePath *string
   367  }
   368  
   369  func (cmo *createNamespacesOp) isValid(allowParameterization bool) error {
   370  	if cmo.Opcode != createNamespacesOpcode {
   371  		return fmt.Errorf("invalid opcode %q", cmo.Opcode)
   372  	}
   373  	if !isValidCount(allowParameterization, cmo.Count, cmo.CountParam) {
   374  		return fmt.Errorf("invalid Count=%d / CountParam=%q", cmo.Count, cmo.CountParam)
   375  	}
   376  	return nil
   377  }
   378  
   379  func (*createNamespacesOp) collectsMetrics() bool {
   380  	return false
   381  }
   382  
   383  func (cmo createNamespacesOp) patchParams(w *workload) (realOp, error) {
   384  	if cmo.CountParam != "" {
   385  		var err error
   386  		cmo.Count, err = w.Params.get(cmo.CountParam[1:])
   387  		if err != nil {
   388  			return nil, err
   389  		}
   390  	}
   391  	return &cmo, (&cmo).isValid(false)
   392  }
   393  
   394  // createPodsOp defines an op where pods are scheduled as a part of a workload.
   395  // The test can block on the completion of this op before moving forward or
   396  // continue asynchronously.
   397  type createPodsOp struct {
   398  	// Must be "createPods".
   399  	Opcode operationCode
   400  	// Number of pods to schedule. Parameterizable through CountParam.
   401  	Count int
   402  	// Template parameter for Count.
   403  	CountParam string
   404  	// Whether or not to enable metrics collection for this createPodsOp.
   405  	// Optional. Both CollectMetrics and SkipWaitToCompletion cannot be true at
   406  	// the same time for a particular createPodsOp.
   407  	CollectMetrics bool
   408  	// Namespace the pods should be created in. Defaults to a unique
   409  	// namespace of the format "namespace-<number>".
   410  	// Optional
   411  	Namespace *string
   412  	// Path to spec file describing the pods to schedule.
   413  	// If nil, DefaultPodTemplatePath will be used.
   414  	// Optional
   415  	PodTemplatePath *string
   416  	// Whether or not to wait for all pods in this op to get scheduled.
   417  	// Defaults to false if not specified.
   418  	// Optional
   419  	SkipWaitToCompletion bool
   420  	// Persistent volume settings for the pods to be scheduled.
   421  	// Optional
   422  	PersistentVolumeTemplatePath      *string
   423  	PersistentVolumeClaimTemplatePath *string
   424  }
   425  
   426  func (cpo *createPodsOp) isValid(allowParameterization bool) error {
   427  	if cpo.Opcode != createPodsOpcode {
   428  		return fmt.Errorf("invalid opcode %q; expected %q", cpo.Opcode, createPodsOpcode)
   429  	}
   430  	if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) {
   431  		return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam)
   432  	}
   433  	if cpo.CollectMetrics && cpo.SkipWaitToCompletion {
   434  		// While it's technically possible to achieve this, the additional
   435  		// complexity is not worth it, especially given that we don't have any
   436  		// use-cases right now.
   437  		return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
   438  	}
   439  	return nil
   440  }
   441  
   442  func (cpo *createPodsOp) collectsMetrics() bool {
   443  	return cpo.CollectMetrics
   444  }
   445  
   446  func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
   447  	if cpo.CountParam != "" {
   448  		var err error
   449  		cpo.Count, err = w.Params.get(cpo.CountParam[1:])
   450  		if err != nil {
   451  			return nil, err
   452  		}
   453  	}
   454  	return &cpo, (&cpo).isValid(false)
   455  }
   456  
   457  // createPodSetsOp defines an op where a set of createPodsOps is created in each unique namespace.
   458  type createPodSetsOp struct {
   459  	// Must be "createPodSets".
   460  	Opcode operationCode
   461  	// Number of sets to create.
   462  	Count int
   463  	// Template parameter for Count.
   464  	CountParam string
   465  	// Each set of pods will be created in a namespace of the form namespacePrefix-<number>,
   466  	// where number is from 0 to count-1
   467  	NamespacePrefix string
   468  	// The template of a createPodsOp.
   469  	CreatePodsOp createPodsOp
   470  }
   471  
   472  func (cpso *createPodSetsOp) isValid(allowParameterization bool) error {
   473  	if cpso.Opcode != createPodSetsOpcode {
   474  		return fmt.Errorf("invalid opcode %q; expected %q", cpso.Opcode, createPodSetsOpcode)
   475  	}
   476  	if !isValidCount(allowParameterization, cpso.Count, cpso.CountParam) {
   477  		return fmt.Errorf("invalid Count=%d / CountParam=%q", cpso.Count, cpso.CountParam)
   478  	}
   479  	return cpso.CreatePodsOp.isValid(allowParameterization)
   480  }
   481  
   482  func (cpso *createPodSetsOp) collectsMetrics() bool {
   483  	return cpso.CreatePodsOp.CollectMetrics
   484  }
   485  
   486  func (cpso createPodSetsOp) patchParams(w *workload) (realOp, error) {
   487  	if cpso.CountParam != "" {
   488  		var err error
   489  		cpso.Count, err = w.Params.get(cpso.CountParam[1:])
   490  		if err != nil {
   491  			return nil, err
   492  		}
   493  	}
   494  	return &cpso, (&cpso).isValid(true)
   495  }
   496  
   497  // churnOp defines an op where services are created as a part of a workload.
   498  type churnOp struct {
   499  	// Must be "churnOp".
   500  	Opcode operationCode
   501  	// Value must be one of the followings:
   502  	// - recreate. In this mode, API objects will be created for N cycles, and then
   503  	//   deleted in the next N cycles. N is specified by the "Number" field.
   504  	// - create. In this mode, API objects will be created (without deletion) until
   505  	//   reaching a threshold - which is specified by the "Number" field.
   506  	Mode string
   507  	// Maximum number of API objects to be created.
   508  	// Defaults to 0, which means unlimited.
   509  	Number int
   510  	// Intervals of churning. Defaults to 500 millisecond.
   511  	IntervalMilliseconds int64
   512  	// Namespace the churning objects should be created in. Defaults to a unique
   513  	// namespace of the format "namespace-<number>".
   514  	// Optional
   515  	Namespace *string
   516  	// Path of API spec files.
   517  	TemplatePaths []string
   518  }
   519  
   520  func (co *churnOp) isValid(_ bool) error {
   521  	if co.Opcode != churnOpcode {
   522  		return fmt.Errorf("invalid opcode %q", co.Opcode)
   523  	}
   524  	if co.Mode != Recreate && co.Mode != Create {
   525  		return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
   526  	}
   527  	if co.Number < 0 {
   528  		return fmt.Errorf("number (%v) cannot be negative", co.Number)
   529  	}
   530  	if co.Mode == Recreate && co.Number == 0 {
   531  		return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
   532  	}
   533  	if len(co.TemplatePaths) == 0 {
   534  		return fmt.Errorf("at least one template spec file needs to be specified")
   535  	}
   536  	return nil
   537  }
   538  
   539  func (*churnOp) collectsMetrics() bool {
   540  	return false
   541  }
   542  
   543  func (co churnOp) patchParams(w *workload) (realOp, error) {
   544  	return &co, nil
   545  }
   546  
   547  // barrierOp defines an op that can be used to wait until all scheduled pods of
   548  // one or many namespaces have been bound to nodes. This is useful when pods
   549  // were scheduled with SkipWaitToCompletion set to true.
   550  type barrierOp struct {
   551  	// Must be "barrier".
   552  	Opcode operationCode
   553  	// Namespaces to block on. Empty array or not specifying this field signifies
   554  	// that the barrier should block on all namespaces.
   555  	Namespaces []string
   556  }
   557  
   558  func (bo *barrierOp) isValid(allowParameterization bool) error {
   559  	if bo.Opcode != barrierOpcode {
   560  		return fmt.Errorf("invalid opcode %q", bo.Opcode)
   561  	}
   562  	return nil
   563  }
   564  
   565  func (*barrierOp) collectsMetrics() bool {
   566  	return false
   567  }
   568  
   569  func (bo barrierOp) patchParams(w *workload) (realOp, error) {
   570  	return &bo, nil
   571  }
   572  
   573  // sleepOp defines an op that can be used to sleep for a specified amount of time.
   574  // This is useful in simulating workloads that require some sort of time-based synchronisation.
   575  type sleepOp struct {
   576  	// Must be "sleep".
   577  	Opcode operationCode
   578  	// duration of sleep.
   579  	Duration time.Duration
   580  }
   581  
   582  func (so *sleepOp) UnmarshalJSON(data []byte) (err error) {
   583  	var tmp struct {
   584  		Opcode   operationCode
   585  		Duration string
   586  	}
   587  	if err = json.Unmarshal(data, &tmp); err != nil {
   588  		return err
   589  	}
   590  
   591  	so.Opcode = tmp.Opcode
   592  	so.Duration, err = time.ParseDuration(tmp.Duration)
   593  	return err
   594  }
   595  
   596  func (so *sleepOp) isValid(_ bool) error {
   597  	if so.Opcode != sleepOpcode {
   598  		return fmt.Errorf("invalid opcode %q; expected %q", so.Opcode, sleepOpcode)
   599  	}
   600  	return nil
   601  }
   602  
   603  func (so *sleepOp) collectsMetrics() bool {
   604  	return false
   605  }
   606  
   607  func (so sleepOp) patchParams(_ *workload) (realOp, error) {
   608  	return &so, nil
   609  }
   610  
   611  var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
   612  
   613  func initTestOutput(tb testing.TB) io.Writer {
   614  	var output io.Writer
   615  	if *useTestingLog {
   616  		output = framework.NewTBWriter(tb)
   617  	} else {
   618  		tmpDir := tb.TempDir()
   619  		logfileName := path.Join(tmpDir, "output.log")
   620  		fileOutput, err := os.Create(logfileName)
   621  		if err != nil {
   622  			tb.Fatalf("create log file: %v", err)
   623  		}
   624  		output = fileOutput
   625  
   626  		tb.Cleanup(func() {
   627  			// Dump the log output when the test is done.  The user
   628  			// can decide how much of it will be visible in case of
   629  			// success: then "go test" truncates, "go test -v"
   630  			// doesn't. All of it will be shown for a failure.
   631  			if err := fileOutput.Close(); err != nil {
   632  				tb.Fatalf("close log file: %v", err)
   633  			}
   634  			log, err := os.ReadFile(logfileName)
   635  			if err != nil {
   636  				tb.Fatalf("read log file: %v", err)
   637  			}
   638  			tb.Logf("full log output:\n%s", string(log))
   639  		})
   640  	}
   641  	return output
   642  }
   643  
   644  type cleanupKeyType struct{}
   645  
   646  var cleanupKey = cleanupKeyType{}
   647  
   648  // shouldCleanup returns true if a function should clean up resource in the
   649  // apiserver when the test is done. This is true for unit tests (etcd and
   650  // apiserver get reused) and false for benchmarks (each benchmark starts with a
   651  // clean state, so cleaning up just wastes time).
   652  //
   653  // The default if not explicitly set in the context is true.
   654  func shouldCleanup(ctx context.Context) bool {
   655  	val := ctx.Value(cleanupKey)
   656  	if enabled, ok := val.(bool); ok {
   657  		return enabled
   658  	}
   659  	return true
   660  }
   661  
   662  // withCleanup sets whether cleaning up resources in the apiserver
   663  // should be done. The default is true.
   664  func withCleanup(tCtx ktesting.TContext, enabled bool) ktesting.TContext {
   665  	return ktesting.WithValue(tCtx, cleanupKey, enabled)
   666  }
   667  
   668  var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling")
   669  
   670  // RunBenchmarkPerfScheduling runs the scheduler performance tests.
   671  // Optionally, you can pass your own scheduler plugin via outOfTreePluginRegistry.
   672  func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkruntime.Registry) {
   673  	testCases, err := getTestCases(configFile)
   674  	if err != nil {
   675  		b.Fatal(err)
   676  	}
   677  	if err = validateTestCases(testCases); err != nil {
   678  		b.Fatal(err)
   679  	}
   680  
   681  	output := initTestOutput(b)
   682  
   683  	// Because we run sequentially, it is possible to change the global
   684  	// klog logger and redirect log output. Quite a lot of code still uses
   685  	// it instead of supporting contextual logging.
   686  	//
   687  	// Because we leak one goroutine which calls klog, we cannot restore
   688  	// the previous state.
   689  	_ = framework.RedirectKlog(b, output)
   690  
   691  	dataItems := DataItems{Version: "v1"}
   692  	for _, tc := range testCases {
   693  		b.Run(tc.Name, func(b *testing.B) {
   694  			for _, w := range tc.Workloads {
   695  				b.Run(w.Name, func(b *testing.B) {
   696  					if !enabled(*perfSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
   697  						b.Skipf("disabled by label filter %q", *perfSchedulingLabelFilter)
   698  					}
   699  					tCtx := ktesting.Init(b, initoption.PerTestOutput(*useTestingLog))
   700  
   701  					// Ensure that there are no leaked
   702  					// goroutines.  They could influence
   703  					// performance of the next benchmark.
   704  					// This must *after* RedirectKlog
   705  					// because then during cleanup, the
   706  					// test will wait for goroutines to
   707  					// quit *before* restoring klog settings.
   708  					framework.GoleakCheck(b)
   709  
   710  					// Now that we are ready to run, start
   711  					// etcd.
   712  					framework.StartEtcd(b, output)
   713  
   714  					// 30 minutes should be plenty enough even for the 5000-node tests.
   715  					timeout := 30 * time.Minute
   716  					tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))
   717  
   718  					for feature, flag := range tc.FeatureGates {
   719  						defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
   720  					}
   721  					informerFactory, tCtx := setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)
   722  
   723  					// No need to clean up, each benchmark testcase starts with an empty
   724  					// etcd database.
   725  					tCtx = withCleanup(tCtx, false)
   726  
   727  					results := runWorkload(tCtx, tc, w, informerFactory)
   728  					dataItems.DataItems = append(dataItems.DataItems, results...)
   729  
   730  					if len(results) > 0 {
   731  						// The default ns/op is not
   732  						// useful because it includes
   733  						// the time spent on
   734  						// initialization and shutdown. Here we suppress it.
   735  						b.ReportMetric(0, "ns/op")
   736  
   737  						// Instead, report the same
   738  						// results that also get stored
   739  						// in the JSON file.
   740  						for _, result := range results {
   741  							// For some metrics like
   742  							// scheduler_framework_extension_point_duration_seconds
   743  							// the actual value has some
   744  							// other unit. We patch the key
   745  							// to make it look right.
   746  							metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
   747  							for key, value := range result.Data {
   748  								b.ReportMetric(value, metric+"/"+key)
   749  							}
   750  						}
   751  					}
   752  
   753  					// Reset metrics to prevent metrics generated in current workload gets
   754  					// carried over to the next workload.
   755  					legacyregistry.Reset()
   756  				})
   757  			}
   758  		})
   759  	}
   760  	if err := dataItems2JSONFile(dataItems, b.Name()+"_benchmark"); err != nil {
   761  		b.Fatalf("unable to write measured data %+v: %v", dataItems, err)
   762  	}
   763  }
   764  
   765  var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling")
   766  
   767  type schedulerConfig struct {
   768  	schedulerConfigPath string
   769  	featureGates        map[featuregate.Feature]bool
   770  }
   771  
   772  func (c schedulerConfig) equals(tc *testCase) bool {
   773  	return c.schedulerConfigPath == tc.SchedulerConfigPath &&
   774  		cmp.Equal(c.featureGates, tc.FeatureGates)
   775  }
   776  
   777  func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
   778  	data, err := os.ReadFile(file)
   779  	if err != nil {
   780  		return nil, err
   781  	}
   782  	// The UniversalDecoder runs defaulting and returns the internal type by default.
   783  	obj, gvk, err := scheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
   784  	if err != nil {
   785  		return nil, err
   786  	}
   787  	if cfgObj, ok := obj.(*config.KubeSchedulerConfiguration); ok {
   788  		return cfgObj, nil
   789  	}
   790  	return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk)
   791  }
   792  
   793  func unrollWorkloadTemplate(tb ktesting.TB, wt []op, w *workload) []op {
   794  	var unrolled []op
   795  	for opIndex, o := range wt {
   796  		realOp, err := o.realOp.patchParams(w)
   797  		if err != nil {
   798  			tb.Fatalf("op %d: %v", opIndex, err)
   799  		}
   800  		switch concreteOp := realOp.(type) {
   801  		case *createPodSetsOp:
   802  			tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
   803  			for i := 0; i < concreteOp.Count; i++ {
   804  				copy := concreteOp.CreatePodsOp
   805  				ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i)
   806  				copy.Namespace = &ns
   807  				unrolled = append(unrolled, op{realOp: &copy})
   808  			}
   809  		default:
   810  			unrolled = append(unrolled, o)
   811  		}
   812  	}
   813  	return unrolled
   814  }
   815  
   816  func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
   817  	var cfg *config.KubeSchedulerConfiguration
   818  	var err error
   819  	if configPath != "" {
   820  		cfg, err = loadSchedulerConfig(configPath)
   821  		if err != nil {
   822  			tCtx.Fatalf("error loading scheduler config file: %v", err)
   823  		}
   824  		if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
   825  			tCtx.Fatalf("validate scheduler config file failed: %v", err)
   826  		}
   827  	}
   828  	return mustSetupCluster(tCtx, cfg, featureGates, outOfTreePluginRegistry)
   829  }
   830  
   831  func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
   832  	b, benchmarking := tCtx.TB().(*testing.B)
   833  	if benchmarking {
   834  		start := time.Now()
   835  		b.Cleanup(func() {
   836  			duration := time.Since(start)
   837  			// This includes startup and shutdown time and thus does not
   838  			// reflect scheduling performance. It's useful to get a feeling
   839  			// for how long each workload runs overall.
   840  			b.ReportMetric(duration.Seconds(), "runtime_seconds")
   841  		})
   842  	}
   843  	cleanup := shouldCleanup(tCtx)
   844  
   845  	// Disable error checking of the sampling interval length in the
   846  	// throughput collector by default. When running benchmarks, report
   847  	// it as test failure when samples are not taken regularly.
   848  	var throughputErrorMargin float64
   849  	if benchmarking {
   850  		// TODO: To prevent the perf-test failure, we increased the error margin, if still not enough
   851  		// one day, we should think of another approach to avoid this trick.
   852  		throughputErrorMargin = 30
   853  	}
   854  
   855  	// Additional informers needed for testing. The pod informer was
   856  	// already created before (scheduler.NewInformerFactory) and the
   857  	// factory was started for it (mustSetupCluster), therefore we don't
   858  	// need to start again.
   859  	podInformer := informerFactory.Core().V1().Pods()
   860  
   861  	// Everything else started by this function gets stopped before it returns.
   862  	tCtx = ktesting.WithCancel(tCtx)
   863  	var wg sync.WaitGroup
   864  	defer wg.Wait()
   865  	defer tCtx.Cancel("workload is done")
   866  
   867  	var mu sync.Mutex
   868  	var dataItems []DataItem
   869  	nextNodeIndex := 0
   870  	// numPodsScheduledPerNamespace has all namespaces created in workload and the number of pods they (will) have.
   871  	// All namespaces listed in numPodsScheduledPerNamespace will be cleaned up.
   872  	numPodsScheduledPerNamespace := make(map[string]int)
   873  
   874  	if cleanup {
   875  		// This must run before controllers get shut down.
   876  		defer cleanupWorkload(tCtx, tc, numPodsScheduledPerNamespace)
   877  	}
   878  
   879  	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
   880  		realOp, err := op.realOp.patchParams(w)
   881  		if err != nil {
   882  			tCtx.Fatalf("op %d: %v", opIndex, err)
   883  		}
   884  		select {
   885  		case <-tCtx.Done():
   886  			tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
   887  		default:
   888  		}
   889  		switch concreteOp := realOp.(type) {
   890  		case *createNodesOp:
   891  			nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
   892  			if err != nil {
   893  				tCtx.Fatalf("op %d: %v", opIndex, err)
   894  			}
   895  			if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
   896  				tCtx.Fatalf("op %d: %v", opIndex, err)
   897  			}
   898  			if cleanup {
   899  				defer func() {
   900  					if err := nodePreparer.CleanupNodes(tCtx); err != nil {
   901  						tCtx.Fatalf("failed to clean up nodes, error: %v", err)
   902  					}
   903  				}()
   904  			}
   905  			nextNodeIndex += concreteOp.Count
   906  
   907  		case *createNamespacesOp:
   908  			nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
   909  			if err != nil {
   910  				tCtx.Fatalf("op %d: %v", opIndex, err)
   911  			}
   912  			if err := nsPreparer.prepare(tCtx); err != nil {
   913  				err2 := nsPreparer.cleanup(tCtx)
   914  				if err2 != nil {
   915  					err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
   916  				}
   917  				tCtx.Fatalf("op %d: %v", opIndex, err)
   918  			}
   919  			for _, n := range nsPreparer.namespaces() {
   920  				if _, ok := numPodsScheduledPerNamespace[n]; ok {
   921  					// this namespace has been already created.
   922  					continue
   923  				}
   924  				numPodsScheduledPerNamespace[n] = 0
   925  			}
   926  
   927  		case *createPodsOp:
   928  			var namespace string
   929  			// define Pod's namespace automatically, and create that namespace.
   930  			namespace = fmt.Sprintf("namespace-%d", opIndex)
   931  			if concreteOp.Namespace != nil {
   932  				namespace = *concreteOp.Namespace
   933  			}
   934  			createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
   935  			if concreteOp.PodTemplatePath == nil {
   936  				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
   937  			}
   938  			var collectors []testDataCollector
   939  			// This needs a separate context and wait group because
   940  			// the code below needs to be sure that the goroutines
   941  			// are stopped.
   942  			var collectorCtx ktesting.TContext
   943  			var collectorWG sync.WaitGroup
   944  			defer collectorWG.Wait()
   945  
   946  			if concreteOp.CollectMetrics {
   947  				collectorCtx = ktesting.WithCancel(tCtx)
   948  				defer collectorCtx.Cancel("cleaning up")
   949  				name := tCtx.Name()
   950  				// The first part is the same for each work load, therefore we can strip it.
   951  				name = name[strings.Index(name, "/")+1:]
   952  				collectors = getTestDataCollectors(collectorCtx, podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
   953  				for _, collector := range collectors {
   954  					// Need loop-local variable for function below.
   955  					collector := collector
   956  					collectorWG.Add(1)
   957  					go func() {
   958  						defer collectorWG.Done()
   959  						collector.run(collectorCtx)
   960  					}()
   961  				}
   962  			}
   963  			if err := createPods(tCtx, namespace, concreteOp); err != nil {
   964  				tCtx.Fatalf("op %d: %v", opIndex, err)
   965  			}
   966  			if concreteOp.SkipWaitToCompletion {
   967  				// Only record those namespaces that may potentially require barriers
   968  				// in the future.
   969  				numPodsScheduledPerNamespace[namespace] += concreteOp.Count
   970  			} else {
   971  				if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil {
   972  					tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
   973  				}
   974  			}
   975  			if concreteOp.CollectMetrics {
   976  				// CollectMetrics and SkipWaitToCompletion can never be true at the
   977  				// same time, so if we're here, it means that all pods have been
   978  				// scheduled.
   979  				collectorCtx.Cancel("collecting metrix, collector must stop first")
   980  				collectorWG.Wait()
   981  				mu.Lock()
   982  				for _, collector := range collectors {
   983  					dataItems = append(dataItems, collector.collect()...)
   984  				}
   985  				mu.Unlock()
   986  			}
   987  
   988  			if !concreteOp.SkipWaitToCompletion {
   989  				// SkipWaitToCompletion=false indicates this step has waited for the Pods to be scheduled.
   990  				// So we reset the metrics in global registry; otherwise metrics gathered in this step
   991  				// will be carried over to next step.
   992  				legacyregistry.Reset()
   993  			}
   994  
   995  		case *churnOp:
   996  			var namespace string
   997  			if concreteOp.Namespace != nil {
   998  				namespace = *concreteOp.Namespace
   999  			} else {
  1000  				namespace = fmt.Sprintf("namespace-%d", opIndex)
  1001  			}
  1002  			restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
  1003  			// Ensure the namespace exists.
  1004  			nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
  1005  			if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
  1006  				tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
  1007  			}
  1008  
  1009  			var churnFns []func(name string) string
  1010  
  1011  			for i, path := range concreteOp.TemplatePaths {
  1012  				unstructuredObj, gvk, err := getUnstructuredFromFile(path)
  1013  				if err != nil {
  1014  					tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
  1015  				}
  1016  				// Obtain GVR.
  1017  				mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
  1018  				if err != nil {
  1019  					tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
  1020  				}
  1021  				gvr := mapping.Resource
  1022  				// Distinguish cluster-scoped with namespaced API objects.
  1023  				var dynRes dynamic.ResourceInterface
  1024  				if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
  1025  					dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
  1026  				} else {
  1027  					dynRes = tCtx.Dynamic().Resource(gvr)
  1028  				}
  1029  
  1030  				churnFns = append(churnFns, func(name string) string {
  1031  					if name != "" {
  1032  						if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil {
  1033  							tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
  1034  						}
  1035  						return ""
  1036  					}
  1037  
  1038  					live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
  1039  					if err != nil {
  1040  						return ""
  1041  					}
  1042  					return live.GetName()
  1043  				})
  1044  			}
  1045  
  1046  			var interval int64 = 500
  1047  			if concreteOp.IntervalMilliseconds != 0 {
  1048  				interval = concreteOp.IntervalMilliseconds
  1049  			}
  1050  			ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
  1051  			defer ticker.Stop()
  1052  
  1053  			switch concreteOp.Mode {
  1054  			case Create:
  1055  				wg.Add(1)
  1056  				go func() {
  1057  					defer wg.Done()
  1058  					count, threshold := 0, concreteOp.Number
  1059  					if threshold == 0 {
  1060  						threshold = math.MaxInt32
  1061  					}
  1062  					for count < threshold {
  1063  						select {
  1064  						case <-ticker.C:
  1065  							for i := range churnFns {
  1066  								churnFns[i]("")
  1067  							}
  1068  							count++
  1069  						case <-tCtx.Done():
  1070  							return
  1071  						}
  1072  					}
  1073  				}()
  1074  			case Recreate:
  1075  				wg.Add(1)
  1076  				go func() {
  1077  					defer wg.Done()
  1078  					retVals := make([][]string, len(churnFns))
  1079  					// For each churn function, instantiate a slice of strings with length "concreteOp.Number".
  1080  					for i := range retVals {
  1081  						retVals[i] = make([]string, concreteOp.Number)
  1082  					}
  1083  
  1084  					count := 0
  1085  					for {
  1086  						select {
  1087  						case <-ticker.C:
  1088  							for i := range churnFns {
  1089  								retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
  1090  							}
  1091  							count++
  1092  						case <-tCtx.Done():
  1093  							return
  1094  						}
  1095  					}
  1096  				}()
  1097  			}
  1098  
  1099  		case *barrierOp:
  1100  			for _, namespace := range concreteOp.Namespaces {
  1101  				if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
  1102  					tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
  1103  				}
  1104  			}
  1105  			if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
  1106  				tCtx.Fatalf("op %d: %v", opIndex, err)
  1107  			}
  1108  			// At the end of the barrier, we can be sure that there are no pods
  1109  			// pending scheduling in the namespaces that we just blocked on.
  1110  			if len(concreteOp.Namespaces) == 0 {
  1111  				numPodsScheduledPerNamespace = make(map[string]int)
  1112  			} else {
  1113  				for _, namespace := range concreteOp.Namespaces {
  1114  					delete(numPodsScheduledPerNamespace, namespace)
  1115  				}
  1116  			}
  1117  
  1118  		case *sleepOp:
  1119  			select {
  1120  			case <-tCtx.Done():
  1121  			case <-time.After(concreteOp.Duration):
  1122  			}
  1123  		default:
  1124  			runable, ok := concreteOp.(runnableOp)
  1125  			if !ok {
  1126  				tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
  1127  			}
  1128  			for _, namespace := range runable.requiredNamespaces() {
  1129  				createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
  1130  			}
  1131  			runable.run(tCtx)
  1132  		}
  1133  	}
  1134  
  1135  	// check unused params and inform users
  1136  	unusedParams := w.unusedParams()
  1137  	if len(unusedParams) != 0 {
  1138  		tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
  1139  	}
  1140  
  1141  	// Some tests have unschedulable pods. Do not add an implicit barrier at the
  1142  	// end as we do not want to wait for them.
  1143  	return dataItems
  1144  }
  1145  
  1146  // cleanupWorkload ensures that everything is removed from the API server that
  1147  // might have been created by runWorkload. This must be done before starting
  1148  // the next workload because otherwise it might stumble over previously created
  1149  // objects. For example, the namespaces are the same in different workloads, so
  1150  // not deleting them would cause the next one to fail with "cannot create
  1151  // namespace: already exists".
  1152  //
  1153  // Calling cleanupWorkload can be skipped if it is known that the next workload
  1154  // will run with a fresh etcd instance.
  1155  func cleanupWorkload(tCtx ktesting.TContext, tc *testCase, numPodsScheduledPerNamespace map[string]int) {
  1156  	deleteNow := *metav1.NewDeleteOptions(0)
  1157  	for namespace := range numPodsScheduledPerNamespace {
  1158  		// Pods have to be deleted explicitly, with no grace period. Normally
  1159  		// kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
  1160  		// to remove a deleted pod, but we don't run kubelet...
  1161  		if err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, deleteNow, metav1.ListOptions{}); err != nil {
  1162  			tCtx.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
  1163  		}
  1164  		if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, namespace, deleteNow); err != nil {
  1165  			tCtx.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
  1166  		}
  1167  	}
  1168  
  1169  	// We need to wait here because even with deletion timestamp set,
  1170  	// actually removing a namespace can take some time (garbage collecting
  1171  	// other generated object like secrets, etc.) and we don't want to
  1172  	// start the next workloads while that cleanup is still going on.
  1173  	if err := wait.PollUntilContextTimeout(tCtx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
  1174  		namespaces, err := tCtx.Client().CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
  1175  		if err != nil {
  1176  			return false, err
  1177  		}
  1178  		for _, namespace := range namespaces.Items {
  1179  			if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok {
  1180  				// A namespace created by the workload, need to wait.
  1181  				return false, nil
  1182  			}
  1183  		}
  1184  		// All namespaces gone.
  1185  		return true, nil
  1186  	}); err != nil {
  1187  		tCtx.Fatalf("failed while waiting for namespace removal: %v", err)
  1188  	}
  1189  }
  1190  
  1191  func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
  1192  	if _, ok := (*podsPerNamespace)[namespace]; !ok {
  1193  		// The namespace has not created yet.
  1194  		// So, create that and register it.
  1195  		_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
  1196  		if err != nil {
  1197  			tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
  1198  		}
  1199  		(*podsPerNamespace)[namespace] = 0
  1200  	}
  1201  }
  1202  
  1203  type testDataCollector interface {
  1204  	run(tCtx ktesting.TContext)
  1205  	collect() []DataItem
  1206  }
  1207  
  1208  func getTestDataCollectors(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
  1209  	if mcc == nil {
  1210  		mcc = &defaultMetricsCollectorConfig
  1211  	}
  1212  	return []testDataCollector{
  1213  		newThroughputCollector(tCtx, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
  1214  		newMetricsCollector(mcc, map[string]string{"Name": name}),
  1215  	}
  1216  }
  1217  
  1218  func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Interface) (testutils.TestNodePreparer, error) {
  1219  	var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
  1220  	if cno.NodeAllocatableStrategy != nil {
  1221  		nodeStrategy = cno.NodeAllocatableStrategy
  1222  	} else if cno.LabelNodePrepareStrategy != nil {
  1223  		nodeStrategy = cno.LabelNodePrepareStrategy
  1224  	} else if cno.UniqueNodeLabelStrategy != nil {
  1225  		nodeStrategy = cno.UniqueNodeLabelStrategy
  1226  	}
  1227  
  1228  	if cno.NodeTemplatePath != nil {
  1229  		node, err := getNodeSpecFromFile(cno.NodeTemplatePath)
  1230  		if err != nil {
  1231  			return nil, err
  1232  		}
  1233  		return framework.NewIntegrationTestNodePreparerWithNodeSpec(
  1234  			clientset,
  1235  			[]testutils.CountToStrategy{{Count: cno.Count, Strategy: nodeStrategy}},
  1236  			node,
  1237  		), nil
  1238  	}
  1239  	return framework.NewIntegrationTestNodePreparer(
  1240  		clientset,
  1241  		[]testutils.CountToStrategy{{Count: cno.Count, Strategy: nodeStrategy}},
  1242  		prefix,
  1243  	), nil
  1244  }
  1245  
  1246  func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
  1247  	strategy, err := getPodStrategy(cpo)
  1248  	if err != nil {
  1249  		return err
  1250  	}
  1251  	tCtx.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
  1252  	config := testutils.NewTestPodCreatorConfig()
  1253  	config.AddStrategy(namespace, cpo.Count, strategy)
  1254  	podCreator := testutils.NewTestPodCreator(tCtx.Client(), config)
  1255  	return podCreator.CreatePods(tCtx)
  1256  }
  1257  
  1258  // waitUntilPodsScheduledInNamespace blocks until all pods in the given
  1259  // namespace are scheduled. Times out after 10 minutes because even at the
  1260  // lowest observed QPS of ~10 pods/sec, a 5000-node test should complete.
  1261  func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
  1262  	var pendingPod *v1.Pod
  1263  
  1264  	err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
  1265  		select {
  1266  		case <-ctx.Done():
  1267  			return true, ctx.Err()
  1268  		default:
  1269  		}
  1270  		scheduled, unscheduled, err := getScheduledPods(podInformer, namespace)
  1271  		if err != nil {
  1272  			return false, err
  1273  		}
  1274  		if len(scheduled) >= wantCount {
  1275  			tCtx.Logf("scheduling succeed")
  1276  			return true, nil
  1277  		}
  1278  		tCtx.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
  1279  		if len(unscheduled) > 0 {
  1280  			pendingPod = unscheduled[0]
  1281  		} else {
  1282  			pendingPod = nil
  1283  		}
  1284  		return false, nil
  1285  	})
  1286  
  1287  	if err != nil && pendingPod != nil {
  1288  		err = fmt.Errorf("at least pod %s is not scheduled: %v", klog.KObj(pendingPod), err)
  1289  	}
  1290  	return err
  1291  }
  1292  
  1293  // waitUntilPodsScheduled blocks until the all pods in the given namespaces are
  1294  // scheduled.
  1295  func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
  1296  	// If unspecified, default to all known namespaces.
  1297  	if len(namespaces) == 0 {
  1298  		for namespace := range numPodsScheduledPerNamespace {
  1299  			namespaces = append(namespaces, namespace)
  1300  		}
  1301  	}
  1302  	for _, namespace := range namespaces {
  1303  		select {
  1304  		case <-tCtx.Done():
  1305  			return context.Cause(tCtx)
  1306  		default:
  1307  		}
  1308  		wantCount, ok := numPodsScheduledPerNamespace[namespace]
  1309  		if !ok {
  1310  			return fmt.Errorf("unknown namespace %s", namespace)
  1311  		}
  1312  		if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, wantCount); err != nil {
  1313  			return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
  1314  		}
  1315  	}
  1316  	return nil
  1317  }
  1318  
  1319  func getSpecFromFile(path *string, spec interface{}) error {
  1320  	bytes, err := os.ReadFile(*path)
  1321  	if err != nil {
  1322  		return err
  1323  	}
  1324  	return yaml.UnmarshalStrict(bytes, spec)
  1325  }
  1326  
  1327  func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
  1328  	bytes, err := os.ReadFile(path)
  1329  	if err != nil {
  1330  		return nil, nil, err
  1331  	}
  1332  
  1333  	bytes, err = yaml.YAMLToJSONStrict(bytes)
  1334  	if err != nil {
  1335  		return nil, nil, fmt.Errorf("cannot covert YAML to JSON: %v", err)
  1336  	}
  1337  
  1338  	obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
  1339  	if err != nil {
  1340  		return nil, nil, err
  1341  	}
  1342  	unstructuredObj, ok := obj.(*unstructured.Unstructured)
  1343  	if !ok {
  1344  		return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
  1345  	}
  1346  	return unstructuredObj, gvk, nil
  1347  }
  1348  
  1349  func getTestCases(path string) ([]*testCase, error) {
  1350  	testCases := make([]*testCase, 0)
  1351  	if err := getSpecFromFile(&path, &testCases); err != nil {
  1352  		return nil, fmt.Errorf("parsing test cases error: %w", err)
  1353  	}
  1354  	return testCases, nil
  1355  }
  1356  
  1357  func validateTestCases(testCases []*testCase) error {
  1358  	if len(testCases) == 0 {
  1359  		return fmt.Errorf("no test cases defined")
  1360  	}
  1361  	testCaseUniqueNames := map[string]bool{}
  1362  	for _, tc := range testCases {
  1363  		if testCaseUniqueNames[tc.Name] {
  1364  			return fmt.Errorf("%s: name is not unique", tc.Name)
  1365  		}
  1366  		testCaseUniqueNames[tc.Name] = true
  1367  		if len(tc.Workloads) == 0 {
  1368  			return fmt.Errorf("%s: no workloads defined", tc.Name)
  1369  		}
  1370  		if err := tc.workloadNamesUnique(); err != nil {
  1371  			return err
  1372  		}
  1373  		if len(tc.WorkloadTemplate) == 0 {
  1374  			return fmt.Errorf("%s: no ops defined", tc.Name)
  1375  		}
  1376  		// Make sure there's at least one CreatePods op with collectMetrics set to
  1377  		// true in each workload. What's the point of running a performance
  1378  		// benchmark if no statistics are collected for reporting?
  1379  		if !tc.collectsMetrics() {
  1380  			return fmt.Errorf("%s: no op in the workload template collects metrics", tc.Name)
  1381  		}
  1382  		// TODO(#93795): make sure each workload within a test case has a unique
  1383  		// name? The name is used to identify the stats in benchmark reports.
  1384  		// TODO(#94404): check for unused template parameters? Probably a typo.
  1385  	}
  1386  	return nil
  1387  }
  1388  
  1389  func getPodStrategy(cpo *createPodsOp) (testutils.TestPodCreateStrategy, error) {
  1390  	basePod := makeBasePod()
  1391  	if cpo.PodTemplatePath != nil {
  1392  		var err error
  1393  		basePod, err = getPodSpecFromFile(cpo.PodTemplatePath)
  1394  		if err != nil {
  1395  			return nil, err
  1396  		}
  1397  	}
  1398  	if cpo.PersistentVolumeClaimTemplatePath == nil {
  1399  		return testutils.NewCustomCreatePodStrategy(basePod), nil
  1400  	}
  1401  
  1402  	pvTemplate, err := getPersistentVolumeSpecFromFile(cpo.PersistentVolumeTemplatePath)
  1403  	if err != nil {
  1404  		return nil, err
  1405  	}
  1406  	pvcTemplate, err := getPersistentVolumeClaimSpecFromFile(cpo.PersistentVolumeClaimTemplatePath)
  1407  	if err != nil {
  1408  		return nil, err
  1409  	}
  1410  	return testutils.NewCreatePodWithPersistentVolumeStrategy(pvcTemplate, getCustomVolumeFactory(pvTemplate), basePod), nil
  1411  }
  1412  
  1413  func getNodeSpecFromFile(path *string) (*v1.Node, error) {
  1414  	nodeSpec := &v1.Node{}
  1415  	if err := getSpecFromFile(path, nodeSpec); err != nil {
  1416  		return nil, fmt.Errorf("parsing Node: %w", err)
  1417  	}
  1418  	return nodeSpec, nil
  1419  }
  1420  
  1421  func getPodSpecFromFile(path *string) (*v1.Pod, error) {
  1422  	podSpec := &v1.Pod{}
  1423  	if err := getSpecFromFile(path, podSpec); err != nil {
  1424  		return nil, fmt.Errorf("parsing Pod: %w", err)
  1425  	}
  1426  	return podSpec, nil
  1427  }
  1428  
  1429  func getPersistentVolumeSpecFromFile(path *string) (*v1.PersistentVolume, error) {
  1430  	persistentVolumeSpec := &v1.PersistentVolume{}
  1431  	if err := getSpecFromFile(path, persistentVolumeSpec); err != nil {
  1432  		return nil, fmt.Errorf("parsing PersistentVolume: %w", err)
  1433  	}
  1434  	return persistentVolumeSpec, nil
  1435  }
  1436  
  1437  func getPersistentVolumeClaimSpecFromFile(path *string) (*v1.PersistentVolumeClaim, error) {
  1438  	persistentVolumeClaimSpec := &v1.PersistentVolumeClaim{}
  1439  	if err := getSpecFromFile(path, persistentVolumeClaimSpec); err != nil {
  1440  		return nil, fmt.Errorf("parsing PersistentVolumeClaim: %w", err)
  1441  	}
  1442  	return persistentVolumeClaimSpec, nil
  1443  }
  1444  
  1445  func getCustomVolumeFactory(pvTemplate *v1.PersistentVolume) func(id int) *v1.PersistentVolume {
  1446  	return func(id int) *v1.PersistentVolume {
  1447  		pv := pvTemplate.DeepCopy()
  1448  		volumeID := fmt.Sprintf("vol-%d", id)
  1449  		pv.ObjectMeta.Name = volumeID
  1450  		pvs := pv.Spec.PersistentVolumeSource
  1451  		if pvs.CSI != nil {
  1452  			pvs.CSI.VolumeHandle = volumeID
  1453  		} else if pvs.AWSElasticBlockStore != nil {
  1454  			pvs.AWSElasticBlockStore.VolumeID = volumeID
  1455  		}
  1456  		return pv
  1457  	}
  1458  }
  1459  
  1460  // namespacePreparer holds configuration information for the test namespace preparer.
  1461  type namespacePreparer struct {
  1462  	count  int
  1463  	prefix string
  1464  	spec   *v1.Namespace
  1465  }
  1466  
  1467  func newNamespacePreparer(tCtx ktesting.TContext, cno *createNamespacesOp) (*namespacePreparer, error) {
  1468  	ns := &v1.Namespace{}
  1469  	if cno.NamespaceTemplatePath != nil {
  1470  		if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil {
  1471  			return nil, fmt.Errorf("parsing NamespaceTemplate: %w", err)
  1472  		}
  1473  	}
  1474  
  1475  	return &namespacePreparer{
  1476  		count:  cno.Count,
  1477  		prefix: cno.Prefix,
  1478  		spec:   ns,
  1479  	}, nil
  1480  }
  1481  
  1482  // namespaces returns namespace names have been (or will be) created by this namespacePreparer
  1483  func (p *namespacePreparer) namespaces() []string {
  1484  	namespaces := make([]string, p.count)
  1485  	for i := 0; i < p.count; i++ {
  1486  		namespaces[i] = fmt.Sprintf("%s-%d", p.prefix, i)
  1487  	}
  1488  	return namespaces
  1489  }
  1490  
  1491  // prepare creates the namespaces.
  1492  func (p *namespacePreparer) prepare(tCtx ktesting.TContext) error {
  1493  	base := &v1.Namespace{}
  1494  	if p.spec != nil {
  1495  		base = p.spec
  1496  	}
  1497  	tCtx.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
  1498  	for i := 0; i < p.count; i++ {
  1499  		n := base.DeepCopy()
  1500  		n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
  1501  		if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
  1502  			_, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, n, metav1.CreateOptions{})
  1503  			return err == nil || apierrors.IsAlreadyExists(err), nil
  1504  		}); err != nil {
  1505  			return err
  1506  		}
  1507  	}
  1508  	return nil
  1509  }
  1510  
  1511  // cleanup deletes existing test namespaces.
  1512  func (p *namespacePreparer) cleanup(tCtx ktesting.TContext) error {
  1513  	var errRet error
  1514  	for i := 0; i < p.count; i++ {
  1515  		n := fmt.Sprintf("%s-%d", p.prefix, i)
  1516  		if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, n, metav1.DeleteOptions{}); err != nil {
  1517  			tCtx.Errorf("Deleting Namespace: %v", err)
  1518  			errRet = err
  1519  		}
  1520  	}
  1521  	return errRet
  1522  }
  1523  

View as plain text