...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package state
    18  
    19  import (
    20  	"fmt"
    21  	"path/filepath"
    22  	"sync"
    23  
    24  	"k8s.io/klog/v2"
    25  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    26  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
    27  	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
    28  	"k8s.io/utils/cpuset"
    29  )
    30  
    31  var _ State = &stateCheckpoint{}
    32  
    33  type stateCheckpoint struct {
    34  	mux               sync.RWMutex
    35  	policyName        string
    36  	cache             State
    37  	checkpointManager checkpointmanager.CheckpointManager
    38  	checkpointName    string
    39  	initialContainers containermap.ContainerMap
    40  }
    41  
    42  // NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend
    43  func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
    44  	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
    45  	if err != nil {
    46  		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
    47  	}
    48  	stateCheckpoint := &stateCheckpoint{
    49  		cache:             NewMemoryState(),
    50  		policyName:        policyName,
    51  		checkpointManager: checkpointManager,
    52  		checkpointName:    checkpointName,
    53  		initialContainers: initialContainers,
    54  	}
    55  
    56  	if err := stateCheckpoint.restoreState(); err != nil {
    57  		//nolint:staticcheck // ST1005 user-facing error message
    58  		return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet",
    59  			err, filepath.Join(stateDir, checkpointName))
    60  	}
    61  
    62  	return stateCheckpoint, nil
    63  }
    64  
    65  // migrateV1CheckpointToV2Checkpoint() converts checkpoints from the v1 format to the v2 format
    66  func (sc *stateCheckpoint) migrateV1CheckpointToV2Checkpoint(src *CPUManagerCheckpointV1, dst *CPUManagerCheckpointV2) error {
    67  	if src.PolicyName != "" {
    68  		dst.PolicyName = src.PolicyName
    69  	}
    70  	if src.DefaultCPUSet != "" {
    71  		dst.DefaultCPUSet = src.DefaultCPUSet
    72  	}
    73  	for containerID, cset := range src.Entries {
    74  		podUID, containerName, err := sc.initialContainers.GetContainerRef(containerID)
    75  		if err != nil {
    76  			return fmt.Errorf("containerID '%v' not found in initial containers list", containerID)
    77  		}
    78  		if dst.Entries == nil {
    79  			dst.Entries = make(map[string]map[string]string)
    80  		}
    81  		if _, exists := dst.Entries[podUID]; !exists {
    82  			dst.Entries[podUID] = make(map[string]string)
    83  		}
    84  		dst.Entries[podUID][containerName] = cset
    85  	}
    86  	return nil
    87  }
    88  
    89  // restores state from a checkpoint and creates it if it doesn't exist
    90  func (sc *stateCheckpoint) restoreState() error {
    91  	sc.mux.Lock()
    92  	defer sc.mux.Unlock()
    93  	var err error
    94  
    95  	checkpointV1 := newCPUManagerCheckpointV1()
    96  	checkpointV2 := newCPUManagerCheckpointV2()
    97  
    98  	if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV1); err != nil {
    99  		checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0
   100  		if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil {
   101  			if err == errors.ErrCheckpointNotFound {
   102  				return sc.storeState()
   103  			}
   104  			return err
   105  		}
   106  	}
   107  
   108  	if err = sc.migrateV1CheckpointToV2Checkpoint(checkpointV1, checkpointV2); err != nil {
   109  		return fmt.Errorf("error migrating v1 checkpoint state to v2 checkpoint state: %s", err)
   110  	}
   111  
   112  	if sc.policyName != checkpointV2.PolicyName {
   113  		return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpointV2.PolicyName)
   114  	}
   115  
   116  	var tmpDefaultCPUSet cpuset.CPUSet
   117  	if tmpDefaultCPUSet, err = cpuset.Parse(checkpointV2.DefaultCPUSet); err != nil {
   118  		return fmt.Errorf("could not parse default cpu set %q: %v", checkpointV2.DefaultCPUSet, err)
   119  	}
   120  
   121  	var tmpContainerCPUSet cpuset.CPUSet
   122  	tmpAssignments := ContainerCPUAssignments{}
   123  	for pod := range checkpointV2.Entries {
   124  		tmpAssignments[pod] = make(map[string]cpuset.CPUSet, len(checkpointV2.Entries[pod]))
   125  		for container, cpuString := range checkpointV2.Entries[pod] {
   126  			if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
   127  				return fmt.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err)
   128  			}
   129  			tmpAssignments[pod][container] = tmpContainerCPUSet
   130  		}
   131  	}
   132  
   133  	sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
   134  	sc.cache.SetCPUAssignments(tmpAssignments)
   135  
   136  	klog.V(2).InfoS("State checkpoint: restored state from checkpoint")
   137  	klog.V(2).InfoS("State checkpoint: defaultCPUSet", "defaultCpuSet", tmpDefaultCPUSet.String())
   138  
   139  	return nil
   140  }
   141  
   142  // saves state to a checkpoint, caller is responsible for locking
   143  func (sc *stateCheckpoint) storeState() error {
   144  	checkpoint := NewCPUManagerCheckpoint()
   145  	checkpoint.PolicyName = sc.policyName
   146  	checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
   147  
   148  	assignments := sc.cache.GetCPUAssignments()
   149  	for pod := range assignments {
   150  		checkpoint.Entries[pod] = make(map[string]string, len(assignments[pod]))
   151  		for container, cset := range assignments[pod] {
   152  			checkpoint.Entries[pod][container] = cset.String()
   153  		}
   154  	}
   155  
   156  	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
   157  	if err != nil {
   158  		klog.ErrorS(err, "Failed to save checkpoint")
   159  		return err
   160  	}
   161  	return nil
   162  }
   163  
   164  // GetCPUSet returns current CPU set
   165  func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
   166  	sc.mux.RLock()
   167  	defer sc.mux.RUnlock()
   168  
   169  	res, ok := sc.cache.GetCPUSet(podUID, containerName)
   170  	return res, ok
   171  }
   172  
   173  // GetDefaultCPUSet returns default CPU set
   174  func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
   175  	sc.mux.RLock()
   176  	defer sc.mux.RUnlock()
   177  
   178  	return sc.cache.GetDefaultCPUSet()
   179  }
   180  
   181  // GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed
   182  func (sc *stateCheckpoint) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
   183  	sc.mux.RLock()
   184  	defer sc.mux.RUnlock()
   185  
   186  	return sc.cache.GetCPUSetOrDefault(podUID, containerName)
   187  }
   188  
   189  // GetCPUAssignments returns current CPU to pod assignments
   190  func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
   191  	sc.mux.RLock()
   192  	defer sc.mux.RUnlock()
   193  
   194  	return sc.cache.GetCPUAssignments()
   195  }
   196  
   197  // SetCPUSet sets CPU set
   198  func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
   199  	sc.mux.Lock()
   200  	defer sc.mux.Unlock()
   201  	sc.cache.SetCPUSet(podUID, containerName, cset)
   202  	err := sc.storeState()
   203  	if err != nil {
   204  		klog.InfoS("Store state to checkpoint error", "err", err)
   205  	}
   206  }
   207  
   208  // SetDefaultCPUSet sets default CPU set
   209  func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
   210  	sc.mux.Lock()
   211  	defer sc.mux.Unlock()
   212  	sc.cache.SetDefaultCPUSet(cset)
   213  	err := sc.storeState()
   214  	if err != nil {
   215  		klog.InfoS("Store state to checkpoint error", "err", err)
   216  	}
   217  }
   218  
   219  // SetCPUAssignments sets CPU to pod assignments
   220  func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
   221  	sc.mux.Lock()
   222  	defer sc.mux.Unlock()
   223  	sc.cache.SetCPUAssignments(a)
   224  	err := sc.storeState()
   225  	if err != nil {
   226  		klog.InfoS("Store state to checkpoint error", "err", err)
   227  	}
   228  }
   229  
   230  // Delete deletes assignment for specified pod
   231  func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
   232  	sc.mux.Lock()
   233  	defer sc.mux.Unlock()
   234  	sc.cache.Delete(podUID, containerName)
   235  	err := sc.storeState()
   236  	if err != nil {
   237  		klog.InfoS("Store state to checkpoint error", "err", err)
   238  	}
   239  }
   240  
   241  // ClearState clears the state and saves it in a checkpoint
   242  func (sc *stateCheckpoint) ClearState() {
   243  	sc.mux.Lock()
   244  	defer sc.mux.Unlock()
   245  	sc.cache.ClearState()
   246  	err := sc.storeState()
   247  	if err != nil {
   248  		klog.InfoS("Store state to checkpoint error", "err", err)
   249  	}
   250  }
   251  

View as plain text