...
1
16
17 package state
18
19 import (
20 "fmt"
21 "path/filepath"
22 "sync"
23
24 "k8s.io/klog/v2"
25 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
26 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
27 )
28
29 var _ State = &stateCheckpoint{}
30
31 type stateCheckpoint struct {
32 sync.RWMutex
33 cache State
34 policyName string
35 checkpointManager checkpointmanager.CheckpointManager
36 checkpointName string
37 }
38
39
40 func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) {
41 checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
42 if err != nil {
43 return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
44 }
45 stateCheckpoint := &stateCheckpoint{
46 cache: NewMemoryState(),
47 policyName: policyName,
48 checkpointManager: checkpointManager,
49 checkpointName: checkpointName,
50 }
51
52 if err := stateCheckpoint.restoreState(); err != nil {
53
54 return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the memory manager checkpoint file %q before restarting Kubelet",
55 err, filepath.Join(stateDir, checkpointName))
56 }
57
58 return stateCheckpoint, nil
59 }
60
61
62 func (sc *stateCheckpoint) restoreState() error {
63 sc.Lock()
64 defer sc.Unlock()
65 var err error
66
67 checkpoint := NewMemoryManagerCheckpoint()
68 if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
69 if err == errors.ErrCheckpointNotFound {
70 return sc.storeState()
71 }
72 return err
73 }
74
75 if sc.policyName != checkpoint.PolicyName {
76 return fmt.Errorf("[memorymanager] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
77 }
78
79 sc.cache.SetMachineState(checkpoint.MachineState)
80 sc.cache.SetMemoryAssignments(checkpoint.Entries)
81
82 klog.V(2).InfoS("State checkpoint: restored state from checkpoint")
83
84 return nil
85 }
86
87
88 func (sc *stateCheckpoint) storeState() error {
89 checkpoint := NewMemoryManagerCheckpoint()
90 checkpoint.PolicyName = sc.policyName
91 checkpoint.MachineState = sc.cache.GetMachineState()
92 checkpoint.Entries = sc.cache.GetMemoryAssignments()
93
94 err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
95 if err != nil {
96 klog.ErrorS(err, "Could not save checkpoint")
97 return err
98 }
99 return nil
100 }
101
102
103 func (sc *stateCheckpoint) GetMachineState() NUMANodeMap {
104 sc.RLock()
105 defer sc.RUnlock()
106
107 return sc.cache.GetMachineState()
108 }
109
110
111 func (sc *stateCheckpoint) GetMemoryBlocks(podUID string, containerName string) []Block {
112 sc.RLock()
113 defer sc.RUnlock()
114
115 return sc.cache.GetMemoryBlocks(podUID, containerName)
116 }
117
118
119 func (sc *stateCheckpoint) GetMemoryAssignments() ContainerMemoryAssignments {
120 sc.RLock()
121 defer sc.RUnlock()
122
123 return sc.cache.GetMemoryAssignments()
124 }
125
126
127 func (sc *stateCheckpoint) SetMachineState(memoryMap NUMANodeMap) {
128 sc.Lock()
129 defer sc.Unlock()
130
131 sc.cache.SetMachineState(memoryMap)
132 err := sc.storeState()
133 if err != nil {
134 klog.InfoS("Store state to checkpoint error", "err", err)
135 }
136 }
137
138
139 func (sc *stateCheckpoint) SetMemoryBlocks(podUID string, containerName string, blocks []Block) {
140 sc.Lock()
141 defer sc.Unlock()
142
143 sc.cache.SetMemoryBlocks(podUID, containerName, blocks)
144 err := sc.storeState()
145 if err != nil {
146 klog.InfoS("Store state to checkpoint error", "err", err)
147 }
148 }
149
150
151 func (sc *stateCheckpoint) SetMemoryAssignments(assignments ContainerMemoryAssignments) {
152 sc.Lock()
153 defer sc.Unlock()
154
155 sc.cache.SetMemoryAssignments(assignments)
156 err := sc.storeState()
157 if err != nil {
158 klog.InfoS("Store state to checkpoint error", "err", err)
159 }
160 }
161
162
163 func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
164 sc.Lock()
165 defer sc.Unlock()
166
167 sc.cache.Delete(podUID, containerName)
168 err := sc.storeState()
169 if err != nil {
170 klog.InfoS("Store state to checkpoint error", "err", err)
171 }
172 }
173
174
175 func (sc *stateCheckpoint) ClearState() {
176 sc.Lock()
177 defer sc.Unlock()
178
179 sc.cache.ClearState()
180 err := sc.storeState()
181 if err != nil {
182 klog.InfoS("Store state to checkpoint error", "err", err)
183 }
184 }
185
View as plain text