1
16
17
18
19 package app
20
21 import (
22 "context"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "math/rand"
27 "strings"
28 "sync"
29
30 v1 "k8s.io/api/core/v1"
31 resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/labels"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/client-go/informers"
36 "k8s.io/client-go/kubernetes"
37 listersv1 "k8s.io/client-go/listers/core/v1"
38 "k8s.io/dynamic-resource-allocation/controller"
39 "k8s.io/klog/v2"
40 )
41
42 type Resources struct {
43 DriverName string
44 DontSetReservedFor bool
45 NodeLocal bool
46
47
48 Nodes []string
49
50
51 NodeLabels labels.Set
52 MaxAllocations int
53 Shareable bool
54
55
56 AllocateWrapper AllocateWrapperType
57 }
58
59 func (r Resources) AllNodes(nodeLister listersv1.NodeLister) []string {
60 if len(r.NodeLabels) > 0 {
61
62 nodes, _ := nodeLister.List(labels.SelectorFromValidatedSet(r.NodeLabels))
63 nodeNames := make([]string, 0, len(nodes))
64 for _, node := range nodes {
65 nodeNames = append(nodeNames, node.Name)
66 }
67 return nodeNames
68 }
69 return r.Nodes
70 }
71
72 func (r Resources) NewAllocation(node string, data []byte) *resourcev1alpha2.AllocationResult {
73 allocation := &resourcev1alpha2.AllocationResult{
74 Shareable: r.Shareable,
75 }
76 allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{
77 {
78 DriverName: r.DriverName,
79 Data: string(data),
80 },
81 }
82 if node == "" && len(r.NodeLabels) > 0 {
83
84 var requirements []v1.NodeSelectorRequirement
85 for key, value := range r.NodeLabels {
86 requirements = append(requirements, v1.NodeSelectorRequirement{
87 Key: key,
88 Operator: v1.NodeSelectorOpIn,
89 Values: []string{value},
90 })
91 }
92 allocation.AvailableOnNodes = &v1.NodeSelector{
93 NodeSelectorTerms: []v1.NodeSelectorTerm{
94 {
95 MatchExpressions: requirements,
96 },
97 },
98 }
99 } else {
100 var nodes []string
101 if node != "" {
102
103 nodes = append(nodes, node)
104 } else {
105
106 nodes = r.Nodes
107 }
108 if len(nodes) > 0 {
109 allocation.AvailableOnNodes = &v1.NodeSelector{
110 NodeSelectorTerms: []v1.NodeSelectorTerm{
111 {
112 MatchExpressions: []v1.NodeSelectorRequirement{
113 {
114 Key: "kubernetes.io/hostname",
115 Operator: v1.NodeSelectorOpIn,
116 Values: nodes,
117 },
118 },
119 },
120 },
121 }
122 }
123 }
124
125 return allocation
126 }
127
128 type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation,
129 selectedNode string,
130 handler func(ctx context.Context,
131 claimAllocations []*controller.ClaimAllocation,
132 selectedNode string),
133 )
134
135 type ExampleController struct {
136 clientset kubernetes.Interface
137 nodeLister listersv1.NodeLister
138 resources Resources
139
140 mutex sync.Mutex
141
142 allocated map[types.UID]string
143
144
145 claimsPerNode map[string]int
146
147 numAllocations, numDeallocations int64
148 }
149
150 func NewController(clientset kubernetes.Interface, resources Resources) *ExampleController {
151 c := &ExampleController{
152 clientset: clientset,
153 resources: resources,
154
155 allocated: make(map[types.UID]string),
156 claimsPerNode: make(map[string]int),
157 }
158 return c
159 }
160
161 func (c *ExampleController) Run(ctx context.Context, workers int) {
162 informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 )
163 ctrl := controller.New(ctx, c.resources.DriverName, c, c.clientset, informerFactory)
164 c.nodeLister = informerFactory.Core().V1().Nodes().Lister()
165 ctrl.SetReservedFor(!c.resources.DontSetReservedFor)
166 informerFactory.Start(ctx.Done())
167 ctrl.Run(workers)
168
169 informerFactory.Shutdown()
170 }
171
172 type parameters struct {
173 EnvVars map[string]string
174 NodeName string
175 }
176
177 var _ controller.Driver = &ExampleController{}
178
179
180
181
182 func (c *ExampleController) GetNumAllocations() int64 {
183 c.mutex.Lock()
184 defer c.mutex.Unlock()
185
186 return c.numAllocations
187 }
188
189
190
191
192 func (c *ExampleController) GetNumDeallocations() int64 {
193 c.mutex.Lock()
194 defer c.mutex.Unlock()
195
196 return c.numDeallocations
197 }
198
199 func (c *ExampleController) GetClassParameters(ctx context.Context, class *resourcev1alpha2.ResourceClass) (interface{}, error) {
200 if class.ParametersRef != nil {
201 if class.ParametersRef.APIGroup != "" ||
202 class.ParametersRef.Kind != "ConfigMap" {
203 return nil, fmt.Errorf("class parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", class.ParametersRef)
204 }
205 return c.readParametersFromConfigMap(ctx, class.ParametersRef.Namespace, class.ParametersRef.Name)
206 }
207 return nil, nil
208 }
209
210 func (c *ExampleController) GetClaimParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass, classParameters interface{}) (interface{}, error) {
211 if claim.Spec.ParametersRef != nil {
212 if claim.Spec.ParametersRef.APIGroup != "" ||
213 claim.Spec.ParametersRef.Kind != "ConfigMap" {
214 return nil, fmt.Errorf("claim parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", claim.Spec.ParametersRef)
215 }
216 return c.readParametersFromConfigMap(ctx, claim.Namespace, claim.Spec.ParametersRef.Name)
217 }
218 return nil, nil
219 }
220
221 func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, namespace, name string) (map[string]string, error) {
222 configMap, err := c.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
223 if err != nil {
224 return nil, fmt.Errorf("get config map: %w", err)
225 }
226 return configMap.Data, nil
227 }
228
229 func (c *ExampleController) Allocate(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) {
230
231 if c.resources.AllocateWrapper != nil {
232 c.resources.AllocateWrapper(ctx, claimAllocations, selectedNode, c.allocateOneByOne)
233 } else {
234 c.allocateOneByOne(ctx, claimAllocations, selectedNode)
235 }
236
237 return
238 }
239
240 func (c *ExampleController) allocateOneByOne(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) {
241 for _, ca := range claimAllocations {
242 allocationResult, err := c.allocateOne(ctx, ca.Claim, ca.ClaimParameters, ca.Class, ca.ClassParameters, selectedNode)
243 if err != nil {
244 ca.Error = fmt.Errorf("failed allocating claim %v", ca.Claim.UID)
245 continue
246 }
247 ca.Allocation = allocationResult
248 }
249 }
250
251
252 func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) {
253 logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID)
254 defer func() {
255 logger.V(3).Info("done", "result", result, "err", err)
256 }()
257
258 c.mutex.Lock()
259 defer c.mutex.Unlock()
260
261
262 node, alreadyAllocated := c.allocated[claim.UID]
263 if alreadyAllocated {
264
265
266
267 logger.V(3).V(3).Info("already allocated")
268 } else {
269 logger.V(3).Info("starting", "selectedNode", selectedNode)
270 nodes := c.resources.AllNodes(c.nodeLister)
271 if c.resources.NodeLocal {
272 node = selectedNode
273 if node == "" {
274
275
276 var viableNodes []string
277 for _, n := range nodes {
278 if c.resources.MaxAllocations == 0 ||
279 c.claimsPerNode[n] < c.resources.MaxAllocations {
280 viableNodes = append(viableNodes, n)
281 }
282 }
283 if len(viableNodes) == 0 {
284 return nil, errors.New("resources exhausted on all nodes")
285 }
286
287
288 node = viableNodes[rand.Intn(len(viableNodes))]
289 logger.V(3).Info("picked a node ourselves", "selectedNode", selectedNode)
290 } else if !contains(nodes, node) ||
291 c.resources.MaxAllocations > 0 &&
292 c.claimsPerNode[node] >= c.resources.MaxAllocations {
293 return nil, fmt.Errorf("resources exhausted on node %q", node)
294 }
295 } else {
296 if c.resources.MaxAllocations > 0 &&
297 len(c.allocated) >= c.resources.MaxAllocations {
298 return nil, errors.New("resources exhausted in the cluster")
299 }
300 }
301 }
302
303 p := parameters{
304 EnvVars: make(map[string]string),
305 NodeName: node,
306 }
307 toEnvVars("user", claimParameters, p.EnvVars)
308 toEnvVars("admin", classParameters, p.EnvVars)
309 data, err := json.Marshal(p)
310 if err != nil {
311 return nil, fmt.Errorf("encode parameters: %w", err)
312 }
313 allocation := c.resources.NewAllocation(node, data)
314 if !alreadyAllocated {
315 c.numAllocations++
316 c.allocated[claim.UID] = node
317 c.claimsPerNode[node]++
318 }
319 return allocation, nil
320 }
321
322 func (c *ExampleController) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error {
323 logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Deallocate"), "claim", klog.KObj(claim), "uid", claim.UID)
324 c.mutex.Lock()
325 defer c.mutex.Unlock()
326
327 node, ok := c.allocated[claim.UID]
328 if !ok {
329 logger.V(3).Info("already deallocated")
330 return nil
331 }
332
333 logger.V(3).Info("done")
334 c.numDeallocations++
335 delete(c.allocated, claim.UID)
336 c.claimsPerNode[node]--
337 return nil
338 }
339
340 func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) (finalErr error) {
341 logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "UnsuitableNodes"), "pod", klog.KObj(pod))
342 c.mutex.Lock()
343 defer c.mutex.Unlock()
344
345 logger.V(3).Info("starting", "claims", claims, "potentialNodes", potentialNodes)
346 defer func() {
347
348 logger.V(3).Info("done", "unsuitableNodes", claims[0].UnsuitableNodes, "err", finalErr)
349 }()
350
351 if c.resources.MaxAllocations == 0 {
352
353 return nil
354 }
355 nodes := c.resources.AllNodes(c.nodeLister)
356 if c.resources.NodeLocal {
357 for _, claim := range claims {
358 claim.UnsuitableNodes = nil
359 for _, node := range potentialNodes {
360
361
362
363
364
365 if !contains(nodes, node) ||
366 c.claimsPerNode[node]+len(claims) > c.resources.MaxAllocations {
367 claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
368 }
369 }
370 }
371 return nil
372 }
373
374 allocations := c.claimsPerNode[""]
375 for _, claim := range claims {
376 claim.UnsuitableNodes = nil
377 for _, node := range potentialNodes {
378 if !contains(nodes, node) ||
379 allocations+len(claims) > c.resources.MaxAllocations {
380 claim.UnsuitableNodes = append(claim.UnsuitableNodes, node)
381 }
382 }
383 }
384
385 return nil
386 }
387
388 func toEnvVars(what string, from interface{}, to map[string]string) {
389 if from == nil {
390 return
391 }
392
393 env := from.(map[string]string)
394 for key, value := range env {
395 to[what+"_"+strings.ToLower(key)] = value
396 }
397 }
398
399 func contains[T comparable](list []T, value T) bool {
400 for _, v := range list {
401 if v == value {
402 return true
403 }
404 }
405
406 return false
407 }
408
View as plain text