     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package cpumanager
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"sync"
    24  	"time"
    26  	cadvisorapi "github.com/google/cadvisor/info/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    30  	"k8s.io/klog/v2"
    32  	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
    33  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
    34  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
    35  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    36  	"k8s.io/kubernetes/pkg/kubelet/config"
    37  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    38  	"k8s.io/kubernetes/pkg/kubelet/status"
    39  	"k8s.io/utils/cpuset"
    40  )
// ActivePodsFunc is a function that returns a list of pods to reconcile.
// The manager invokes it from its reconcile loop and when garbage collecting
// stale state.
type ActivePodsFunc func() []*v1.Pod
// runtimeService is the part of the container runtime API that the CPU
// manager depends on: pushing updated resources to an existing container.
type runtimeService interface {
	// UpdateContainerResources applies resources to the container identified by id.
	UpdateContainerResources(ctx context.Context, id string, resources *runtimeapi.ContainerResources) error
}
// policyName is the name of a CPU manager policy. NewManager converts the
// configured policy string to this type to select the policy implementation.
type policyName string
// cpuManagerStateFileName is the name of the checkpoint file, located in the
// manager's state file directory, where the CPU manager stores its state.
const cpuManagerStateFileName = "cpu_manager_state"
    54  // Manager interface provides methods for Kubelet to manage pod cpus.
    55  type Manager interface {
    56  	// Start is called during Kubelet initialization.
    57  	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
    59  	// Called to trigger the allocation of CPUs to a container. This must be
    60  	// called at some point prior to the AddContainer() call for a container,
    61  	// e.g. at pod admission time.
    62  	Allocate(pod *v1.Pod, container *v1.Container) error
    64  	// AddContainer adds the mapping between container ID to pod UID and the container name
    65  	// The mapping used to remove the CPU allocation during the container removal
    66  	AddContainer(p *v1.Pod, c *v1.Container, containerID string)
    68  	// RemoveContainer is called after Kubelet decides to kill or delete a
    69  	// container. After this call, the CPU manager stops trying to reconcile
    70  	// that container and any CPUs dedicated to the container are freed.
    71  	RemoveContainer(containerID string) error
    73  	// State returns a read-only interface to the internal CPU manager state.
    74  	State() state.Reader
    76  	// GetTopologyHints implements the topologymanager.HintProvider Interface
    77  	// and is consulted to achieve NUMA aware resource alignment among this
    78  	// and other resource controllers.
    79  	GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
    81  	// GetExclusiveCPUs implements the podresources.CPUsProvider interface to provide
    82  	// exclusively allocated cpus for the container
    83  	GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet
    85  	// GetPodTopologyHints implements the topologymanager.HintProvider Interface
    86  	// and is consulted to achieve NUMA aware resource alignment per Pod
    87  	// among this and other resource controllers.
    88  	GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
    90  	// GetAllocatableCPUs returns the total set of CPUs available for allocation.
    91  	GetAllocatableCPUs() cpuset.CPUSet
    93  	// GetCPUAffinity returns cpuset which includes cpus from shared pools
    94  	// as well as exclusively allocated cpus
    95  	GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
    96  }
// manager is the Manager implementation returned by NewManager. The embedded
// mutex is held while CPUs are allocated or released through the policy,
// while containerMap is modified, and while pendingAdmissionPod is set.
type manager struct {
	sync.Mutex
	policy Policy

	// reconcilePeriod is the duration between calls to reconcileState.
	reconcilePeriod time.Duration

	// state allows pluggable CPU assignment policies while sharing a common
	// representation of state for the system to inspect and reconcile.
	state state.State

	// lastUpdateState holds the cpuset of each container as of the last time
	// it was updated.
	lastUpdateState state.State

	// containerRuntime is the container runtime service interface needed
	// to make UpdateContainerResources() calls against the containers.
	containerRuntime runtimeService

	// activePods is a method for listing active pods on the node
	// so all the containers can be updated in the reconciliation loop.
	activePods ActivePodsFunc

	// podStatusProvider provides a method for obtaining pod statuses
	// and the containerID of their containers
	podStatusProvider status.PodStatusProvider

	// containerMap provides a mapping from (pod, container) -> containerID
	// for all containers in a pod
	containerMap containermap.ContainerMap

	// topology is the CPU topology discovered in NewManager for the static
	// policy; it is left nil for the none policy.
	topology *topology.CPUTopology

	// nodeAllocatableReservation is the reservation passed to NewManager; its
	// CPU entry determines how many CPUs the static policy reserves.
	nodeAllocatableReservation v1.ResourceList

	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// stateFileDirectory holds the directory where the state file for checkpoints is held.
	stateFileDirectory string

	// allocatableCPUs caches the result of the policy's GetAllocatableCPUs,
	// computed once in Start.
	allocatableCPUs cpuset.CPUSet

	// pendingAdmissionPod contains the pod that is currently in the admission
	// phase, so that its state is not treated as stale before admission ends.
	pendingAdmissionPod *v1.Pod
}
   146  var _ Manager = &manager{}
   148  type sourcesReadyStub struct{}
   150  func (s *sourcesReadyStub) AddSource(source string) {}
   151  func (s *sourcesReadyStub) AllReady() bool          { return true }
   153  // NewManager creates new cpu manager based on provided policy
   154  func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
   155  	var topo *topology.CPUTopology
   156  	var policy Policy
   157  	var err error
   159  	switch policyName(cpuPolicyName) {
   161  	case PolicyNone:
   162  		policy, err = NewNonePolicy(cpuPolicyOptions)
   163  		if err != nil {
   164  			return nil, fmt.Errorf("new none policy error: %w", err)
   165  		}
   167  	case PolicyStatic:
   168  		topo, err = topology.Discover(machineInfo)
   169  		if err != nil {
   170  			return nil, err
   171  		}
   172  		klog.InfoS("Detected CPU topology", "topology", topo)
   174  		reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
   175  		if !ok {
   176  			// The static policy cannot initialize without this information.
   177  			return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
   178  		}
   179  		if reservedCPUs.IsZero() {
   180  			// The static policy requires this to be nonzero. Zero CPU reservation
   181  			// would allow the shared pool to be completely exhausted. At that point
   182  			// either we would violate our guarantee of exclusivity or need to evict
   183  			// any pod that has at least one container that requires zero CPUs.
   184  			// See the comments in policy_static.go for more details.
   185  			return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
   186  		}
   188  		// Take the ceiling of the reservation, since fractional CPUs cannot be
   189  		// exclusively allocated.
   190  		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
   191  		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
   192  		policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
   193  		if err != nil {
   194  			return nil, fmt.Errorf("new static policy error: %w", err)
   195  		}
   197  	default:
   198  		return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
   199  	}
   201  	manager := &manager{
   202  		policy:                     policy,
   203  		reconcilePeriod:            reconcilePeriod,
   204  		lastUpdateState:            state.NewMemoryState(),
   205  		topology:                   topo,
   206  		nodeAllocatableReservation: nodeAllocatableReservation,
   207  		stateFileDirectory:         stateFileDirectory,
   208  	}
   209  	manager.sourcesReady = &sourcesReadyStub{}
   210  	return manager, nil
   211  }
   213  func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
   214  	klog.InfoS("Starting CPU manager", "policy", m.policy.Name())
   215  	klog.InfoS("Reconciling", "reconcilePeriod", m.reconcilePeriod)
   216  	m.sourcesReady = sourcesReady
   217  	m.activePods = activePods
   218  	m.podStatusProvider = podStatusProvider
   219  	m.containerRuntime = containerRuntime
   220  	m.containerMap = initialContainers
   222  	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
   223  	if err != nil {
   224  		klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
   225  		return err
   226  	}
   227  	m.state = stateImpl
   229  	err = m.policy.Start(m.state)
   230  	if err != nil {
   231  		klog.ErrorS(err, "Policy start error")
   232  		return err
   233  	}
   235  	m.allocatableCPUs = m.policy.GetAllocatableCPUs(m.state)
   237  	if m.policy.Name() == string(PolicyNone) {
   238  		return nil
   239  	}
   240  	// Periodically call m.reconcileState() to continue to keep the CPU sets of
   241  	// all pods in sync with and guaranteed CPUs handed out among them.
   242  	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
   243  	return nil
   244  }
   246  func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
   247  	// The pod is during the admission phase. We need to save the pod to avoid it
   248  	// being cleaned before the admission ended
   249  	m.setPodPendingAdmission(p)
   251  	// Garbage collect any stranded resources before allocating CPUs.
   252  	m.removeStaleState()
   254  	m.Lock()
   255  	defer m.Unlock()
   257  	// Call down into the policy to assign this container CPUs if required.
   258  	err := m.policy.Allocate(m.state, p, c)
   259  	if err != nil {
   260  		klog.ErrorS(err, "Allocate error")
   261  		return err
   262  	}
   264  	return nil
   265  }
   267  func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
   268  	m.Lock()
   269  	defer m.Unlock()
   270  	if cset, exists := m.state.GetCPUSet(string(pod.UID), container.Name); exists {
   271  		m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
   272  	}
   273  	m.containerMap.Add(string(pod.UID), container.Name, containerID)
   274  }
   276  func (m *manager) RemoveContainer(containerID string) error {
   277  	m.Lock()
   278  	defer m.Unlock()
   280  	err := m.policyRemoveContainerByID(containerID)
   281  	if err != nil {
   282  		klog.ErrorS(err, "RemoveContainer error")
   283  		return err
   284  	}
   286  	return nil
   287  }
   289  func (m *manager) policyRemoveContainerByID(containerID string) error {
   290  	podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
   291  	if err != nil {
   292  		return nil
   293  	}
   295  	err = m.policy.RemoveContainer(m.state, podUID, containerName)
   296  	if err == nil {
   297  		m.lastUpdateState.Delete(podUID, containerName)
   298  		m.containerMap.RemoveByContainerID(containerID)
   299  	}
   301  	return err
   302  }
   304  func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
   305  	err := m.policy.RemoveContainer(m.state, podUID, containerName)
   306  	if err == nil {
   307  		m.lastUpdateState.Delete(podUID, containerName)
   308  		m.containerMap.RemoveByContainerRef(podUID, containerName)
   309  	}
   311  	return err
   312  }
// State returns the manager's state as a read-only state.Reader. The state
// field is assigned in Start.
func (m *manager) State() state.Reader {
	return m.state
}
   318  func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
   319  	// The pod is during the admission phase. We need to save the pod to avoid it
   320  	// being cleaned before the admission ended
   321  	m.setPodPendingAdmission(pod)
   322  	// Garbage collect any stranded resources before providing TopologyHints
   323  	m.removeStaleState()
   324  	// Delegate to active policy
   325  	return m.policy.GetTopologyHints(m.state, pod, container)
   326  }
   328  func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
   329  	// The pod is during the admission phase. We need to save the pod to avoid it
   330  	// being cleaned before the admission ended
   331  	m.setPodPendingAdmission(pod)
   332  	// Garbage collect any stranded resources before providing TopologyHints
   333  	m.removeStaleState()
   334  	// Delegate to active policy
   335  	return m.policy.GetPodTopologyHints(m.state, pod)
   336  }
// GetAllocatableCPUs returns a clone of the allocatable CPU set that Start
// obtained from the policy.
func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
	return m.allocatableCPUs.Clone()
}
// reconciledContainer identifies a container (or, when only podName is set, a
// whole pod) in the success and failure lists returned by reconcileState.
// reconcileState builds values with positional literals, so the field order
// must not change.
type reconciledContainer struct {
	// podName is the name of the pod.
	podName string
	// containerName is the container's name; empty when the entry refers to
	// the pod as a whole.
	containerName string
	// containerID is the runtime container ID; empty when it was not
	// determined or not relevant to the recorded outcome.
	containerID string
}
   348  func (m *manager) removeStaleState() {
   349  	// Only once all sources are ready do we attempt to remove any stale state.
   350  	// This ensures that the call to `m.activePods()` below will succeed with
   351  	// the actual active pods list.
   352  	if !m.sourcesReady.AllReady() {
   353  		return
   354  	}
   356  	// We grab the lock to ensure that no new containers will grab CPUs while
   357  	// executing the code below. Without this lock, its possible that we end up
   358  	// removing state that is newly added by an asynchronous call to
   359  	// AddContainer() during the execution of this code.
   360  	m.Lock()
   361  	defer m.Unlock()
   363  	// Get the list of active pods.
   364  	activeAndAdmittedPods := m.activePods()
   365  	if m.pendingAdmissionPod != nil {
   366  		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
   367  	}
   369  	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
   370  	activeContainers := make(map[string]map[string]struct{})
   371  	for _, pod := range activeAndAdmittedPods {
   372  		activeContainers[string(pod.UID)] = make(map[string]struct{})
   373  		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
   374  			activeContainers[string(pod.UID)][container.Name] = struct{}{}
   375  		}
   376  	}
   378  	// Loop through the CPUManager state. Remove any state for containers not
   379  	// in the `activeContainers` list built above.
   380  	assignments := m.state.GetCPUAssignments()
   381  	for podUID := range assignments {
   382  		for containerName := range assignments[podUID] {
   383  			if _, ok := activeContainers[podUID][containerName]; !ok {
   384  				klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", podUID, "containerName", containerName)
   385  				err := m.policyRemoveContainerByRef(podUID, containerName)
   386  				if err != nil {
   387  					klog.ErrorS(err, "RemoveStaleState: failed to remove container", "podUID", podUID, "containerName", containerName)
   388  				}
   389  			}
   390  		}
   391  	}
   393  	m.containerMap.Visit(func(podUID, containerName, containerID string) {
   394  		if _, ok := activeContainers[podUID][containerName]; !ok {
   395  			klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", podUID, "containerName", containerName)
   396  			err := m.policyRemoveContainerByRef(podUID, containerName)
   397  			if err != nil {
   398  				klog.ErrorS(err, "RemoveStaleState: failed to remove container", "podUID", podUID, "containerName", containerName)
   399  			}
   400  		}
   401  	})
   402  }
   404  func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
   405  	ctx := context.Background()
   406  	success = []reconciledContainer{}
   407  	failure = []reconciledContainer{}
   409  	m.removeStaleState()
   410  	for _, pod := range m.activePods() {
   411  		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
   412  		if !ok {
   413  			klog.V(4).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
   414  			failure = append(failure, reconciledContainer{pod.Name, "", ""})
   415  			continue
   416  		}
   418  		allContainers := pod.Spec.InitContainers
   419  		allContainers = append(allContainers, pod.Spec.Containers...)
   420  		for _, container := range allContainers {
   421  			containerID, err := findContainerIDByName(&pstatus, container.Name)
   422  			if err != nil {
   423  				klog.V(4).InfoS("ReconcileState: skipping container; ID not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
   424  				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
   425  				continue
   426  			}
   428  			cstatus, err := findContainerStatusByName(&pstatus, container.Name)
   429  			if err != nil {
   430  				klog.V(4).InfoS("ReconcileState: skipping container; container status not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
   431  				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
   432  				continue
   433  			}
   435  			if cstatus.State.Waiting != nil ||
   436  				(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
   437  				klog.V(4).InfoS("ReconcileState: skipping container; container still in the waiting state", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
   438  				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
   439  				continue
   440  			}
   442  			m.Lock()
   443  			if cstatus.State.Terminated != nil {
   444  				// The container is terminated but we can't call m.RemoveContainer()
   445  				// here because it could remove the allocated cpuset for the container
   446  				// which may be in the process of being restarted.  That would result
   447  				// in the container losing any exclusively-allocated CPUs that it
   448  				// was allocated.
   449  				_, _, err := m.containerMap.GetContainerRef(containerID)
   450  				if err == nil {
   451  					klog.V(4).InfoS("ReconcileState: ignoring terminated container", "pod", klog.KObj(pod), "containerID", containerID)
   452  				}
   453  				m.Unlock()
   454  				continue
   455  			}
   457  			// Once we make it here we know we have a running container.
   458  			// Idempotently add it to the containerMap incase it is missing.
   459  			// This can happen after a kubelet restart, for example.
   460  			m.containerMap.Add(string(pod.UID), container.Name, containerID)
   461  			m.Unlock()
   463  			cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
   464  			if cset.IsEmpty() {
   465  				// NOTE: This should not happen outside of tests.
   466  				klog.V(4).InfoS("ReconcileState: skipping container; assigned cpuset is empty", "pod", klog.KObj(pod), "containerName", container.Name)
   467  				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
   468  				continue
   469  			}
   471  			lcset := m.lastUpdateState.GetCPUSetOrDefault(string(pod.UID), container.Name)
   472  			if !cset.Equals(lcset) {
   473  				klog.V(4).InfoS("ReconcileState: updating container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
   474  				err = m.updateContainerCPUSet(ctx, containerID, cset)
   475  				if err != nil {
   476  					klog.ErrorS(err, "ReconcileState: failed to update container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
   477  					failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
   478  					continue
   479  				}
   480  				m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
   481  			}
   482  			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
   483  		}
   484  	}
   485  	return success, failure
   486  }
   488  func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
   489  	allStatuses := status.InitContainerStatuses
   490  	allStatuses = append(allStatuses, status.ContainerStatuses...)
   491  	for _, container := range allStatuses {
   492  		if container.Name == name && container.ContainerID != "" {
   493  			cid := &kubecontainer.ContainerID{}
   494  			err := cid.ParseString(container.ContainerID)
   495  			if err != nil {
   496  				return "", err
   497  			}
   498  			return cid.ID, nil
   499  		}
   500  	}
   501  	return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
   502  }
   504  func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) {
   505  	for _, containerStatus := range append(status.InitContainerStatuses, status.ContainerStatuses...) {
   506  		if containerStatus.Name == name {
   507  			return &containerStatus, nil
   508  		}
   509  	}
   510  	return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
   511  }
   513  func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
   514  	// TODO: Consider adding a `ResourceConfigForContainer` helper in
   515  	// helpers_linux.go similar to what exists for pods.
   516  	// It would be better to pass the full container resources here instead of
   517  	// this patch-like partial resources.
   518  	return m.containerRuntime.UpdateContainerResources(
   519  		ctx,
   520  		containerID,
   521  		&runtimeapi.ContainerResources{
   522  			Linux: &runtimeapi.LinuxContainerResources{
   523  				CpusetCpus: cpus.String(),
   524  			},
   525  		})
   526  }
   528  func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
   529  	if result, ok := m.state.GetCPUSet(podUID, containerName); ok {
   530  		return result
   531  	}
   533  	return cpuset.CPUSet{}
   534  }
// GetCPUAffinity returns the result of the state's GetCPUSetOrDefault for the
// given container.
func (m *manager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet {
	return m.state.GetCPUSetOrDefault(podUID, containerName)
}
   540  func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
   541  	m.Lock()
   542  	defer m.Unlock()
   544  	m.pendingAdmissionPod = pod
   545  }

