1
16
17 package dynamicresources
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23
24 v1 "k8s.io/api/core/v1"
25 resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
26 "k8s.io/apimachinery/pkg/labels"
27 "k8s.io/apimachinery/pkg/runtime"
28 resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
29 "k8s.io/klog/v2"
30 namedresourcesmodel "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources"
31 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
32 )
33
34
35
// resources is a two-level lookup table of resource models: the outer key
// is a node name, the inner key is a driver name.
type resources map[string]map[string]ResourceModels
37
38
39
// ResourceModels holds the resource model state that is tracked for one
// driver on one node. The named resources model is the only model stored
// here.
type ResourceModels struct {
	// NamedResources is the state of the named resources structured model.
	NamedResources namedresourcesmodel.Model
}
43
44
45
46
47
48 func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache, inFlightAllocations *sync.Map) (resources, error) {
49 model := make(resources)
50
51 slices, err := resourceSliceLister.List(labels.Everything())
52 if err != nil {
53 return nil, fmt.Errorf("list node resource slices: %w", err)
54 }
55 for _, slice := range slices {
56 if model[slice.NodeName] == nil {
57 model[slice.NodeName] = make(map[string]ResourceModels)
58 }
59 resource := model[slice.NodeName][slice.DriverName]
60 namedresourcesmodel.AddResources(&resource.NamedResources, slice.NamedResources)
61 model[slice.NodeName][slice.DriverName] = resource
62 }
63
64 objs := claimAssumeCache.List(nil)
65 for _, obj := range objs {
66 claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
67 if !ok {
68 return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj)
69 }
70 if obj, ok := inFlightAllocations.Load(claim.UID); ok {
71
72
73 claim = obj.(*resourcev1alpha2.ResourceClaim)
74 }
75 if claim.Status.Allocation == nil {
76 continue
77 }
78 for _, handle := range claim.Status.Allocation.ResourceHandles {
79 structured := handle.StructuredData
80 if structured == nil {
81 continue
82 }
83 if model[structured.NodeName] == nil {
84 model[structured.NodeName] = make(map[string]ResourceModels)
85 }
86 resource := model[structured.NodeName][handle.DriverName]
87 for _, result := range structured.Results {
88
89 namedresourcesmodel.AddAllocation(&resource.NamedResources, result.NamedResources)
90 }
91 }
92 }
93
94 return model, nil
95 }
96
97 func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClass, classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters) (*claimController, error) {
98
99
100
101 type perDriverRequests struct {
102 parameters []runtime.RawExtension
103 requests []*resourcev1alpha2.NamedResourcesRequest
104 }
105 namedresourcesRequests := make(map[string]perDriverRequests)
106 for i, request := range claimParameters.DriverRequests {
107 driverName := request.DriverName
108 p := namedresourcesRequests[driverName]
109 for e, request := range request.Requests {
110 switch {
111 case request.ResourceRequestModel.NamedResources != nil:
112 p.parameters = append(p.parameters, request.VendorParameters)
113 p.requests = append(p.requests, request.ResourceRequestModel.NamedResources)
114 default:
115 return nil, fmt.Errorf("claim parameters %s: driverRequersts[%d].requests[%d]: no supported structured parameters found", klog.KObj(claimParameters), i, e)
116 }
117 }
118 if len(p.requests) > 0 {
119 namedresourcesRequests[driverName] = p
120 }
121 }
122
123 c := &claimController{
124 class: class,
125 classParameters: classParameters,
126 claimParameters: claimParameters,
127 namedresources: make(map[string]perDriverController, len(namedresourcesRequests)),
128 }
129 for driverName, perDriver := range namedresourcesRequests {
130 var filter *resourcev1alpha2.NamedResourcesFilter
131 for _, f := range classParameters.Filters {
132 if f.DriverName == driverName && f.ResourceFilterModel.NamedResources != nil {
133 filter = f.ResourceFilterModel.NamedResources
134 break
135 }
136 }
137 controller, err := namedresourcesmodel.NewClaimController(filter, perDriver.requests)
138 if err != nil {
139 return nil, fmt.Errorf("creating claim controller for named resources structured model: %w", err)
140 }
141 c.namedresources[driverName] = perDriverController{
142 parameters: perDriver.parameters,
143 controller: controller,
144 }
145 }
146 return c, nil
147 }
148
149
150
// claimController holds what is needed to check node suitability and to
// allocate resources for one claim. It is created by newClaimController.
type claimController struct {
	// class is the resource class; allocate returns its DriverName.
	class *resourcev1alpha2.ResourceClass
	// classParameters are the class parameters passed to
	// newClaimController. allocate checks them for nil before use.
	classParameters *resourcev1alpha2.ResourceClassParameters
	// claimParameters are the claim parameters passed to
	// newClaimController. allocate dereferences them unconditionally.
	claimParameters *resourcev1alpha2.ResourceClaimParameters
	// namedresources maps a driver name to the controller for that driver.
	namedresources map[string]perDriverController
}
157
// perDriverController bundles the named resources controller of one
// driver with the vendor parameters of the requests it was created from.
type perDriverController struct {
	// parameters holds the vendor parameters of the driver's requests, in
	// the order in which the requests were passed to the controller.
	// allocate indexes it with the position of each allocation result.
	parameters []runtime.RawExtension
	// controller is the named resources controller for the driver.
	controller *namedresourcesmodel.Controller
}
162
163 func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
164 nodeResources := resources[nodeName]
165 for driverName, perDriver := range c.namedresources {
166 okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].NamedResources)
167 if err != nil {
168
169
170
171 return false, fmt.Errorf("checking node %q and resources of driver %q: %w", nodeName, driverName, err)
172 }
173 if !okay {
174 return false, nil
175 }
176 }
177 return true, nil
178 }
179
180 func (c claimController) allocate(ctx context.Context, nodeName string, resources resources) (string, *resourcev1alpha2.AllocationResult, error) {
181 allocation := &resourcev1alpha2.AllocationResult{
182 Shareable: c.claimParameters.Shareable,
183 AvailableOnNodes: &v1.NodeSelector{
184 NodeSelectorTerms: []v1.NodeSelectorTerm{
185 {
186 MatchExpressions: []v1.NodeSelectorRequirement{
187 {Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{nodeName}},
188 },
189 },
190 },
191 },
192 }
193
194 nodeResources := resources[nodeName]
195 for driverName, perDriver := range c.namedresources {
196
197
198 results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].NamedResources)
199 if err != nil {
200 return "", nil, fmt.Errorf("allocating via named resources structured model: %w", err)
201 }
202 handle := resourcev1alpha2.ResourceHandle{
203 DriverName: driverName,
204 StructuredData: &resourcev1alpha2.StructuredResourceHandle{
205 NodeName: nodeName,
206 },
207 }
208 for i, result := range results {
209 if result == nil {
210 continue
211 }
212 handle.StructuredData.Results = append(handle.StructuredData.Results,
213 resourcev1alpha2.DriverAllocationResult{
214 VendorRequestParameters: perDriver.parameters[i],
215 AllocationResultModel: resourcev1alpha2.AllocationResultModel{
216 NamedResources: result,
217 },
218 },
219 )
220 }
221 if c.classParameters != nil {
222 for _, p := range c.classParameters.VendorParameters {
223 if p.DriverName == driverName {
224 handle.StructuredData.VendorClassParameters = p.Parameters
225 break
226 }
227 }
228 }
229 for _, request := range c.claimParameters.DriverRequests {
230 if request.DriverName == driverName {
231 handle.StructuredData.VendorClaimParameters = request.VendorParameters
232 break
233 }
234 }
235 allocation.ResourceHandles = append(allocation.ResourceHandles, handle)
236 }
237
238 return c.class.DriverName, allocation, nil
239 }
240
View as plain text