...

Source file src/k8s.io/kubernetes/test/integration/scheduler_perf/dra.go

Documentation: k8s.io/kubernetes/test/integration/scheduler_perf

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package benchmark
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"path/filepath"
    23  	"sync"
    24  
    25  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/client-go/util/workqueue"
    28  	"k8s.io/klog/v2"
    29  	draapp "k8s.io/kubernetes/test/e2e/dra/test-driver/app"
    30  	"k8s.io/kubernetes/test/utils/ktesting"
    31  )
    32  
// createResourceClaimsOp defines an op where resource claims are created.
type createResourceClaimsOp struct {
	// Must be createResourceClaimsOpcode.
	Opcode operationCode
	// Number of claims to create. Parameterizable through CountParam.
	Count int
	// Template parameter for Count. When set, patchParams overrides Count
	// with the workload parameter of that name (the leading marker
	// character is stripped before the lookup).
	CountParam string
	// Namespace the claims should be created in. Must be non-empty.
	Namespace string
	// Path to spec file describing the claims to create. Must be non-empty.
	TemplatePath string
}
    46  
// Compile-time checks that createResourceClaimsOp satisfies the op interfaces.
var _ realOp = &createResourceClaimsOp{}
var _ runnableOp = &createResourceClaimsOp{}
    49  
    50  func (op *createResourceClaimsOp) isValid(allowParameterization bool) error {
    51  	if op.Opcode != createResourceClaimsOpcode {
    52  		return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceClaimsOpcode)
    53  	}
    54  	if !isValidCount(allowParameterization, op.Count, op.CountParam) {
    55  		return fmt.Errorf("invalid Count=%d / CountParam=%q", op.Count, op.CountParam)
    56  	}
    57  	if op.Namespace == "" {
    58  		return fmt.Errorf("Namespace must be set")
    59  	}
    60  	if op.TemplatePath == "" {
    61  		return fmt.Errorf("TemplatePath must be set")
    62  	}
    63  	return nil
    64  }
    65  
    66  func (op *createResourceClaimsOp) collectsMetrics() bool {
    67  	return false
    68  }
    69  func (op *createResourceClaimsOp) patchParams(w *workload) (realOp, error) {
    70  	if op.CountParam != "" {
    71  		var err error
    72  		op.Count, err = w.Params.get(op.CountParam[1:])
    73  		if err != nil {
    74  			return nil, err
    75  		}
    76  	}
    77  	return op, op.isValid(false)
    78  }
    79  
    80  func (op *createResourceClaimsOp) requiredNamespaces() []string {
    81  	return []string{op.Namespace}
    82  }
    83  
    84  func (op *createResourceClaimsOp) run(tCtx ktesting.TContext) {
    85  	tCtx.Logf("creating %d claims in namespace %q", op.Count, op.Namespace)
    86  
    87  	var claimTemplate *resourcev1alpha2.ResourceClaim
    88  	if err := getSpecFromFile(&op.TemplatePath, &claimTemplate); err != nil {
    89  		tCtx.Fatalf("parsing ResourceClaim %q: %v", op.TemplatePath, err)
    90  	}
    91  	var createErr error
    92  	var mutex sync.Mutex
    93  	create := func(i int) {
    94  		err := func() error {
    95  			if _, err := tCtx.Client().ResourceV1alpha2().ResourceClaims(op.Namespace).Create(tCtx, claimTemplate.DeepCopy(), metav1.CreateOptions{}); err != nil {
    96  				return fmt.Errorf("create claim: %v", err)
    97  			}
    98  			return nil
    99  		}()
   100  		if err != nil {
   101  			mutex.Lock()
   102  			defer mutex.Unlock()
   103  			createErr = err
   104  		}
   105  	}
   106  
   107  	workers := op.Count
   108  	if workers > 30 {
   109  		workers = 30
   110  	}
   111  	workqueue.ParallelizeUntil(tCtx, workers, op.Count, create)
   112  	if createErr != nil {
   113  		tCtx.Fatal(createErr.Error())
   114  	}
   115  }
   116  
// createResourceDriverOp defines an op where a DRA test resource driver is
// set up for the nodes matching a glob pattern. (The previous comment said
// "resource claims are created" — a copy/paste from createResourceClaimsOp.)
type createResourceDriverOp struct {
	// Must be createResourceDriverOpcode.
	Opcode operationCode
	// Name of the driver, used to reference it in a resource class.
	DriverName string
	// Number of claims to allow per node. Parameterizable through MaxClaimsPerNodeParam.
	MaxClaimsPerNode int
	// Template parameter for MaxClaimsPerNode. When set, patchParams
	// overrides MaxClaimsPerNode with the workload parameter of that name.
	MaxClaimsPerNodeParam string
	// Nodes matching this glob pattern have resources managed by the driver.
	Nodes string
	// StructuredParameters is true if the controller that is built into the scheduler
	// is used and the control-plane controller is not needed.
	// Because we don't run the kubelet plugin, ResourceSlices must
	// get created for all nodes.
	StructuredParameters bool
}
   135  
// Compile-time checks that createResourceDriverOp satisfies the op interfaces.
var _ realOp = &createResourceDriverOp{}
var _ runnableOp = &createResourceDriverOp{}
   138  
   139  func (op *createResourceDriverOp) isValid(allowParameterization bool) error {
   140  	if op.Opcode != createResourceDriverOpcode {
   141  		return fmt.Errorf("invalid opcode %q; expected %q", op.Opcode, createResourceDriverOpcode)
   142  	}
   143  	if !isValidCount(allowParameterization, op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam) {
   144  		return fmt.Errorf("invalid MaxClaimsPerNode=%d / MaxClaimsPerNodeParam=%q", op.MaxClaimsPerNode, op.MaxClaimsPerNodeParam)
   145  	}
   146  	if op.DriverName == "" {
   147  		return fmt.Errorf("DriverName must be set")
   148  	}
   149  	if op.Nodes == "" {
   150  		return fmt.Errorf("Nodes must be set")
   151  	}
   152  	return nil
   153  }
   154  
   155  func (op *createResourceDriverOp) collectsMetrics() bool {
   156  	return false
   157  }
   158  func (op *createResourceDriverOp) patchParams(w *workload) (realOp, error) {
   159  	if op.MaxClaimsPerNodeParam != "" {
   160  		var err error
   161  		op.MaxClaimsPerNode, err = w.Params.get(op.MaxClaimsPerNodeParam[1:])
   162  		if err != nil {
   163  			return nil, err
   164  		}
   165  	}
   166  	return op, op.isValid(false)
   167  }
   168  
   169  func (op *createResourceDriverOp) requiredNamespaces() []string { return nil }
   170  
// run sets up the DRA test driver for all nodes matching op.Nodes. With
// structured parameters it only creates one ResourceSlice per matching node
// (no kubelet plugin runs, so the slices must be published here); otherwise
// it starts the control-plane controller of the test driver in a goroutine
// and registers a cleanup that stops it again.
func (op *createResourceDriverOp) run(tCtx ktesting.TContext) {
	tCtx.Logf("creating resource driver %q for nodes matching %q", op.DriverName, op.Nodes)

	// Start the controller side of the DRA test driver such that it simulates
	// per-node resources.
	resources := draapp.Resources{
		DriverName:     op.DriverName,
		NodeLocal:      true,
		MaxAllocations: op.MaxClaimsPerNode,
	}

	// Select the managed nodes by matching each node name against the
	// glob pattern. An invalid pattern aborts the test.
	nodes, err := tCtx.Client().CoreV1().Nodes().List(tCtx, metav1.ListOptions{})
	if err != nil {
		tCtx.Fatalf("list nodes: %v", err)
	}
	for _, node := range nodes.Items {
		match, err := filepath.Match(op.Nodes, node.Name)
		if err != nil {
			tCtx.Fatalf("matching glob pattern %q against node name %q: %v", op.Nodes, node.Name, err)
		}
		if match {
			resources.Nodes = append(resources.Nodes, node.Name)
		}
	}

	if op.StructuredParameters {
		// Publish one ResourceSlice per matching node up front; cleanup
		// removes exactly the slices belonging to this driver via the
		// driverName field selector.
		for _, nodeName := range resources.Nodes {
			slice := resourceSlice(op.DriverName, nodeName, op.MaxClaimsPerNode)
			_, err := tCtx.Client().ResourceV1alpha2().ResourceSlices().Create(tCtx, slice, metav1.CreateOptions{})
			tCtx.ExpectNoError(err, "create node resource slice")
		}
		tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
			err := tCtx.Client().ResourceV1alpha2().ResourceSlices().DeleteCollection(tCtx,
				metav1.DeleteOptions{},
				metav1.ListOptions{FieldSelector: "driverName=" + op.DriverName},
			)
			tCtx.ExpectNoError(err, "delete node resource slices")
		})
		// No need for the controller.
		return
	}

	// Run the controller in the background; the inner ctx shadows the outer
	// one on purpose, tagging the controller's logger with the driver name.
	controller := draapp.NewController(tCtx.Client(), resources)
	ctx, cancel := context.WithCancel(tCtx)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ctx := klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), op.DriverName))
		controller.Run(ctx, 5 /* workers */)
	}()
	tCtx.Cleanup(func() {
		tCtx.Logf("stopping resource driver %q", op.DriverName)
		// We must cancel before waiting.
		cancel()
		wg.Wait()
		tCtx.Logf("stopped resource driver %q", op.DriverName)
	})
}
   230  
   231  func resourceSlice(driverName, nodeName string, capacity int) *resourcev1alpha2.ResourceSlice {
   232  	slice := &resourcev1alpha2.ResourceSlice{
   233  		ObjectMeta: metav1.ObjectMeta{
   234  			Name: nodeName,
   235  		},
   236  
   237  		NodeName:   nodeName,
   238  		DriverName: driverName,
   239  
   240  		ResourceModel: resourcev1alpha2.ResourceModel{
   241  			NamedResources: &resourcev1alpha2.NamedResourcesResources{},
   242  		},
   243  	}
   244  
   245  	for i := 0; i < capacity; i++ {
   246  		slice.ResourceModel.NamedResources.Instances = append(slice.ResourceModel.NamedResources.Instances,
   247  			resourcev1alpha2.NamedResourcesInstance{
   248  				Name: fmt.Sprintf("instance-%d", i),
   249  			},
   250  		)
   251  	}
   252  
   253  	return slice
   254  }
   255  

View as plain text