20 package cm
21
22 import (
23 "fmt"
24 "strconv"
25 "strings"
26 "time"
27
28 v1 "k8s.io/api/core/v1"
29 "k8s.io/apimachinery/pkg/api/resource"
30 "k8s.io/apimachinery/pkg/types"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/klog/v2"
33 kubefeatures "k8s.io/kubernetes/pkg/features"
34 "k8s.io/kubernetes/pkg/kubelet/events"
35 "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
36 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
37 )
38
39 const (
40 defaultNodeAllocatableCgroupName = "kubepods"
41 )
42
43
44 func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
45 nodeAllocatable := cm.internalCapacity
46
47 nc := cm.NodeConfig.NodeAllocatableConfig
48 if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
49 nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
50 }
51
52 cgroupConfig := &CgroupConfig{
53 Name: cm.cgroupRoot,
54
55 ResourceParameters: getCgroupConfig(nodeAllocatable),
56 }
57 if cm.cgroupManager.Exists(cgroupConfig.Name) {
58 return nil
59 }
60 if err := cm.cgroupManager.Create(cgroupConfig); err != nil {
61 klog.ErrorS(err, "Failed to create cgroup", "cgroupName", cm.cgroupRoot)
62 return err
63 }
64 return nil
65 }
66
67
68 func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
69 nc := cm.NodeConfig.NodeAllocatableConfig
70
71
72
73 nodeAllocatable := cm.internalCapacity
74
75 if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
76 nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
77 }
78
79 klog.V(4).InfoS("Attempting to enforce Node Allocatable", "config", nc)
80
81 cgroupConfig := &CgroupConfig{
82 Name: cm.cgroupRoot,
83 ResourceParameters: getCgroupConfig(nodeAllocatable),
84 }
85
86
87 nodeRef := &v1.ObjectReference{
88 Kind: "Node",
89 Name: cm.nodeInfo.Name,
90 UID: types.UID(cm.nodeInfo.Name),
91 Namespace: "",
92 }
93
94
95
96
97
98
99
100 if len(cm.cgroupRoot) > 0 {
101 go func() {
102 for {
103 err := cm.cgroupManager.Update(cgroupConfig)
104 if err == nil {
105 cm.recorder.Event(nodeRef, v1.EventTypeNormal, events.SuccessfulNodeAllocatableEnforcement, "Updated Node Allocatable limit across pods")
106 return
107 }
108 message := fmt.Sprintf("Failed to update Node Allocatable Limits %q: %v", cm.cgroupRoot, err)
109 cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
110 time.Sleep(time.Minute)
111 }
112 }()
113 }
114
115 if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
116 klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
117 if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved); err != nil {
118 message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
119 cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
120 return fmt.Errorf(message)
121 }
122 cm.recorder.Eventf(nodeRef, v1.EventTypeNormal, events.SuccessfulNodeAllocatableEnforcement, "Updated limits on system reserved cgroup %v", nc.SystemReservedCgroupName)
123 }
124 if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
125 klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
126 if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved); err != nil {
127 message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
128 cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
129 return fmt.Errorf(message)
130 }
131 cm.recorder.Eventf(nodeRef, v1.EventTypeNormal, events.SuccessfulNodeAllocatableEnforcement, "Updated limits on kube reserved cgroup %v", nc.KubeReservedCgroupName)
132 }
133 return nil
134 }
135
136
137 func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList) error {
138 rp := getCgroupConfig(rl)
139 if rp == nil {
140 return fmt.Errorf("%q cgroup is not configured properly", cName)
141 }
142
143
144
145 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryQoS) {
146 if rp.Memory != nil {
147 if rp.Unified == nil {
148 rp.Unified = make(map[string]string)
149 }
150 rp.Unified[Cgroup2MemoryMin] = strconv.FormatInt(*rp.Memory, 10)
151 }
152 }
153
154 cgroupConfig := &CgroupConfig{
155 Name: cName,
156 ResourceParameters: rp,
157 }
158 klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
159 if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
160 return err
161 }
162 if err := cgroupManager.Update(cgroupConfig); err != nil {
163 return err
164 }
165 return nil
166 }
167
168
169 func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
170
171 if rl == nil {
172 return nil
173 }
174 var rc ResourceConfig
175 if q, exists := rl[v1.ResourceMemory]; exists {
176
177 val := q.Value()
178 rc.Memory = &val
179 }
180 if q, exists := rl[v1.ResourceCPU]; exists {
181
182 val := MilliCPUToShares(q.MilliValue())
183 rc.CPUShares = &val
184 }
185 if q, exists := rl[pidlimit.PIDs]; exists {
186 val := q.Value()
187 rc.PidsLimit = &val
188 }
189 rc.HugePageLimit = HugePageLimits(rl)
190
191 return &rc
192 }
193
194
195
196
// GetNodeAllocatableAbsolute returns the node's capacity (cm.capacity) minus
// the configured system and kube reservations, clamped at zero per resource.
func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList {
	return cm.getNodeAllocatableAbsoluteImpl(cm.capacity)
}
200
201 func (cm *containerManagerImpl) getNodeAllocatableAbsoluteImpl(capacity v1.ResourceList) v1.ResourceList {
202 result := make(v1.ResourceList)
203 for k, v := range capacity {
204 value := v.DeepCopy()
205 if cm.NodeConfig.SystemReserved != nil {
206 value.Sub(cm.NodeConfig.SystemReserved[k])
207 }
208 if cm.NodeConfig.KubeReserved != nil {
209 value.Sub(cm.NodeConfig.KubeReserved[k])
210 }
211 if value.Sign() < 0 {
212
213 value.Set(0)
214 }
215 result[k] = value
216 }
217 return result
218 }
219
220
221
222
// getNodeAllocatableInternalAbsolute is like GetNodeAllocatableAbsolute but
// computed from cm.internalCapacity instead of cm.capacity.
func (cm *containerManagerImpl) getNodeAllocatableInternalAbsolute() v1.ResourceList {
	return cm.getNodeAllocatableAbsoluteImpl(cm.internalCapacity)
}
226
227
228 func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
229 evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
230 result := make(v1.ResourceList)
231 for k := range cm.capacity {
232 value := resource.NewQuantity(0, resource.DecimalSI)
233 if cm.NodeConfig.SystemReserved != nil {
234 value.Add(cm.NodeConfig.SystemReserved[k])
235 }
236 if cm.NodeConfig.KubeReserved != nil {
237 value.Add(cm.NodeConfig.KubeReserved[k])
238 }
239 if evictionReservation != nil {
240 value.Add(evictionReservation[k])
241 }
242 if !value.IsZero() {
243 result[k] = *value
244 }
245 }
246 return result
247 }
248
249
250
251 func (cm *containerManagerImpl) validateNodeAllocatable() error {
252 var errors []string
253 nar := cm.GetNodeAllocatableReservation()
254 for k, v := range nar {
255 value := cm.capacity[k].DeepCopy()
256 value.Sub(v)
257
258 if value.Sign() < 0 {
259 errors = append(errors, fmt.Sprintf("Resource %q has a reservation of %v but capacity of %v. Expected capacity >= reservation.", k, v, cm.capacity[k]))
260 }
261 }
262
263 if len(errors) > 0 {
264 return fmt.Errorf("invalid Node Allocatable configuration. %s", strings.Join(errors, " "))
265 }
266 return nil
267 }
268