1
16
17 package state
18
19 import (
20 "fmt"
21 "path/filepath"
22 "sync"
23
24 "k8s.io/klog/v2"
25 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
26 "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
27 "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
28 "k8s.io/utils/cpuset"
29 )
30
31 var _ State = &stateCheckpoint{}
32
33 type stateCheckpoint struct {
34 mux sync.RWMutex
35 policyName string
36 cache State
37 checkpointManager checkpointmanager.CheckpointManager
38 checkpointName string
39 initialContainers containermap.ContainerMap
40 }
41
42
43 func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) {
44 checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
45 if err != nil {
46 return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
47 }
48 stateCheckpoint := &stateCheckpoint{
49 cache: NewMemoryState(),
50 policyName: policyName,
51 checkpointManager: checkpointManager,
52 checkpointName: checkpointName,
53 initialContainers: initialContainers,
54 }
55
56 if err := stateCheckpoint.restoreState(); err != nil {
57
58 return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete the CPU manager checkpoint file %q before restarting Kubelet",
59 err, filepath.Join(stateDir, checkpointName))
60 }
61
62 return stateCheckpoint, nil
63 }
64
65
66 func (sc *stateCheckpoint) migrateV1CheckpointToV2Checkpoint(src *CPUManagerCheckpointV1, dst *CPUManagerCheckpointV2) error {
67 if src.PolicyName != "" {
68 dst.PolicyName = src.PolicyName
69 }
70 if src.DefaultCPUSet != "" {
71 dst.DefaultCPUSet = src.DefaultCPUSet
72 }
73 for containerID, cset := range src.Entries {
74 podUID, containerName, err := sc.initialContainers.GetContainerRef(containerID)
75 if err != nil {
76 return fmt.Errorf("containerID '%v' not found in initial containers list", containerID)
77 }
78 if dst.Entries == nil {
79 dst.Entries = make(map[string]map[string]string)
80 }
81 if _, exists := dst.Entries[podUID]; !exists {
82 dst.Entries[podUID] = make(map[string]string)
83 }
84 dst.Entries[podUID][containerName] = cset
85 }
86 return nil
87 }
88
89
90 func (sc *stateCheckpoint) restoreState() error {
91 sc.mux.Lock()
92 defer sc.mux.Unlock()
93 var err error
94
95 checkpointV1 := newCPUManagerCheckpointV1()
96 checkpointV2 := newCPUManagerCheckpointV2()
97
98 if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV1); err != nil {
99 checkpointV1 = &CPUManagerCheckpointV1{}
100 if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil {
101 if err == errors.ErrCheckpointNotFound {
102 return sc.storeState()
103 }
104 return err
105 }
106 }
107
108 if err = sc.migrateV1CheckpointToV2Checkpoint(checkpointV1, checkpointV2); err != nil {
109 return fmt.Errorf("error migrating v1 checkpoint state to v2 checkpoint state: %s", err)
110 }
111
112 if sc.policyName != checkpointV2.PolicyName {
113 return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpointV2.PolicyName)
114 }
115
116 var tmpDefaultCPUSet cpuset.CPUSet
117 if tmpDefaultCPUSet, err = cpuset.Parse(checkpointV2.DefaultCPUSet); err != nil {
118 return fmt.Errorf("could not parse default cpu set %q: %v", checkpointV2.DefaultCPUSet, err)
119 }
120
121 var tmpContainerCPUSet cpuset.CPUSet
122 tmpAssignments := ContainerCPUAssignments{}
123 for pod := range checkpointV2.Entries {
124 tmpAssignments[pod] = make(map[string]cpuset.CPUSet, len(checkpointV2.Entries[pod]))
125 for container, cpuString := range checkpointV2.Entries[pod] {
126 if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil {
127 return fmt.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err)
128 }
129 tmpAssignments[pod][container] = tmpContainerCPUSet
130 }
131 }
132
133 sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet)
134 sc.cache.SetCPUAssignments(tmpAssignments)
135
136 klog.V(2).InfoS("State checkpoint: restored state from checkpoint")
137 klog.V(2).InfoS("State checkpoint: defaultCPUSet", "defaultCpuSet", tmpDefaultCPUSet.String())
138
139 return nil
140 }
141
142
143 func (sc *stateCheckpoint) storeState() error {
144 checkpoint := NewCPUManagerCheckpoint()
145 checkpoint.PolicyName = sc.policyName
146 checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String()
147
148 assignments := sc.cache.GetCPUAssignments()
149 for pod := range assignments {
150 checkpoint.Entries[pod] = make(map[string]string, len(assignments[pod]))
151 for container, cset := range assignments[pod] {
152 checkpoint.Entries[pod][container] = cset.String()
153 }
154 }
155
156 err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
157 if err != nil {
158 klog.ErrorS(err, "Failed to save checkpoint")
159 return err
160 }
161 return nil
162 }
163
164
165 func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
166 sc.mux.RLock()
167 defer sc.mux.RUnlock()
168
169 res, ok := sc.cache.GetCPUSet(podUID, containerName)
170 return res, ok
171 }
172
173
174 func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet {
175 sc.mux.RLock()
176 defer sc.mux.RUnlock()
177
178 return sc.cache.GetDefaultCPUSet()
179 }
180
181
182 func (sc *stateCheckpoint) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
183 sc.mux.RLock()
184 defer sc.mux.RUnlock()
185
186 return sc.cache.GetCPUSetOrDefault(podUID, containerName)
187 }
188
189
190 func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments {
191 sc.mux.RLock()
192 defer sc.mux.RUnlock()
193
194 return sc.cache.GetCPUAssignments()
195 }
196
197
198 func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
199 sc.mux.Lock()
200 defer sc.mux.Unlock()
201 sc.cache.SetCPUSet(podUID, containerName, cset)
202 err := sc.storeState()
203 if err != nil {
204 klog.InfoS("Store state to checkpoint error", "err", err)
205 }
206 }
207
208
209 func (sc *stateCheckpoint) SetDefaultCPUSet(cset cpuset.CPUSet) {
210 sc.mux.Lock()
211 defer sc.mux.Unlock()
212 sc.cache.SetDefaultCPUSet(cset)
213 err := sc.storeState()
214 if err != nil {
215 klog.InfoS("Store state to checkpoint error", "err", err)
216 }
217 }
218
219
220 func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) {
221 sc.mux.Lock()
222 defer sc.mux.Unlock()
223 sc.cache.SetCPUAssignments(a)
224 err := sc.storeState()
225 if err != nil {
226 klog.InfoS("Store state to checkpoint error", "err", err)
227 }
228 }
229
230
231 func (sc *stateCheckpoint) Delete(podUID string, containerName string) {
232 sc.mux.Lock()
233 defer sc.mux.Unlock()
234 sc.cache.Delete(podUID, containerName)
235 err := sc.storeState()
236 if err != nil {
237 klog.InfoS("Store state to checkpoint error", "err", err)
238 }
239 }
240
241
242 func (sc *stateCheckpoint) ClearState() {
243 sc.mux.Lock()
244 defer sc.mux.Unlock()
245 sc.cache.ClearState()
246 err := sc.storeState()
247 if err != nil {
248 klog.InfoS("Store state to checkpoint error", "err", err)
249 }
250 }
251
View as plain text