...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/dra/manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/dra

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package dra
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	v1 "k8s.io/api/core/v1"
    24  	resourceapi "k8s.io/api/resource/v1alpha2"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	clientset "k8s.io/client-go/kubernetes"
    28  	"k8s.io/dynamic-resource-allocation/resourceclaim"
    29  	"k8s.io/klog/v2"
    30  	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
    31  	dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
    32  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    33  )
    34  
    35  // draManagerStateFileName is the file name where dra manager stores its state
    36  const draManagerStateFileName = "dra_manager_state"
    37  
    38  // ManagerImpl is the structure in charge of managing DRA resource Plugins.
    39  type ManagerImpl struct {
    40  	// cache contains cached claim info
    41  	cache *claimInfoCache
    42  
    43  	// KubeClient reference
    44  	kubeClient clientset.Interface
    45  }
    46  
    47  // NewManagerImpl creates a new manager.
    48  func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
    49  	klog.V(2).InfoS("Creating DRA manager")
    50  
    51  	claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
    52  	if err != nil {
    53  		return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
    54  	}
    55  
    56  	manager := &ManagerImpl{
    57  		cache:      claimInfoCache,
    58  		kubeClient: kubeClient,
    59  	}
    60  
    61  	return manager, nil
    62  }
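
        // Construction sketch (illustrative only: the kubeClient, state directory,
        // and nodeName values are assumptions supplied by the caller, not defined
        // in this file):
        //
        //	draManager, err := NewManagerImpl(kubeClient, "/var/lib/kubelet/dra", nodeName)
        //	if err != nil {
        //		return fmt.Errorf("create DRA manager: %v", err)
        //	}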
    63  
    64  // PrepareResources attempts to prepare all of the required resource
    65  // plugin resources for the input pod, issues NodePrepareResources RPC requests
    66  // for each new resource requirement, processes their responses, and updates the
    67  // cached containerResources on success.
    68  func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
    69  	batches := make(map[string][]*drapb.Claim)
    70  	claimInfos := make(map[types.UID]*ClaimInfo)
    71  	for i := range pod.Spec.ResourceClaims {
    72  		podClaim := &pod.Spec.ResourceClaims[i]
    73  		klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
    74  		claimName, mustCheckOwner, err := resourceclaim.Name(pod, podClaim)
    75  		if err != nil {
    76  			return fmt.Errorf("prepare resource claim: %v", err)
    77  		}
    78  
    79  		if claimName == nil {
    80  			// Nothing to do.
    81  			continue
    82  		}
    83  		// Query claim object from the API server
    84  		resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
    85  			context.TODO(),
    86  			*claimName,
    87  			metav1.GetOptions{})
    88  		if err != nil {
    89  			return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", *claimName, pod.Name, err)
    90  		}
    91  
    92  		if mustCheckOwner {
    93  			if err = resourceclaim.IsForPod(pod, resourceClaim); err != nil {
    94  				return err
    95  			}
    96  		}
    97  
    98  		// Check if the pod is in the claim's ReservedFor list
    99  		if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
   100  			return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
   101  				pod.Name, pod.UID, *claimName, resourceClaim.UID)
   102  		}
   103  
   104  		// If no container actually uses the claim, then we don't need
   105  		// to prepare it.
   106  		if !claimIsUsedByPod(podClaim, pod) {
   107  			klog.V(5).InfoS("Skipping unused resource", "claim", claimName, "pod", pod.Name)
   108  			continue
   109  		}
   110  
   111  		claimInfo := m.cache.get(*claimName, pod.Namespace)
   112  		if claimInfo == nil {
   113  			// claim does not exist in cache, create new claimInfo object
   114  			// to be processed later.
   115  			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
   116  		}
   117  
   118  		// We delay checkpointing of this change until this call
   119  		// returns successfully. It is OK to do this because we
   120  		// will only return successfully from this call if the
   121  		// checkpoint has succeeded. That means if the kubelet is
   122  		// ever restarted before this checkpoint succeeds, the pod
   123  		// whose resources are being prepared would never have
   124  		// started, so it's OK (actually correct) to not include it
   125  		// in the cache.
   126  		claimInfo.addPodReference(pod.UID)
   127  
   128  		if claimInfo.prepared {
   129  			// Already prepared this claim, no need to prepare it again
   130  			continue
   131  		}
   132  
   133  		// Loop through all plugins and prepare for calling NodePrepareResources.
   134  		for _, resourceHandle := range claimInfo.ResourceHandles {
   135  			// If no DriverName is provided in the resourceHandle, we
   136  			// use the DriverName from the status
   137  			pluginName := resourceHandle.DriverName
   138  			if pluginName == "" {
   139  				pluginName = resourceClaim.Status.DriverName
   140  			}
   141  			claim := &drapb.Claim{
   142  				Namespace:      resourceClaim.Namespace,
   143  				Uid:            string(resourceClaim.UID),
   144  				Name:           resourceClaim.Name,
   145  				ResourceHandle: resourceHandle.Data,
   146  			}
   147  			if resourceHandle.StructuredData != nil {
   148  				claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
   149  			}
   150  			batches[pluginName] = append(batches[pluginName], claim)
   151  		}
   152  		claimInfos[resourceClaim.UID] = claimInfo
   153  	}
   154  
   155  	// Call NodePrepareResources for all claims in each batch.
   156  	// If there is any error, processing gets aborted.
   157  	// We could try to continue, but that would make the code more complex.
   158  	for pluginName, claims := range batches {
   159  		// Call NodePrepareResources RPC for all resource handles.
   160  		client, err := dra.NewDRAPluginClient(pluginName)
   161  		if err != nil {
   162  			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
   163  		}
   164  		response, err := client.NodePrepareResources(context.Background(), &drapb.NodePrepareResourcesRequest{Claims: claims})
   165  		if err != nil {
   166  			// General error unrelated to any particular claim.
   167  			return fmt.Errorf("NodePrepareResources failed: %v", err)
   168  		}
   169  		for claimUID, result := range response.Claims {
   170  			reqClaim := lookupClaimRequest(claims, claimUID)
   171  			if reqClaim == nil {
   172  				return fmt.Errorf("NodePrepareResources returned result for unknown claim UID %s", claimUID)
   173  			}
   174  			if result.Error != "" {
   175  				return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
   176  			}
   177  
   178  			claimInfo := claimInfos[types.UID(claimUID)]
   179  
   180  			// Add the CDI Devices returned by NodePrepareResources to
   181  			// the claimInfo object.
   182  			err = claimInfo.addCDIDevices(pluginName, result.CDIDevices)
   183  			if err != nil {
   184  				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
   185  			}
   186  			// Mark the claim as (successfully) prepared by the manager so that we don't prepare it again.
   187  			claimInfo.prepared = true
   188  
   189  			// TODO: We (re)add the claimInfo object to the cache and
   190  			// sync it to the checkpoint *after* the
   191  			// NodePrepareResources call has completed. This will cause
   192  			// issues if the kubelet gets restarted between
   193  			// NodePrepareResources and syncToCheckpoint. It will result
   194  			// in not calling NodeUnprepareResources for this claim
   195  			// because no claimInfo will be synced back to the cache
   196  			// for it after the restart. We need to resolve this issue
   197  			// before moving to beta.
   198  			m.cache.add(claimInfo)
   199  		}
   200  
   201  		// Checkpoint to reduce redundant calls to
   202  		// NodePrepareResources after a kubelet restart.
   203  		err = m.cache.syncToCheckpoint()
   204  		if err != nil {
   205  			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
   206  		}
   207  
   208  		unfinished := len(claims) - len(response.Claims)
   209  		if unfinished != 0 {
   210  			return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
   211  		}
   212  	}
   213  	// Checkpoint to capture all of the previous addPodReference() calls.
   214  	err := m.cache.syncToCheckpoint()
   215  	if err != nil {
   216  		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
   217  	}
   218  	return nil
   219  }
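
        // PrepareResources is typically paired with UnprepareResources below. A rough
        // sketch of how a caller might drive that lifecycle (the draManager variable
        // and the surrounding flow are assumptions, not part of this file):
        //
        //	if err := draManager.PrepareResources(pod); err != nil {
        //		return err
        //	}
        //	// ... start the pod's containers, wait for them to terminate ...
        //	if err := draManager.UnprepareResources(pod); err != nil {
        //		return err
        //	}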
   220  
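        // lookupClaimRequest returns the claim in claims whose Uid matches claimUID,
        // or nil if none matches.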
   221  func lookupClaimRequest(claims []*drapb.Claim, claimUID string) *drapb.Claim {
   222  	for _, claim := range claims {
   223  		if claim.Uid == claimUID {
   224  			return claim
   225  		}
   226  	}
   227  	return nil
   228  }
   229  
   230  func claimIsUsedByPod(podClaim *v1.PodResourceClaim, pod *v1.Pod) bool {
   231  	if claimIsUsedByContainers(podClaim, pod.Spec.InitContainers) {
   232  		return true
   233  	}
   234  	if claimIsUsedByContainers(podClaim, pod.Spec.Containers) {
   235  		return true
   236  	}
   237  	return false
   238  }
   239  
   240  func claimIsUsedByContainers(podClaim *v1.PodResourceClaim, containers []v1.Container) bool {
   241  	for i := range containers {
   242  		if claimIsUsedByContainer(podClaim, &containers[i]) {
   243  			return true
   244  		}
   245  	}
   246  	return false
   247  }
   248  
   249  func claimIsUsedByContainer(podClaim *v1.PodResourceClaim, container *v1.Container) bool {
   250  	for _, c := range container.Resources.Claims {
   251  		if c.Name == podClaim.Name {
   252  			return true
   253  		}
   254  	}
   255  	return false
   256  }
   257  
   258  // GetResources gets a ContainerInfo object from the claimInfo cache.
   259  // This information is used by the caller to update a container config.
   260  func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
   261  	annotations := []kubecontainer.Annotation{}
   262  	cdiDevices := []kubecontainer.CDIDevice{}
   263  
   264  	for i, podResourceClaim := range pod.Spec.ResourceClaims {
   265  		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
   266  		if err != nil {
   267  			return nil, fmt.Errorf("list resource claims: %v", err)
   268  		}
   269  		// The claim name might be nil if no underlying resource claim
   270  		// was generated for the referenced claim. There are valid use
   271  		// cases when this might happen, so we simply skip it.
   272  		if claimName == nil {
   273  			continue
   274  		}
   275  		for _, claim := range container.Resources.Claims {
   276  			if podResourceClaim.Name != claim.Name {
   277  				continue
   278  			}
   279  
   280  			claimInfo := m.cache.get(*claimName, pod.Namespace)
   281  			if claimInfo == nil {
   282  				return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
   283  			}
   284  
   285  			claimInfo.RLock()
   286  			claimAnnotations := claimInfo.annotationsAsList()
   287  			klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
   288  			annotations = append(annotations, claimAnnotations...)
   289  			for _, devices := range claimInfo.CDIDevices {
   290  				for _, device := range devices {
   291  					cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
   292  				}
   293  			}
   294  			claimInfo.RUnlock()
   295  		}
   296  	}
   297  
   298  	return &ContainerInfo{Annotations: annotations, CDIDevices: cdiDevices}, nil
   299  }
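
        // The ContainerInfo returned by GetResources is meant to be merged into the
        // container's runtime configuration by the caller. A rough consumption sketch
        // (the draManager and config names are assumptions about the caller, not
        // defined here):
        //
        //	info, err := draManager.GetResources(pod, container)
        //	if err != nil {
        //		return err
        //	}
        //	config.Annotations = append(config.Annotations, info.Annotations...)
        //	config.CDIDevices = append(config.CDIDevices, info.CDIDevices...)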
   300  
   301  // UnprepareResources calls a plugin's NodeUnprepareResources API for each resource claim owned by a pod.
   302  // This function is idempotent and may be called multiple times against the same pod.
   303  // As such, calls to the underlying NodeUnprepareResources API are skipped for claims that have
   304  // already been successfully unprepared.
   305  func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
   306  	batches := make(map[string][]*drapb.Claim)
   307  	claimInfos := make(map[types.UID]*ClaimInfo)
   308  	for i := range pod.Spec.ResourceClaims {
   309  		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
   310  		if err != nil {
   311  			return fmt.Errorf("unprepare resource claim: %v", err)
   312  		}
   313  
   314  		// The claim name might be nil if no underlying resource claim
   315  		// was generated for the referenced claim. There are valid use
   316  		// cases when this might happen, so we simply skip it.
   317  		if claimName == nil {
   318  			continue
   319  		}
   320  
   321  		claimInfo := m.cache.get(*claimName, pod.Namespace)
   322  
   323  		// Skip calling NodeUnprepareResources if claim info is not cached
   324  		if claimInfo == nil {
   325  			continue
   326  		}
   327  
   328  		// Skip calling NodeUnprepareResources if other pods are still referencing it
   329  		if len(claimInfo.PodUIDs) > 1 {
   330  			// We delay checkpointing of this change until this call returns successfully.
   331  			// It is OK to do this because we will only return successfully from this call if
   332  			// the checkpoint has succeeded. That means if the kubelet is ever restarted
   333  			// before this checkpoint succeeds, we will simply call into this (idempotent)
   334  			// function again.
   335  			claimInfo.deletePodReference(pod.UID)
   336  			continue
   337  		}
   338  
   339  		// Loop through all plugins and prepare for calling NodeUnprepareResources.
   340  		for _, resourceHandle := range claimInfo.ResourceHandles {
   341  			// If no DriverName is provided in the resourceHandle, we
   342  			// use the DriverName from the status
   343  			pluginName := resourceHandle.DriverName
   344  			if pluginName == "" {
   345  				pluginName = claimInfo.DriverName
   346  			}
   347  
   348  			claim := &drapb.Claim{
   349  				Namespace:      claimInfo.Namespace,
   350  				Uid:            string(claimInfo.ClaimUID),
   351  				Name:           claimInfo.ClaimName,
   352  				ResourceHandle: resourceHandle.Data,
   353  			}
   354  			if resourceHandle.StructuredData != nil {
   355  				claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
   356  			}
   357  			batches[pluginName] = append(batches[pluginName], claim)
   358  		}
   359  		claimInfos[claimInfo.ClaimUID] = claimInfo
   360  	}
   361  
   362  	// Call NodeUnprepareResources for all claims in each batch.
   363  	// If there is any error, processing gets aborted.
   364  	// We could try to continue, but that would make the code more complex.
   365  	for pluginName, claims := range batches {
   366  		// Call NodeUnprepareResources RPC for all resource handles.
   367  		client, err := dra.NewDRAPluginClient(pluginName)
   368  		if err != nil {
   369  			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s: %v", pluginName, err)
   370  		}
   371  		response, err := client.NodeUnprepareResources(context.Background(), &drapb.NodeUnprepareResourcesRequest{Claims: claims})
   372  		if err != nil {
   373  			// General error unrelated to any particular claim.
   374  			return fmt.Errorf("NodeUnprepareResources failed: %v", err)
   375  		}
   376  
   377  		for claimUID, result := range response.Claims {
   378  			reqClaim := lookupClaimRequest(claims, claimUID)
   379  			if reqClaim == nil {
   380  				return fmt.Errorf("NodeUnprepareResources returned result for unknown claim UID %s", claimUID)
   381  			}
   382  			if result.Error != "" {
   383  				return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
   384  			}
   385  
   386  			// Delete last pod UID only if unprepare succeeds.
   387  			// This ensures that the status manager doesn't enter termination status
   388  			// for the pod. This logic is implemented in
   389  			// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
   390  			claimInfo := claimInfos[types.UID(claimUID)]
   391  			claimInfo.deletePodReference(pod.UID)
   392  			m.cache.delete(claimInfo.ClaimName, pod.Namespace)
   393  		}
   394  
   395  		// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
   396  		err = m.cache.syncToCheckpoint()
   397  		if err != nil {
   398  			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
   399  		}
   400  
   401  		unfinished := len(claims) - len(response.Claims)
   402  		if unfinished != 0 {
   403  			return fmt.Errorf("NodeUnprepareResources left out %d claims", unfinished)
   404  		}
   405  	}
   406  
   407  	// Checkpoint to capture all of the previous deletePodReference() calls.
   408  	err := m.cache.syncToCheckpoint()
   409  	if err != nil {
   410  		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
   411  	}
   412  	return nil
   413  }
   414  
   415  // PodMightNeedToUnprepareResources returns true if the pod might need to
   416  // unprepare resources.
   417  func (m *ManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
   418  	return m.cache.hasPodReference(UID)
   419  }
   420  
   421  // GetContainerClaimInfos gets a container's ClaimInfo objects.
   422  func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
   423  	claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
   424  
   425  	for i, podResourceClaim := range pod.Spec.ResourceClaims {
   426  		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
   427  		if err != nil {
   428  			return nil, fmt.Errorf("determine resource claim information: %v", err)
   429  		}
   430  
   431  		for _, claim := range container.Resources.Claims {
   432  			if podResourceClaim.Name != claim.Name {
   433  				continue
   434  			}
   435  			claimInfo := m.cache.get(*claimName, pod.Namespace)
   436  			if claimInfo == nil {
   437  				return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
   438  			}
   439  			claimInfos = append(claimInfos, claimInfo)
   440  		}
   441  	}
   442  	return claimInfos, nil
   443  }
   444  
