...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources

     1  /*
     2  Copyright 2024 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package dynamicresources
    18  
import (
	"context"
	"fmt"
	"sort"
	"sync"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
	"k8s.io/klog/v2"
	namedresourcesmodel "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
)
    33  
// resources is a map "node name" -> "driver name" -> available and
// allocated resources per structured parameter model.
//
// Looking up a node or driver without an entry yields the zero value of
// ResourceModels, which is what nodeIsSuitable and allocate hand to the
// per-driver controller in that case.
type resources map[string]map[string]ResourceModels
    37  
// ResourceModels may have more than one entry because it is valid for a driver to
// use more than one structured parameter model.
type ResourceModels struct {
	// NamedResources is the state of the "named resources" model. It is
	// populated by newResourceModel from resource slices (AddResources) and
	// from the allocations recorded in claims (AddAllocation).
	NamedResources namedresourcesmodel.Model
}
    43  
    44  // newResourceModel parses the available information about resources. Objects
    45  // with an unknown structured parameter model silently ignored. An error gets
    46  // logged later when parameters required for a pod depend on such an unknown
    47  // model.
    48  func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache, inFlightAllocations *sync.Map) (resources, error) {
    49  	model := make(resources)
    50  
    51  	slices, err := resourceSliceLister.List(labels.Everything())
    52  	if err != nil {
    53  		return nil, fmt.Errorf("list node resource slices: %w", err)
    54  	}
    55  	for _, slice := range slices {
    56  		if model[slice.NodeName] == nil {
    57  			model[slice.NodeName] = make(map[string]ResourceModels)
    58  		}
    59  		resource := model[slice.NodeName][slice.DriverName]
    60  		namedresourcesmodel.AddResources(&resource.NamedResources, slice.NamedResources)
    61  		model[slice.NodeName][slice.DriverName] = resource
    62  	}
    63  
    64  	objs := claimAssumeCache.List(nil)
    65  	for _, obj := range objs {
    66  		claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
    67  		if !ok {
    68  			return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj)
    69  		}
    70  		if obj, ok := inFlightAllocations.Load(claim.UID); ok {
    71  			// If the allocation is in-flight, then we have to use the allocation
    72  			// from that claim.
    73  			claim = obj.(*resourcev1alpha2.ResourceClaim)
    74  		}
    75  		if claim.Status.Allocation == nil {
    76  			continue
    77  		}
    78  		for _, handle := range claim.Status.Allocation.ResourceHandles {
    79  			structured := handle.StructuredData
    80  			if structured == nil {
    81  				continue
    82  			}
    83  			if model[structured.NodeName] == nil {
    84  				model[structured.NodeName] = make(map[string]ResourceModels)
    85  			}
    86  			resource := model[structured.NodeName][handle.DriverName]
    87  			for _, result := range structured.Results {
    88  				// Call AddAllocation for each known model. Each call itself needs to check for nil.
    89  				namedresourcesmodel.AddAllocation(&resource.NamedResources, result.NamedResources)
    90  			}
    91  		}
    92  	}
    93  
    94  	return model, nil
    95  }
    96  
    97  func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClass, classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters) (*claimController, error) {
    98  	// Each node driver is separate from the others. Each driver may have
    99  	// multiple requests which need to be allocated together, so here
   100  	// we have to collect them per model.
   101  	type perDriverRequests struct {
   102  		parameters []runtime.RawExtension
   103  		requests   []*resourcev1alpha2.NamedResourcesRequest
   104  	}
   105  	namedresourcesRequests := make(map[string]perDriverRequests)
   106  	for i, request := range claimParameters.DriverRequests {
   107  		driverName := request.DriverName
   108  		p := namedresourcesRequests[driverName]
   109  		for e, request := range request.Requests {
   110  			switch {
   111  			case request.ResourceRequestModel.NamedResources != nil:
   112  				p.parameters = append(p.parameters, request.VendorParameters)
   113  				p.requests = append(p.requests, request.ResourceRequestModel.NamedResources)
   114  			default:
   115  				return nil, fmt.Errorf("claim parameters %s: driverRequersts[%d].requests[%d]: no supported structured parameters found", klog.KObj(claimParameters), i, e)
   116  			}
   117  		}
   118  		if len(p.requests) > 0 {
   119  			namedresourcesRequests[driverName] = p
   120  		}
   121  	}
   122  
   123  	c := &claimController{
   124  		class:           class,
   125  		classParameters: classParameters,
   126  		claimParameters: claimParameters,
   127  		namedresources:  make(map[string]perDriverController, len(namedresourcesRequests)),
   128  	}
   129  	for driverName, perDriver := range namedresourcesRequests {
   130  		var filter *resourcev1alpha2.NamedResourcesFilter
   131  		for _, f := range classParameters.Filters {
   132  			if f.DriverName == driverName && f.ResourceFilterModel.NamedResources != nil {
   133  				filter = f.ResourceFilterModel.NamedResources
   134  				break
   135  			}
   136  		}
   137  		controller, err := namedresourcesmodel.NewClaimController(filter, perDriver.requests)
   138  		if err != nil {
   139  			return nil, fmt.Errorf("creating claim controller for named resources structured model: %w", err)
   140  		}
   141  		c.namedresources[driverName] = perDriverController{
   142  			parameters: perDriver.parameters,
   143  			controller: controller,
   144  		}
   145  	}
   146  	return c, nil
   147  }
   148  
// claimController currently wraps exactly one structured parameter model.
type claimController struct {
	// class is the resource class of the claim. allocate returns its
	// DriverName as first result.
	class           *resourcev1alpha2.ResourceClass
	// classParameters provides the per-driver filters and vendor class
	// parameters. allocate skips the vendor class parameters when this is nil.
	classParameters *resourcev1alpha2.ResourceClassParameters
	// claimParameters provides the driver requests, the vendor claim
	// parameters and the Shareable flag of the allocation.
	claimParameters *resourcev1alpha2.ResourceClaimParameters
	// namedresources maps a driver name to the controller for the named
	// resources requests of that driver.
	namedresources  map[string]perDriverController
}
   157  
// perDriverController combines the named resources controller for one driver
// with the vendor parameters of the requests that it was created for.
type perDriverController struct {
	// parameters holds the vendor parameters per request. allocate indexes
	// it with the position of each allocation result.
	parameters []runtime.RawExtension
	// controller handles the named resources requests of the driver.
	controller *namedresourcesmodel.Controller
}
   162  
   163  func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
   164  	nodeResources := resources[nodeName]
   165  	for driverName, perDriver := range c.namedresources {
   166  		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].NamedResources)
   167  		if err != nil {
   168  			// This is an error in the CEL expression which needs
   169  			// to be fixed. Better fail very visibly instead of
   170  			// ignoring the node.
   171  			return false, fmt.Errorf("checking node %q and resources of driver %q: %w", nodeName, driverName, err)
   172  		}
   173  		if !okay {
   174  			return false, nil
   175  		}
   176  	}
   177  	return true, nil
   178  }
   179  
   180  func (c claimController) allocate(ctx context.Context, nodeName string, resources resources) (string, *resourcev1alpha2.AllocationResult, error) {
   181  	allocation := &resourcev1alpha2.AllocationResult{
   182  		Shareable: c.claimParameters.Shareable,
   183  		AvailableOnNodes: &v1.NodeSelector{
   184  			NodeSelectorTerms: []v1.NodeSelectorTerm{
   185  				{
   186  					MatchExpressions: []v1.NodeSelectorRequirement{
   187  						{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{nodeName}},
   188  					},
   189  				},
   190  			},
   191  		},
   192  	}
   193  
   194  	nodeResources := resources[nodeName]
   195  	for driverName, perDriver := range c.namedresources {
   196  		// Must return one entry for each request. The entry may be nil. This way,
   197  		// the result can be correlated with the per-request parameters.
   198  		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].NamedResources)
   199  		if err != nil {
   200  			return "", nil, fmt.Errorf("allocating via named resources structured model: %w", err)
   201  		}
   202  		handle := resourcev1alpha2.ResourceHandle{
   203  			DriverName: driverName,
   204  			StructuredData: &resourcev1alpha2.StructuredResourceHandle{
   205  				NodeName: nodeName,
   206  			},
   207  		}
   208  		for i, result := range results {
   209  			if result == nil {
   210  				continue
   211  			}
   212  			handle.StructuredData.Results = append(handle.StructuredData.Results,
   213  				resourcev1alpha2.DriverAllocationResult{
   214  					VendorRequestParameters: perDriver.parameters[i],
   215  					AllocationResultModel: resourcev1alpha2.AllocationResultModel{
   216  						NamedResources: result,
   217  					},
   218  				},
   219  			)
   220  		}
   221  		if c.classParameters != nil {
   222  			for _, p := range c.classParameters.VendorParameters {
   223  				if p.DriverName == driverName {
   224  					handle.StructuredData.VendorClassParameters = p.Parameters
   225  					break
   226  				}
   227  			}
   228  		}
   229  		for _, request := range c.claimParameters.DriverRequests {
   230  			if request.DriverName == driverName {
   231  				handle.StructuredData.VendorClaimParameters = request.VendorParameters
   232  				break
   233  			}
   234  		}
   235  		allocation.ResourceHandles = append(allocation.ResourceHandles, handle)
   236  	}
   237  
   238  	return c.class.DriverName, allocation, nil
   239  }
   240  

View as plain text