1
16
17 package preemption
18
19 import (
20 "fmt"
21 "math"
22
23 v1 "k8s.io/api/core/v1"
24 utilfeature "k8s.io/apiserver/pkg/util/feature"
25 "k8s.io/client-go/tools/record"
26 "k8s.io/klog/v2"
27 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
28 "k8s.io/kubernetes/pkg/api/v1/resource"
29 v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
30 "k8s.io/kubernetes/pkg/features"
31 "k8s.io/kubernetes/pkg/kubelet/events"
32 "k8s.io/kubernetes/pkg/kubelet/eviction"
33 "k8s.io/kubernetes/pkg/kubelet/lifecycle"
34 "k8s.io/kubernetes/pkg/kubelet/metrics"
35 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
36 )
37
// message is the human-readable explanation attached to a preempted pod. It is
// used both as the text of the preemption event and as the pod status message
// set when the pod is killed.
const message = "Preempted in order to admit critical pod"
39
40
41
42
43
44
45
46
// CriticalPodAdmissionHandler is a lifecycle.AdmissionFailureHandler that deals
// with admission failures of critical pods. When the only reasons a critical pod
// failed admission are insufficient resources, it evicts other pods so that the
// critical pod can be admitted.
type CriticalPodAdmissionHandler struct {
	// getPodsFunc supplies the pods that are considered as preemption candidates.
	getPodsFunc eviction.ActivePodsFunc
	// killPodFunc terminates a pod selected for preemption; it is given a
	// callback that rewrites the pod status to record the preemption.
	killPodFunc eviction.KillPodFunc
	// recorder emits a warning event on every pod that is about to be preempted.
	recorder record.EventRecorder
}
52
53 var _ lifecycle.AdmissionFailureHandler = &CriticalPodAdmissionHandler{}
54
55 func NewCriticalPodAdmissionHandler(getPodsFunc eviction.ActivePodsFunc, killPodFunc eviction.KillPodFunc, recorder record.EventRecorder) *CriticalPodAdmissionHandler {
56 return &CriticalPodAdmissionHandler{
57 getPodsFunc: getPodsFunc,
58 killPodFunc: killPodFunc,
59 recorder: recorder,
60 }
61 }
62
63
64
65 func (c *CriticalPodAdmissionHandler) HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []lifecycle.PredicateFailureReason) ([]lifecycle.PredicateFailureReason, error) {
66 if !kubetypes.IsCriticalPod(admitPod) {
67 return failureReasons, nil
68 }
69
70
71 nonResourceReasons := []lifecycle.PredicateFailureReason{}
72 resourceReasons := []*admissionRequirement{}
73 for _, reason := range failureReasons {
74 if r, ok := reason.(*lifecycle.InsufficientResourceError); ok {
75 resourceReasons = append(resourceReasons, &admissionRequirement{
76 resourceName: r.ResourceName,
77 quantity: r.GetInsufficientAmount(),
78 })
79 } else {
80 nonResourceReasons = append(nonResourceReasons, reason)
81 }
82 }
83 if len(nonResourceReasons) > 0 {
84
85 return nonResourceReasons, nil
86 }
87 err := c.evictPodsToFreeRequests(admitPod, admissionRequirementList(resourceReasons))
88
89 return nil, err
90 }
91
92
93
94
95 func (c *CriticalPodAdmissionHandler) evictPodsToFreeRequests(admitPod *v1.Pod, insufficientResources admissionRequirementList) error {
96 podsToPreempt, err := getPodsToPreempt(admitPod, c.getPodsFunc(), insufficientResources)
97 if err != nil {
98 return fmt.Errorf("preemption: error finding a set of pods to preempt: %v", err)
99 }
100 for _, pod := range podsToPreempt {
101
102 c.recorder.Eventf(pod, v1.EventTypeWarning, events.PreemptContainer, message)
103
104 klog.V(3).InfoS("Preempting pod to free up resources", "pod", klog.KObj(pod), "podUID", pod.UID, "insufficientResources", insufficientResources)
105 err := c.killPodFunc(pod, true, nil, func(status *v1.PodStatus) {
106 status.Phase = v1.PodFailed
107 status.Reason = events.PreemptContainer
108 status.Message = message
109 if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
110 podutil.UpdatePodCondition(status, &v1.PodCondition{
111 Type: v1.DisruptionTarget,
112 Status: v1.ConditionTrue,
113 Reason: v1.PodReasonTerminationByKubelet,
114 Message: "Pod was preempted by Kubelet to accommodate a critical pod.",
115 })
116 }
117 })
118 if err != nil {
119 klog.ErrorS(err, "Failed to evict pod", "pod", klog.KObj(pod))
120
121 continue
122 }
123 if len(insufficientResources) > 0 {
124 metrics.Preemptions.WithLabelValues(insufficientResources[0].resourceName.String()).Inc()
125 } else {
126 metrics.Preemptions.WithLabelValues("").Inc()
127 }
128 klog.InfoS("Pod evicted successfully", "pod", klog.KObj(pod))
129 }
130 return nil
131 }
132
133
134 func getPodsToPreempt(pod *v1.Pod, pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
135 bestEffortPods, burstablePods, guaranteedPods := sortPodsByQOS(pod, pods)
136
137
138 unableToMeetRequirements := requirements.subtract(append(append(bestEffortPods, burstablePods...), guaranteedPods...)...)
139 if len(unableToMeetRequirements) > 0 {
140 return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", unableToMeetRequirements.toString())
141 }
142
143 guaranteedToEvict, err := getPodsToPreemptByDistance(guaranteedPods, requirements.subtract(append(bestEffortPods, burstablePods...)...))
144 if err != nil {
145 return nil, err
146 }
147
148 burstableToEvict, err := getPodsToPreemptByDistance(burstablePods, requirements.subtract(append(bestEffortPods, guaranteedToEvict...)...))
149 if err != nil {
150 return nil, err
151 }
152
153 bestEffortToEvict, err := getPodsToPreemptByDistance(bestEffortPods, requirements.subtract(append(burstableToEvict, guaranteedToEvict...)...))
154 if err != nil {
155 return nil, err
156 }
157 return append(append(bestEffortToEvict, burstableToEvict...), guaranteedToEvict...), nil
158 }
159
160
161
162
163
164
165
166 func getPodsToPreemptByDistance(pods []*v1.Pod, requirements admissionRequirementList) ([]*v1.Pod, error) {
167 podsToEvict := []*v1.Pod{}
168
169 for len(requirements) > 0 {
170 if len(pods) == 0 {
171 return nil, fmt.Errorf("no set of running pods found to reclaim resources: %v", requirements.toString())
172 }
173
174 bestDistance := float64(len(requirements) + 1)
175 bestPodIndex := 0
176
177
178 for i, pod := range pods {
179 dist := requirements.distance(pod)
180 if dist < bestDistance || (bestDistance == dist && smallerResourceRequest(pod, pods[bestPodIndex])) {
181 bestDistance = dist
182 bestPodIndex = i
183 }
184 }
185
186 requirements = requirements.subtract(pods[bestPodIndex])
187 podsToEvict = append(podsToEvict, pods[bestPodIndex])
188 pods[bestPodIndex] = pods[len(pods)-1]
189 pods = pods[:len(pods)-1]
190 }
191 return podsToEvict, nil
192 }
193
194 type admissionRequirement struct {
195 resourceName v1.ResourceName
196 quantity int64
197 }
198
199 type admissionRequirementList []*admissionRequirement
200
201
202
203
204 func (a admissionRequirementList) distance(pod *v1.Pod) float64 {
205 dist := float64(0)
206 for _, req := range a {
207 remainingRequest := float64(req.quantity - resource.GetResourceRequest(pod, req.resourceName))
208 if remainingRequest > 0 {
209 dist += math.Pow(remainingRequest/float64(req.quantity), 2)
210 }
211 }
212 return dist
213 }
214
215
216
217 func (a admissionRequirementList) subtract(pods ...*v1.Pod) admissionRequirementList {
218 newList := []*admissionRequirement{}
219 for _, req := range a {
220 newQuantity := req.quantity
221 for _, pod := range pods {
222 newQuantity -= resource.GetResourceRequest(pod, req.resourceName)
223 if newQuantity <= 0 {
224 break
225 }
226 }
227 if newQuantity > 0 {
228 newList = append(newList, &admissionRequirement{
229 resourceName: req.resourceName,
230 quantity: newQuantity,
231 })
232 }
233 }
234 return newList
235 }
236
237 func (a admissionRequirementList) toString() string {
238 s := "["
239 for _, req := range a {
240 s += fmt.Sprintf("(res: %v, q: %d), ", req.resourceName, req.quantity)
241 }
242 return s + "]"
243 }
244
245
246
247 func sortPodsByQOS(preemptor *v1.Pod, pods []*v1.Pod) (bestEffort, burstable, guaranteed []*v1.Pod) {
248 for _, pod := range pods {
249 if kubetypes.Preemptable(preemptor, pod) {
250 switch v1qos.GetPodQOS(pod) {
251 case v1.PodQOSBestEffort:
252 bestEffort = append(bestEffort, pod)
253 case v1.PodQOSBurstable:
254 burstable = append(burstable, pod)
255 case v1.PodQOSGuaranteed:
256 guaranteed = append(guaranteed, pod)
257 default:
258 }
259 }
260 }
261
262 return
263 }
264
265
266 func smallerResourceRequest(pod1 *v1.Pod, pod2 *v1.Pod) bool {
267 priorityList := []v1.ResourceName{
268 v1.ResourceMemory,
269 v1.ResourceCPU,
270 }
271 for _, res := range priorityList {
272 req1 := resource.GetResourceRequest(pod1, res)
273 req2 := resource.GetResourceRequest(pod2, res)
274 if req1 < req2 {
275 return true
276 } else if req1 > req2 {
277 return false
278 }
279 }
280 return true
281 }
282
View as plain text