...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state/state_checkpoint.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package state
    18  
    19  import (
    20  	"fmt"
    21  	"path/filepath"
    22  	"sync"
    23  
    24  	"k8s.io/klog/v2"
    25  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    26  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
    27  )
    28  
    29  var _ State = &stateCheckpoint{}
    30  
    31  type stateCheckpoint struct {
    32  	sync.RWMutex
    33  	cache             State
    34  	policyName        string
    35  	checkpointManager checkpointmanager.CheckpointManager
    36  	checkpointName    string
    37  }
    38  
    39  // NewCheckpointState creates new State for keeping track of memory/pod assignment with checkpoint backend
    40  func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) {
    41  	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
    42  	if err != nil {
    43  		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
    44  	}
    45  	stateCheckpoint := &stateCheckpoint{
    46  		cache:             NewMemoryState(),
    47  		policyName:        policyName,
    48  		checkpointManager: checkpointManager,
    49  		checkpointName:    checkpointName,
    50  	}
    51  
    52  	if err := stateCheckpoint.restoreState(); err != nil {
    53  		//nolint:staticcheck // ST1005 user-facing error message
    54  		return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the memory manager checkpoint file %q before restarting Kubelet",
    55  			err, filepath.Join(stateDir, checkpointName))
    56  	}
    57  
    58  	return stateCheckpoint, nil
    59  }
    60  
    61  // restores state from a checkpoint and creates it if it doesn't exist
    62  func (sc *stateCheckpoint) restoreState() error {
    63  	sc.Lock()
    64  	defer sc.Unlock()
    65  	var err error
    66  
    67  	checkpoint := NewMemoryManagerCheckpoint()
    68  	if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
    69  		if err == errors.ErrCheckpointNotFound {
    70  			return sc.storeState()
    71  		}
    72  		return err
    73  	}
    74  
    75  	if sc.policyName != checkpoint.PolicyName {
    76  		return fmt.Errorf("[memorymanager] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
    77  	}
    78  
    79  	sc.cache.SetMachineState(checkpoint.MachineState)
    80  	sc.cache.SetMemoryAssignments(checkpoint.Entries)
    81  
    82  	klog.V(2).InfoS("State checkpoint: restored state from checkpoint")
    83  
    84  	return nil
    85  }
    86  
    87  // saves state to a checkpoint, caller is responsible for locking
    88  func (sc *stateCheckpoint) storeState() error {
    89  	checkpoint := NewMemoryManagerCheckpoint()
    90  	checkpoint.PolicyName = sc.policyName
    91  	checkpoint.MachineState = sc.cache.GetMachineState()
    92  	checkpoint.Entries = sc.cache.GetMemoryAssignments()
    93  
    94  	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
    95  	if err != nil {
    96  		klog.ErrorS(err, "Could not save checkpoint")
    97  		return err
    98  	}
    99  	return nil
   100  }
   101  
   102  // GetMemoryState returns Memory Map stored in the State
   103  func (sc *stateCheckpoint) GetMachineState() NUMANodeMap {
   104  	sc.RLock()
   105  	defer sc.RUnlock()
   106  
   107  	return sc.cache.GetMachineState()
   108  }
   109  
   110  // GetMemoryBlocks returns memory assignments of a container
   111  func (sc *stateCheckpoint) GetMemoryBlocks(podUID string, containerName string) []Block {
   112  	sc.RLock()
   113  	defer sc.RUnlock()
   114  
   115  	return sc.cache.GetMemoryBlocks(podUID, containerName)
   116  }
   117  
   118  // GetMemoryAssignments returns ContainerMemoryAssignments
   119  func (sc *stateCheckpoint) GetMemoryAssignments() ContainerMemoryAssignments {
   120  	sc.RLock()
   121  	defer sc.RUnlock()
   122  
   123  	return sc.cache.GetMemoryAssignments()
   124  }
   125  
   126  // SetMachineState stores NUMANodeMap in State
   127  func (sc *stateCheckpoint) SetMachineState(memoryMap NUMANodeMap) {
   128  	sc.Lock()
   129  	defer sc.Unlock()
   130  
   131  	sc.cache.SetMachineState(memoryMap)
   132  	err := sc.storeState()
   133  	if err != nil {
   134  		klog.InfoS("Store state to checkpoint error", "err", err)
   135  	}
   136  }
   137  
   138  // SetMemoryBlocks stores memory assignments of container
   139  func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
   140  	sc.Lock()
   141  	defer sc.Unlock()
   142  
   143  	sc.cache.SetMemoryBlocks(podUID, containerName, blocks)
   144  	err := sc.storeState()
   145  	if err != nil {
   146  		klog.InfoS("Store state to checkpoint error", "err", err)
   147  	}
   148  }
   149  
   150  // SetMemoryAssignments sets ContainerMemoryAssignments by using the passed parameter
   151  func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
   152  	sc.Lock()
   153  	defer sc.Unlock()
   154  
   155  	sc.cache.SetMemoryAssignments(assignments)
   156  	err := sc.storeState()
   157  	if err != nil {
   158  		klog.InfoS("Store state to checkpoint error", "err", err)
   159  	}
   160  }
   161  
   162  // Delete deletes corresponding Blocks from ContainerMemoryAssignments
   163  func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
   164  	sc.Lock()
   165  	defer sc.Unlock()
   166  
   167  	sc.cache.Delete(podUID, containerName)
   168  	err := sc.storeState()
   169  	if err != nil {
   170  		klog.InfoS("Store state to checkpoint error", "err", err)
   171  	}
   172  }
   173  
   174  // ClearState clears machineState and ContainerMemoryAssignments
   175  func (sc *stateCheckpoint) ClearState() {
   176  	sc.Lock()
   177  	defer sc.Unlock()
   178  
   179  	sc.cache.ClearState()
   180  	err := sc.storeState()
   181  	if err != nil {
   182  		klog.InfoS("Store state to checkpoint error", "err", err)
   183  	}
   184  }
   185  

View as plain text