/* Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package app does all of the work necessary to configure and run a // Kubernetes app process. package app import ( "context" "encoding/json" "errors" "fmt" "math/rand" "strings" "sync" v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/dynamic-resource-allocation/controller" "k8s.io/klog/v2" ) type Resources struct { DriverName string DontSetReservedFor bool NodeLocal bool // Nodes is a fixed list of node names on which resources are // available. Mutually exclusive with NodeLabels. Nodes []string // NodeLabels are labels which determine on which nodes resources are // available. Mutually exclusive with Nodes. NodeLabels labels.Set MaxAllocations int Shareable bool // AllocateWrapper, if set, gets called for each Allocate call. AllocateWrapper AllocateWrapperType } func (r Resources) AllNodes(nodeLister listersv1.NodeLister) []string { if len(r.NodeLabels) > 0 { // Determine nodes with resources dynamically. nodes, _ := nodeLister.List(labels.SelectorFromValidatedSet(r.NodeLabels)) nodeNames := make([]string, 0, len(nodes)) for _, node := range nodes { nodeNames = append(nodeNames, node.Name) } return nodeNames } return r.Nodes } func (r Resources) NewAllocation(node string, data []byte) *resourcev1alpha2.AllocationResult { allocation := &resourcev1alpha2.AllocationResult{ Shareable: r.Shareable, } allocation.ResourceHandles = []resourcev1alpha2.ResourceHandle{ { DriverName: r.DriverName, Data: string(data), }, } if node == "" && len(r.NodeLabels) > 0 { // Available on all nodes matching the labels. var requirements []v1.NodeSelectorRequirement for key, value := range r.NodeLabels { requirements = append(requirements, v1.NodeSelectorRequirement{ Key: key, Operator: v1.NodeSelectorOpIn, Values: []string{value}, }) } allocation.AvailableOnNodes = &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ { MatchExpressions: requirements, }, }, } } else { var nodes []string if node != "" { // Local to one node. nodes = append(nodes, node) } else { // Available on the fixed set of nodes. nodes = r.Nodes } if len(nodes) > 0 { allocation.AvailableOnNodes = &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ { MatchExpressions: []v1.NodeSelectorRequirement{ { Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: nodes, }, }, }, }, } } } return allocation } type AllocateWrapperType func(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string, handler func(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string), ) type ExampleController struct { clientset kubernetes.Interface nodeLister listersv1.NodeLister resources Resources mutex sync.Mutex // allocated maps claim.UID to the node (if network-attached) or empty (if not). allocated map[types.UID]string // claimsPerNode counts how many claims are currently allocated for a node (non-empty key) // or allocated for the entire cluster (empty key). Must be kept in sync with allocated. claimsPerNode map[string]int numAllocations, numDeallocations int64 } func NewController(clientset kubernetes.Interface, resources Resources) *ExampleController { c := &ExampleController{ clientset: clientset, resources: resources, allocated: make(map[types.UID]string), claimsPerNode: make(map[string]int), } return c } func (c *ExampleController) Run(ctx context.Context, workers int) { informerFactory := informers.NewSharedInformerFactory(c.clientset, 0 /* resync period */) ctrl := controller.New(ctx, c.resources.DriverName, c, c.clientset, informerFactory) c.nodeLister = informerFactory.Core().V1().Nodes().Lister() ctrl.SetReservedFor(!c.resources.DontSetReservedFor) informerFactory.Start(ctx.Done()) ctrl.Run(workers) // If we get here, the context was canceled and we can wait for informer factory goroutines. informerFactory.Shutdown() } type parameters struct { EnvVars map[string]string NodeName string } var _ controller.Driver = &ExampleController{} // GetNumAllocations returns the number of times that a claim was allocated. // Idempotent calls to Allocate that do not need to allocate the claim again do // not contribute to that counter. func (c *ExampleController) GetNumAllocations() int64 { c.mutex.Lock() defer c.mutex.Unlock() return c.numAllocations } // GetNumDeallocations returns the number of times that a claim was allocated. // Idempotent calls to Allocate that do not need to allocate the claim again do // not contribute to that counter. func (c *ExampleController) GetNumDeallocations() int64 { c.mutex.Lock() defer c.mutex.Unlock() return c.numDeallocations } func (c *ExampleController) GetClassParameters(ctx context.Context, class *resourcev1alpha2.ResourceClass) (interface{}, error) { if class.ParametersRef != nil { if class.ParametersRef.APIGroup != "" || class.ParametersRef.Kind != "ConfigMap" { return nil, fmt.Errorf("class parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", class.ParametersRef) } return c.readParametersFromConfigMap(ctx, class.ParametersRef.Namespace, class.ParametersRef.Name) } return nil, nil } func (c *ExampleController) GetClaimParameters(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, class *resourcev1alpha2.ResourceClass, classParameters interface{}) (interface{}, error) { if claim.Spec.ParametersRef != nil { if claim.Spec.ParametersRef.APIGroup != "" || claim.Spec.ParametersRef.Kind != "ConfigMap" { return nil, fmt.Errorf("claim parameters are only supported in APIVersion v1, Kind ConfigMap, got: %v", claim.Spec.ParametersRef) } return c.readParametersFromConfigMap(ctx, claim.Namespace, claim.Spec.ParametersRef.Name) } return nil, nil } func (c *ExampleController) readParametersFromConfigMap(ctx context.Context, namespace, name string) (map[string]string, error) { configMap, err := c.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return nil, fmt.Errorf("get config map: %w", err) } return configMap.Data, nil } func (c *ExampleController) Allocate(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) { if c.resources.AllocateWrapper != nil { c.resources.AllocateWrapper(ctx, claimAllocations, selectedNode, c.allocateOneByOne) } else { c.allocateOneByOne(ctx, claimAllocations, selectedNode) } return } func (c *ExampleController) allocateOneByOne(ctx context.Context, claimAllocations []*controller.ClaimAllocation, selectedNode string) { for _, ca := range claimAllocations { allocationResult, err := c.allocateOne(ctx, ca.Claim, ca.ClaimParameters, ca.Class, ca.ClassParameters, selectedNode) if err != nil { ca.Error = fmt.Errorf("failed allocating claim %v", ca.Claim.UID) continue } ca.Allocation = allocationResult } } // allocate simply copies parameters as JSON map into a ResourceHandle. func (c *ExampleController) allocateOne(ctx context.Context, claim *resourcev1alpha2.ResourceClaim, claimParameters interface{}, class *resourcev1alpha2.ResourceClass, classParameters interface{}, selectedNode string) (result *resourcev1alpha2.AllocationResult, err error) { logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Allocate"), "claim", klog.KObj(claim), "uid", claim.UID) defer func() { logger.V(3).Info("done", "result", result, "err", err) }() c.mutex.Lock() defer c.mutex.Unlock() // Already allocated? Then we don't need to count it again. node, alreadyAllocated := c.allocated[claim.UID] if alreadyAllocated { // Idempotent result - kind of. We don't check whether // the parameters changed in the meantime. A real // driver would have to do that. logger.V(3).V(3).Info("already allocated") } else { logger.V(3).Info("starting", "selectedNode", selectedNode) nodes := c.resources.AllNodes(c.nodeLister) if c.resources.NodeLocal { node = selectedNode if node == "" { // If none has been selected because we do immediate allocation, // then we need to pick one ourselves. var viableNodes []string for _, n := range nodes { if c.resources.MaxAllocations == 0 || c.claimsPerNode[n] < c.resources.MaxAllocations { viableNodes = append(viableNodes, n) } } if len(viableNodes) == 0 { return nil, errors.New("resources exhausted on all nodes") } // Pick randomly. We could also prefer the one with the least // number of allocations (even spreading) or the most (packing). node = viableNodes[rand.Intn(len(viableNodes))] logger.V(3).Info("picked a node ourselves", "selectedNode", selectedNode) } else if !contains(nodes, node) || c.resources.MaxAllocations > 0 && c.claimsPerNode[node] >= c.resources.MaxAllocations { return nil, fmt.Errorf("resources exhausted on node %q", node) } } else { if c.resources.MaxAllocations > 0 && len(c.allocated) >= c.resources.MaxAllocations { return nil, errors.New("resources exhausted in the cluster") } } } p := parameters{ EnvVars: make(map[string]string), NodeName: node, } toEnvVars("user", claimParameters, p.EnvVars) toEnvVars("admin", classParameters, p.EnvVars) data, err := json.Marshal(p) if err != nil { return nil, fmt.Errorf("encode parameters: %w", err) } allocation := c.resources.NewAllocation(node, data) if !alreadyAllocated { c.numAllocations++ c.allocated[claim.UID] = node c.claimsPerNode[node]++ } return allocation, nil } func (c *ExampleController) Deallocate(ctx context.Context, claim *resourcev1alpha2.ResourceClaim) error { logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "Deallocate"), "claim", klog.KObj(claim), "uid", claim.UID) c.mutex.Lock() defer c.mutex.Unlock() node, ok := c.allocated[claim.UID] if !ok { logger.V(3).Info("already deallocated") return nil } logger.V(3).Info("done") c.numDeallocations++ delete(c.allocated, claim.UID) c.claimsPerNode[node]-- return nil } func (c *ExampleController) UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) (finalErr error) { logger := klog.LoggerWithValues(klog.LoggerWithName(klog.FromContext(ctx), "UnsuitableNodes"), "pod", klog.KObj(pod)) c.mutex.Lock() defer c.mutex.Unlock() logger.V(3).Info("starting", "claims", claims, "potentialNodes", potentialNodes) defer func() { // UnsuitableNodes is the same for all claims. logger.V(3).Info("done", "unsuitableNodes", claims[0].UnsuitableNodes, "err", finalErr) }() if c.resources.MaxAllocations == 0 { // All nodes are suitable. return nil } nodes := c.resources.AllNodes(c.nodeLister) if c.resources.NodeLocal { for _, claim := range claims { claim.UnsuitableNodes = nil for _, node := range potentialNodes { // If we have more than one claim, then a // single pod wants to use all of them. That // can only work if a node has capacity left // for all of them. Also, nodes that the driver // doesn't run on cannot be used. if !contains(nodes, node) || c.claimsPerNode[node]+len(claims) > c.resources.MaxAllocations { claim.UnsuitableNodes = append(claim.UnsuitableNodes, node) } } } return nil } allocations := c.claimsPerNode[""] for _, claim := range claims { claim.UnsuitableNodes = nil for _, node := range potentialNodes { if !contains(nodes, node) || allocations+len(claims) > c.resources.MaxAllocations { claim.UnsuitableNodes = append(claim.UnsuitableNodes, node) } } } return nil } func toEnvVars(what string, from interface{}, to map[string]string) { if from == nil { return } env := from.(map[string]string) for key, value := range env { to[what+"_"+strings.ToLower(key)] = value } } func contains[T comparable](list []T, value T) bool { for _, v := range list { if v == value { return true } } return false }