...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/devicemanager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package devicemanager
    18  
    19  import (
    20  	"sync"
    21  
    22  	"k8s.io/klog/v2"
    23  
    24  	"k8s.io/apimachinery/pkg/types"
    25  	"k8s.io/apimachinery/pkg/util/sets"
    26  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    27  	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    28  	kubefeatures "k8s.io/kubernetes/pkg/features"
    29  	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
    30  	"k8s.io/kubernetes/pkg/kubelet/cm/util/cdi"
    31  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    32  )
    33  
    34  type deviceAllocateInfo struct {
    35  	// deviceIds contains device Ids allocated to this container for the given resourceName.
    36  	deviceIds checkpoint.DevicesPerNUMA
    37  	// allocResp contains the cached RPC AllocateResponse.
    38  	allocResp *pluginapi.ContainerAllocateResponse
    39  }
    40  
    41  type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
    42  type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.
    43  type podDevices struct {
    44  	sync.RWMutex
    45  	devs map[string]containerDevices // Keyed by podUID.
    46  }
    47  
    48  // newPodDevices returns a podDevices object with its own guarding
    49  // RWMutex and a map keyed by pod UID, where each value holds that
    50  // pod's per-container device information of type containerDevices.
    51  func newPodDevices() *podDevices {
    52  	return &podDevices{devs: make(map[string]containerDevices)}
    53  }
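
// A minimal usage sketch (hypothetical values; not part of the upstream
// file): insert builds each level of the podUID -> containerName ->
// resourceName nesting on demand, caching both the per-NUMA device IDs
// and the plugin's AllocateResponse.
func examplePodDevicesInsert() {
	pd := newPodDevices()
	// Two devices of resource "vendor.com/gpu" on NUMA node 0.
	devs := checkpoint.DevicesPerNUMA{0: []string{"gpu-0", "gpu-1"}}
	resp := &pluginapi.ContainerAllocateResponse{
		Envs: map[string]string{"VISIBLE_DEVICES": "gpu-0,gpu-1"},
	}
	pd.insert("pod-uid-1", "main", "vendor.com/gpu", devs, resp)

	// The pod-level view unions device IDs across all of the pod's containers.
	_ = pd.podDevices("pod-uid-1", "vendor.com/gpu") // {"gpu-0", "gpu-1"}
}
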
    54  
    55  func (pdev *podDevices) pods() sets.Set[string] {
    56  	pdev.RLock()
    57  	defer pdev.RUnlock()
    58  	ret := sets.New[string]()
    59  	for k := range pdev.devs {
    60  		ret.Insert(k)
    61  	}
    62  	return ret
    63  }
    64  
    65  func (pdev *podDevices) size() int {
    66  	pdev.RLock()
    67  	defer pdev.RUnlock()
    68  	return len(pdev.devs)
    69  }
    70  
    71  func (pdev *podDevices) hasPod(podUID string) bool {
    72  	pdev.RLock()
    73  	defer pdev.RUnlock()
    74  	_, podExists := pdev.devs[podUID]
    75  	return podExists
    76  }
    77  
    78  func (pdev *podDevices) insert(podUID, contName, resource string, devices checkpoint.DevicesPerNUMA, resp *pluginapi.ContainerAllocateResponse) {
    79  	pdev.Lock()
    80  	defer pdev.Unlock()
    81  	if _, podExists := pdev.devs[podUID]; !podExists {
    82  		pdev.devs[podUID] = make(containerDevices)
    83  	}
    84  	if _, contExists := pdev.devs[podUID][contName]; !contExists {
    85  		pdev.devs[podUID][contName] = make(resourceAllocateInfo)
    86  	}
    87  	pdev.devs[podUID][contName][resource] = deviceAllocateInfo{
    88  		deviceIds: devices,
    89  		allocResp: resp,
    90  	}
    91  }
    92  
    93  func (pdev *podDevices) delete(pods []string) {
    94  	pdev.Lock()
    95  	defer pdev.Unlock()
    96  	for _, uid := range pods {
    97  		delete(pdev.devs, uid)
    98  	}
    99  }
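
// A sketch of the tracking lifecycle (hypothetical caller; not part of the
// upstream file): entries for pods that are no longer active are pruned by
// diffing the tracked UIDs against the active set and deleting the rest.
func examplePruneInactivePods(pd *podDevices, activePods sets.Set[string]) {
	stale := pd.pods().Difference(activePods)
	pd.delete(stale.UnsortedList())
}
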
   100  
   101  // Returns the set of device IDs allocated to the given pod for the given resource.
   102  // Returns an empty (non-nil) set if there is no cached state for the given <podUID, resource>.
   103  func (pdev *podDevices) podDevices(podUID, resource string) sets.Set[string] {
   104  	pdev.RLock()
   105  	defer pdev.RUnlock()
   106  
   107  	ret := sets.New[string]()
   108  	for contName := range pdev.devs[podUID] {
   109  		ret = ret.Union(pdev.containerDevices(podUID, contName, resource))
   110  	}
   111  	return ret
   112  }
   113  
   114  // Returns the set of device IDs allocated to the given container for the given resource.
   115  // Returns nil if we don't have cached state for the given <podUID, contName, resource>.
   116  func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets.Set[string] {
   117  	pdev.RLock()
   118  	defer pdev.RUnlock()
   119  	if _, podExists := pdev.devs[podUID]; !podExists {
   120  		return nil
   121  	}
   122  	if _, contExists := pdev.devs[podUID][contName]; !contExists {
   123  		return nil
   124  	}
   125  	devs, resourceExists := pdev.devs[podUID][contName][resource]
   126  	if !resourceExists {
   127  		return nil
   128  	}
   129  	return devs.deviceIds.Devices()
   130  }
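
// A sketch of the lookup semantics (hypothetical values; not part of the
// upstream file): containerDevices returns nil when any level of the cache
// is missing, while podDevices always returns a non-nil (possibly empty)
// set, since it starts from sets.New and unions per-container results.
func exampleLookups(pd *podDevices) {
	if devs := pd.containerDevices("unknown-pod", "main", "vendor.com/gpu"); devs == nil {
		// No cached state for this <podUID, contName, resource>.
	}
	empty := pd.podDevices("unknown-pod", "vendor.com/gpu")
	_ = empty.Len() // 0, but safe to call: the set itself is non-nil
}
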
   131  
   132  // Populates allocatedResources with the device resources allocated to the specified <podUID, contName>.
   133  func (pdev *podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.Set[string]) {
   134  	pdev.RLock()
   135  	defer pdev.RUnlock()
   136  	containers, exists := pdev.devs[podUID]
   137  	if !exists {
   138  		return
   139  	}
   140  	resources, exists := containers[contName]
   141  	if !exists {
   142  		return
   143  	}
   144  	for resource, devices := range resources {
   145  		allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds.Devices())
   146  	}
   147  }
   148  
   149  // Removes the device resources allocated to the specified <podUID, contName> from allocatedResources.
   150  func (pdev *podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.Set[string]) {
   151  	pdev.RLock()
   152  	defer pdev.RUnlock()
   153  	containers, exists := pdev.devs[podUID]
   154  	if !exists {
   155  		return
   156  	}
   157  	resources, exists := containers[contName]
   158  	if !exists {
   159  		return
   160  	}
   161  	for resource, devices := range resources {
   162  		allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds.Devices())
   163  	}
   164  }
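
// A bookkeeping sketch (hypothetical caller; not part of the upstream
// file): the two helpers above let a caller maintain a running per-resource
// map of in-use device IDs, unioning in a container's allocation when it is
// (re)admitted and subtracting it when the container goes away. Union and
// Difference both tolerate a nil set for a not-yet-seen resource.
func exampleInUseAccounting(pd *podDevices) map[string]sets.Set[string] {
	inUse := make(map[string]sets.Set[string])
	pd.addContainerAllocatedResources("pod-uid-1", "main", inUse)
	// ... later, when the container is removed ...
	pd.removeContainerAllocatedResources("pod-uid-1", "main", inUse)
	return inUse
}
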
   165  
   166  // Returns all devices allocated to the pods being tracked, keyed by resourceName.
   167  func (pdev *podDevices) devices() map[string]sets.Set[string] {
   168  	ret := make(map[string]sets.Set[string])
   169  	pdev.RLock()
   170  	defer pdev.RUnlock()
   171  	for _, containerDevices := range pdev.devs {
   172  		for _, resources := range containerDevices {
   173  			for resource, devices := range resources {
   174  				if _, exists := ret[resource]; !exists {
   175  					ret[resource] = sets.New[string]()
   176  				}
   177  				if devices.allocResp != nil {
   178  					ret[resource] = ret[resource].Union(devices.deviceIds.Devices())
   179  				}
   180  			}
   181  		}
   182  	}
   183  	return ret
   184  }
   185  
   186  // Converts podDevices into checkpoint data.
   187  func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
   188  	var data []checkpoint.PodDevicesEntry
   189  	pdev.RLock()
   190  	defer pdev.RUnlock()
   191  	for podUID, containerDevices := range pdev.devs {
   192  		for conName, resources := range containerDevices {
   193  			for resource, devices := range resources {
   194  				if devices.allocResp == nil {
   195  					klog.ErrorS(nil, "Can't marshal allocResp, allocation response is missing", "podUID", podUID, "containerName", conName, "resourceName", resource)
   196  					continue
   197  				}
   198  
   199  				allocResp, err := devices.allocResp.Marshal()
   200  				if err != nil {
   201  					klog.ErrorS(err, "Can't marshal allocResp", "podUID", podUID, "containerName", conName, "resourceName", resource)
   202  					continue
   203  				}
   204  				data = append(data, checkpoint.PodDevicesEntry{
   205  					PodUID:        podUID,
   206  					ContainerName: conName,
   207  					ResourceName:  resource,
   208  					DeviceIDs:     devices.deviceIds,
   209  					AllocResp:     allocResp})
   210  			}
   211  		}
   212  	}
   213  	return data
   214  }
   215  
   216  // Populates podDevices from the passed-in checkpoint data.
   217  func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
   218  	for _, entry := range data {
   219  		klog.V(2).InfoS("Get checkpoint entry",
   220  			"podUID", entry.PodUID, "containerName", entry.ContainerName,
   221  			"resourceName", entry.ResourceName, "deviceIDs", entry.DeviceIDs, "allocated", entry.AllocResp)
   222  		allocResp := &pluginapi.ContainerAllocateResponse{}
   223  		err := allocResp.Unmarshal(entry.AllocResp)
   224  		if err != nil {
   225  			klog.ErrorS(err, "Can't unmarshal allocResp", "podUID", entry.PodUID, "containerName", entry.ContainerName, "resourceName", entry.ResourceName)
   226  			continue
   227  		}
   228  		pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, allocResp)
   229  	}
   230  }
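
// A round-trip sketch (hypothetical caller; not part of the upstream file):
// allocations are serialized with toCheckpointData whenever state changes
// and replayed with fromCheckpointData on kubelet restart. Entries whose
// AllocateResponse fails to (un)marshal are logged and skipped rather than
// failing the whole pass.
func exampleCheckpointRoundTrip(pd *podDevices) *podDevices {
	entries := pd.toCheckpointData()
	restored := newPodDevices()
	restored.fromCheckpointData(entries)
	return restored
}
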
   231  
   232  // Returns combined container runtime settings to consume the container's allocated devices.
   233  func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
   234  	pdev.RLock()
   235  	defer pdev.RUnlock()
   236  
   237  	containers, exists := pdev.devs[podUID]
   238  	if !exists {
   239  		return nil
   240  	}
   241  	resources, exists := containers[contName]
   242  	if !exists {
   243  		return nil
   244  	}
   245  	opts := &DeviceRunContainerOptions{}
   246  	// Maps to detect duplicate settings.
   247  	devsMap := make(map[string]string)
   248  	mountsMap := make(map[string]string)
   249  	envsMap := make(map[string]string)
   250  	annotationsMap := make(map[string]string)
   251  	// Keep track of all CDI devices requested for the container.
   252  	allCDIDevices := sets.New[string]()
   253  	// Loops through the AllocateResponses of all cached device resources.
   254  	for _, devices := range resources {
   255  		resp := devices.allocResp
   256  		// Each Allocate response has the following artifacts:
   257  		//  - environment variables
   258  		//  - mount points
   259  		//  - device files
   260  		//  - container annotations
   261  		//  - CDI device IDs
   262  		// These artifacts are per resource per container.
   263  		// Updates RunContainerOptions.Envs.
   264  		for k, v := range resp.Envs {
   265  			if e, ok := envsMap[k]; ok {
   266  				klog.V(4).InfoS("Skip existing env", "envKey", k, "envValue", v)
   267  				if e != v {
   268  					klog.ErrorS(nil, "Environment variable has conflicting setting", "envKey", k, "expected", v, "got", e)
   269  				}
   270  				continue
   271  			}
   272  			klog.V(4).InfoS("Add env", "envKey", k, "envValue", v)
   273  			envsMap[k] = v
   274  			opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
   275  		}
   276  
   277  		// Updates RunContainerOptions.Devices.
   278  		for _, dev := range resp.Devices {
   279  			if d, ok := devsMap[dev.ContainerPath]; ok {
   280  				klog.V(4).InfoS("Skip existing device", "containerPath", dev.ContainerPath, "hostPath", dev.HostPath)
   281  				if d != dev.HostPath {
   282  				klog.ErrorS(nil, "Container device has a conflicting host device mapping",
   283  						"containerPath", dev.ContainerPath, "got", d, "expected", dev.HostPath)
   284  				}
   285  				continue
   286  			}
   287  			klog.V(4).InfoS("Add device", "containerPath", dev.ContainerPath, "hostPath", dev.HostPath)
   288  			devsMap[dev.ContainerPath] = dev.HostPath
   289  			opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
   290  				PathOnHost:      dev.HostPath,
   291  				PathInContainer: dev.ContainerPath,
   292  				Permissions:     dev.Permissions,
   293  			})
   294  		}
   295  
   296  		// Updates RunContainerOptions.Mounts.
   297  		for _, mount := range resp.Mounts {
   298  			if m, ok := mountsMap[mount.ContainerPath]; ok {
   299  				klog.V(4).InfoS("Skip existing mount", "containerPath", mount.ContainerPath, "hostPath", mount.HostPath)
   300  				if m != mount.HostPath {
   301  				klog.ErrorS(nil, "Container mount has a conflicting host path mapping",
   302  						"containerPath", mount.ContainerPath, "conflictingPath", m, "hostPath", mount.HostPath)
   303  				}
   304  				continue
   305  			}
   306  			klog.V(4).InfoS("Add mount", "containerPath", mount.ContainerPath, "hostPath", mount.HostPath)
   307  			mountsMap[mount.ContainerPath] = mount.HostPath
   308  			opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
   309  				Name:          mount.ContainerPath,
   310  				ContainerPath: mount.ContainerPath,
   311  				HostPath:      mount.HostPath,
   312  				ReadOnly:      mount.ReadOnly,
   313  				// TODO: This may need to be part of Device plugin API.
   314  				SELinuxRelabel: false,
   315  			})
   316  		}
   317  
   318  		// Updates for Annotations
   319  		for k, v := range resp.Annotations {
   320  			if e, ok := annotationsMap[k]; ok {
   321  				klog.V(4).InfoS("Skip existing annotation", "annotationKey", k, "annotationValue", v)
   322  				if e != v {
   323  					klog.ErrorS(nil, "Annotation has conflicting setting", "annotationKey", k, "expected", e, "got", v)
   324  				}
   325  				continue
   326  			}
   327  			klog.V(4).InfoS("Add annotation", "annotationKey", k, "annotationValue", v)
   328  			annotationsMap[k] = v
   329  			opts.Annotations = append(opts.Annotations, kubecontainer.Annotation{Name: k, Value: v})
   330  		}
   331  
   332  		if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DevicePluginCDIDevices) {
   333  			// Updates for CDI devices.
   334  			cdiDevices := getCDIDeviceInfo(resp, allCDIDevices)
   335  			opts.CDIDevices = append(opts.CDIDevices, cdiDevices...)
   336  		}
   337  	}
   338  
   339  	// Although the CDI devices are expected to be empty when this feature is disabled, we still
   340  	// guard this with a feature gate to avoid any potential issues.
   341  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DevicePluginCDIDevices) {
   342  		// We construct a resource ID from the pod UID and container name.
   343  		// This ID has no semantic meaning, and is only used to ensure that the generated CDI annotation key is unique
   344  		// for a given container. Since this is only called once per pod-container combination, this should be the case.
   345  		resourceID := podUID + "-" + contName
   346  		cdiAnnotations := getCDIAnnotations(resourceID, allCDIDevices, annotationsMap)
   347  		opts.Annotations = append(opts.Annotations, cdiAnnotations...)
   348  	}
   349  
   350  	return opts
   351  }
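
// A sketch of the merge behavior (hypothetical values; not part of the
// upstream file): settings are combined across every resource allocated to
// one container; entries keyed by env name, container path, or annotation
// key are emitted only once, and on a conflict the first value seen wins
// while the conflict is logged.
func exampleRunContainerOptions(pd *podDevices) {
	opts := pd.deviceRunContainerOptions("pod-uid-1", "main")
	if opts == nil {
		return // nothing cached for this <podUID, contName>
	}
	for _, env := range opts.Envs {
		_ = env.Name // e.g. "VISIBLE_DEVICES", deduplicated across resources
	}
}
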
   352  
   353  // getCDIAnnotations returns the cdi annotations for a given container.
   354  // This creates a CDI annotation with a key of the form: devicemanager_{{resourceID}}.
   355  // The value of the annotation is a comma separated list of sorted CDI device IDs.
   356  // If the annotation key is already defined in the provided annotations map, then the existing value is used.
   357  func getCDIAnnotations(resourceID string, cdiDevices sets.Set[string], annotationsMap map[string]string) []kubecontainer.Annotation {
   358  	// We sort the CDI devices to ensure that the annotation value is deterministic.
   359  	sortedCDIDevices := sets.List[string](cdiDevices)
   360  	annotations, err := cdi.GenerateAnnotations(types.UID(resourceID), "devicemanager", sortedCDIDevices)
   361  	if err != nil {
   362  		klog.ErrorS(err, "Failed to create CDI annotations")
   363  		return nil
   364  	}
   365  
   366  	var cdiAnnotations []kubecontainer.Annotation
   367  	for _, annotation := range annotations {
   368  		if e, ok := annotationsMap[annotation.Name]; ok {
   369  			klog.V(4).InfoS("Skip existing annotation", "annotationKey", annotation.Name, "annotationValue", annotation.Value)
   370  			if e != annotation.Value {
   371  				klog.ErrorS(nil, "Annotation has conflicting setting", "annotationKey", annotation.Name, "expected", e, "got", annotation.Value)
   372  			}
   373  			continue
   374  		}
   375  		klog.V(4).InfoS("Add annotation", "annotationKey", annotation.Name, "annotationValue", annotation.Value)
   376  		annotationsMap[annotation.Name] = annotation.Value
   377  		cdiAnnotations = append(cdiAnnotations, kubecontainer.Annotation{Name: annotation.Name, Value: annotation.Value})
   378  	}
   379  
   380  	return cdiAnnotations
   381  }
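
// A sketch (hypothetical values; not part of the upstream file): for
// resourceID "pod-uid-1-main" and one fully-qualified CDI device name,
// this yields a single annotation whose value is the sorted,
// comma-separated device list; the key carries the standard CDI annotation
// prefix (e.g. "cdi.k8s.io/devicemanager_pod-uid-1-main").
func exampleCDIAnnotations() {
	devices := sets.New[string]("vendor.com/device=gpu-0")
	annotations := getCDIAnnotations("pod-uid-1-main", devices, map[string]string{})
	for _, a := range annotations {
		_, _ = a.Name, a.Value
	}
}
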
   382  
   383  // getCDIDeviceInfo returns the CDI devices from an allocate response.
   384  func getCDIDeviceInfo(resp *pluginapi.ContainerAllocateResponse, knownCDIDevices sets.Set[string]) []kubecontainer.CDIDevice {
   385  	var cdiDevices []kubecontainer.CDIDevice
   386  	for _, cdiDevice := range resp.CDIDevices {
   387  		if knownCDIDevices.Has(cdiDevice.Name) {
   388  			klog.V(4).InfoS("Skip existing CDI Device", "name", cdiDevice.Name)
   389  			continue
   390  		}
   391  		klog.V(4).InfoS("Add CDI device", "name", cdiDevice.Name)
   392  		knownCDIDevices.Insert(cdiDevice.Name)
   393  
   394  		device := kubecontainer.CDIDevice{
   395  			Name: cdiDevice.Name,
   396  		}
   397  		cdiDevices = append(cdiDevices, device)
   398  	}
   399  
   400  	return cdiDevices
   401  }
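
// A dedup sketch (hypothetical parameters; not part of the upstream file):
// the knownCDIDevices set is shared across all responses for a container,
// so a CDI device named by more than one response is only added once.
func exampleCDIDedup(respA, respB *pluginapi.ContainerAllocateResponse) []kubecontainer.CDIDevice {
	known := sets.New[string]()
	devices := getCDIDeviceInfo(respA, known)
	devices = append(devices, getCDIDeviceInfo(respB, known)...) // duplicates skipped
	return devices
}
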
   402  
   403  // getContainerDevices returns the devices assigned to the provided container for all ResourceNames
   404  func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDeviceInstances {
   405  	pdev.RLock()
   406  	defer pdev.RUnlock()
   407  
   408  	if _, podExists := pdev.devs[podUID]; !podExists {
   409  		return nil
   410  	}
   411  	if _, contExists := pdev.devs[podUID][contName]; !contExists {
   412  		return nil
   413  	}
   414  	resDev := NewResourceDeviceInstances()
   415  	for resource, allocateInfo := range pdev.devs[podUID][contName] {
   416  		if len(allocateInfo.deviceIds) == 0 {
   417  			continue
   418  		}
   419  		devicePluginMap := make(map[string]pluginapi.Device)
   420  		for numaid, devlist := range allocateInfo.deviceIds {
   421  			for _, devID := range devlist {
   422  				var topology *pluginapi.TopologyInfo
   423  				if numaid != nodeWithoutTopology {
   424  					NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
   425  					if pDev, ok := devicePluginMap[devID]; ok && pDev.Topology != nil {
   426  						if nodes := pDev.Topology.GetNodes(); nodes != nil {
   427  							NUMANodes = append(NUMANodes, nodes...)
   428  						}
   429  					}
   430  
   431  					// ID and Healthy are not relevant here.
   432  					topology = &pluginapi.TopologyInfo{Nodes: NUMANodes}
   433  				}
   434  				devicePluginMap[devID] = pluginapi.Device{
   435  					Topology: topology,
   436  				}
   437  			}
   438  		}
   439  		resDev[resource] = devicePluginMap
   440  	}
   441  	return resDev
   442  }
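
// A sketch of the inversion performed above (hypothetical values; not part
// of the upstream file): the checkpointed NUMA-node -> device-IDs map is
// turned inside out into a device-ID -> Device map whose TopologyInfo lists
// the device's NUMA nodes; devices recorded under nodeWithoutTopology keep
// a nil Topology.
func exampleContainerDevices(pd *podDevices) {
	for resource, instances := range pd.getContainerDevices("pod-uid-1", "main") {
		for devID, dev := range instances {
			_, _ = resource, devID
			_ = dev.Topology // nil when the device has no NUMA affinity
		}
	}
}
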
   443  
   444  // DeviceInstances is a mapping device name -> plugin device data
   445  type DeviceInstances map[string]pluginapi.Device
   446  
   447  // ResourceDeviceInstances is a mapping resource name -> DeviceInstances
   448  type ResourceDeviceInstances map[string]DeviceInstances
   449  
   450  // NewResourceDeviceInstances returns a new ResourceDeviceInstances
   451  func NewResourceDeviceInstances() ResourceDeviceInstances {
   452  	return make(ResourceDeviceInstances)
   453  }
   454  
   455  // Clone returns a clone of ResourceDeviceInstances
   456  func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances {
   457  	clone := NewResourceDeviceInstances()
   458  	for resourceName, resourceDevs := range rdev {
   459  		clone[resourceName] = make(map[string]pluginapi.Device)
   460  		for devID, dev := range resourceDevs {
   461  			clone[resourceName][devID] = dev
   462  		}
   463  	}
   464  	return clone
   465  }
   466  
   467  // Filter takes a condition set expressed as map[string]sets.Set[string] and returns a new
   468  // ResourceDeviceInstances with only the devices matching the condition set.
   469  func (rdev ResourceDeviceInstances) Filter(cond map[string]sets.Set[string]) ResourceDeviceInstances {
   470  	filtered := NewResourceDeviceInstances()
   471  	for resourceName, filterIDs := range cond {
   472  		if _, exists := rdev[resourceName]; !exists {
   473  			continue
   474  		}
   475  		filtered[resourceName] = DeviceInstances{}
   476  		for instanceID, instance := range rdev[resourceName] {
   477  			if filterIDs.Has(instanceID) {
   478  				filtered[resourceName][instanceID] = instance
   479  			}
   480  		}
   481  	}
   482  	return filtered
   483  }
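
// A usage sketch (hypothetical values; not part of the upstream file):
// Filter projects the global device view down to an explicit allow-list of
// instance IDs per resource, e.g. one container's allocation. Note that
// Clone copies each pluginapi.Device by value, so Topology pointers inside
// the cloned devices are still shared with the original.
func exampleFilter(all ResourceDeviceInstances) ResourceDeviceInstances {
	cond := map[string]sets.Set[string]{
		"vendor.com/gpu": sets.New[string]("gpu-0"),
	}
	return all.Filter(cond) // keeps only "gpu-0" under "vendor.com/gpu"
}
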
   484  
