1
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "time"
24
25 policyv1 "k8s.io/api/policy/v1"
26 policyv1beta1 "k8s.io/api/policy/v1beta1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 "k8s.io/apimachinery/pkg/api/meta"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/util/wait"
34 "k8s.io/apiserver/pkg/registry/rest"
35 "k8s.io/apiserver/pkg/util/dryrun"
36 "k8s.io/apiserver/pkg/util/feature"
37 policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
38 "k8s.io/client-go/util/retry"
39 pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
40 podutil "k8s.io/kubernetes/pkg/api/pod"
41 api "k8s.io/kubernetes/pkg/apis/core"
42 "k8s.io/kubernetes/pkg/apis/policy"
43 "k8s.io/kubernetes/pkg/features"
44 )
45
// MaxDisruptedPodSize is the largest number of entries tolerated in a
// PodDisruptionBudget's Status.DisruptedPods map. checkAndDecrement refuses
// (Forbidden) to approve further evictions for a PDB whose map has more
// entries than this, since that means many approved evictions have not yet
// been confirmed by the PDB controller.
const MaxDisruptedPodSize = 2000
55
56
57
58 var EvictionsRetry = wait.Backoff{
59 Steps: 20,
60 Duration: 500 * time.Millisecond,
61 Factor: 1.0,
62 Jitter: 0.1,
63 }
64
65 func newEvictionStorage(store rest.StandardStorage, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST {
66 return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient}
67 }
68
69
70 type EvictionREST struct {
71 store rest.StandardStorage
72 podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter
73 }
74
75 var _ = rest.NamedCreater(&EvictionREST{})
76 var _ = rest.GroupVersionKindProvider(&EvictionREST{})
77 var _ = rest.GroupVersionAcceptor(&EvictionREST{})
78
79 var v1Eviction = schema.GroupVersionKind{Group: "policy", Version: "v1", Kind: "Eviction"}
80
81
82 func (r *EvictionREST) GroupVersionKind(containingGV schema.GroupVersion) schema.GroupVersionKind {
83 return v1Eviction
84 }
85
86
87 func (r *EvictionREST) AcceptsGroupVersion(gv schema.GroupVersion) bool {
88 switch gv {
89 case policyv1.SchemeGroupVersion, policyv1beta1.SchemeGroupVersion:
90 return true
91 default:
92 return false
93 }
94 }
95
96
97 func (r *EvictionREST) New() runtime.Object {
98 return &policy.Eviction{}
99 }
100
101
102 func (r *EvictionREST) Destroy() {
103
104
105 }
106
107
108
109 func propagateDryRun(eviction *policy.Eviction, options *metav1.CreateOptions) (*metav1.DeleteOptions, error) {
110 if eviction.DeleteOptions == nil {
111 return &metav1.DeleteOptions{DryRun: options.DryRun}, nil
112 }
113 if len(eviction.DeleteOptions.DryRun) == 0 {
114 eviction.DeleteOptions.DryRun = options.DryRun
115 return eviction.DeleteOptions, nil
116 }
117 if len(options.DryRun) == 0 {
118 return eviction.DeleteOptions, nil
119 }
120
121 if !reflect.DeepEqual(options.DryRun, eviction.DeleteOptions.DryRun) {
122 return nil, fmt.Errorf("Non-matching dry-run options in request and content: %v and %v", options.DryRun, eviction.DeleteOptions.DryRun)
123 }
124 return eviction.DeleteOptions, nil
125 }
126
127
128 func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
129 eviction, ok := obj.(*policy.Eviction)
130 if !ok {
131 return nil, errors.NewBadRequest(fmt.Sprintf("not a Eviction object: %T", obj))
132 }
133
134 if name != eviction.Name {
135 return nil, errors.NewBadRequest("name in URL does not match name in Eviction object")
136 }
137
138 originalDeleteOptions, err := propagateDryRun(eviction, options)
139 if err != nil {
140 return nil, err
141 }
142
143 if createValidation != nil {
144 if err := createValidation(ctx, eviction.DeepCopyObject()); err != nil {
145 return nil, err
146 }
147 }
148
149 var pod *api.Pod
150 deletedPod := false
151
152 shouldRetry := errors.IsConflict
153 if !resourceVersionIsUnset(originalDeleteOptions) {
154
155 shouldRetry = func(err error) bool { return false }
156 }
157
158 err = retry.OnError(EvictionsRetry, shouldRetry, func() error {
159 pod, err = getPod(r, ctx, eviction.Name)
160 if err != nil {
161 return err
162 }
163
164
165
166 if !canIgnorePDB(pod) {
167
168 return nil
169 }
170
171
172 deleteOptions := originalDeleteOptions
173
174
175
176
177 if shouldEnforceResourceVersion(pod) && resourceVersionIsUnset(originalDeleteOptions) {
178
179
180 deleteOptions = deleteOptions.DeepCopy()
181 setPreconditionsResourceVersion(deleteOptions, &pod.ResourceVersion)
182 }
183 err = addConditionAndDeletePod(r, ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions)
184 if err != nil {
185 return err
186 }
187 deletedPod = true
188 return nil
189 })
190 switch {
191 case err != nil:
192
193
194 return nil, err
195
196 case deletedPod:
197
198 return &metav1.Status{Status: metav1.StatusSuccess}, nil
199
200 default:
201
202
203 }
204
205 var rtStatus *metav1.Status
206 var pdbName string
207 updateDeletionOptions := false
208
209 err = func() error {
210 pdbs, err := r.getPodDisruptionBudgets(ctx, pod)
211 if err != nil {
212 return err
213 }
214
215 if len(pdbs) > 1 {
216 rtStatus = &metav1.Status{
217 Status: metav1.StatusFailure,
218 Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.",
219 Code: 500,
220 }
221 return nil
222 }
223 if len(pdbs) == 0 {
224 return nil
225 }
226
227 pdb := &pdbs[0]
228 pdbName = pdb.Name
229
230
231
232 if !podutil.IsPodReady(pod) {
233 if feature.DefaultFeatureGate.Enabled(features.PDBUnhealthyPodEvictionPolicy) {
234 if pdb.Spec.UnhealthyPodEvictionPolicy != nil && *pdb.Spec.UnhealthyPodEvictionPolicy == policyv1.AlwaysAllow {
235
236 updateDeletionOptions = true
237 return nil
238 }
239 }
240
241 if pdb.Status.CurrentHealthy >= pdb.Status.DesiredHealthy && pdb.Status.DesiredHealthy > 0 {
242
243
244 updateDeletionOptions = true
245 return nil
246 }
247
248 }
249
250 refresh := false
251 err = retry.RetryOnConflict(EvictionsRetry, func() error {
252 if refresh {
253 pdb, err = r.podDisruptionBudgetClient.PodDisruptionBudgets(pod.Namespace).Get(context.TODO(), pdbName, metav1.GetOptions{})
254 if err != nil {
255 return err
256 }
257 }
258
259
260
261
262 if err = r.checkAndDecrement(pod.Namespace, pod.Name, *pdb, dryrun.IsDryRun(originalDeleteOptions.DryRun)); err != nil {
263 refresh = true
264 return err
265 }
266 return nil
267 })
268 return err
269 }()
270 if err == wait.ErrWaitTimeout {
271 err = errors.NewTimeoutError(fmt.Sprintf("couldn't update PodDisruptionBudget %q due to conflicts", pdbName), 10)
272 }
273 if err != nil {
274 return nil, err
275 }
276
277 if rtStatus != nil {
278 return rtStatus, nil
279 }
280
281
282
283
284 deleteOptions := originalDeleteOptions
285
286
287
288 if updateDeletionOptions {
289
290 deleteOptions = deleteOptions.DeepCopy()
291 setPreconditionsResourceVersion(deleteOptions, &pod.ResourceVersion)
292 }
293
294
295 err = addConditionAndDeletePod(r, ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions)
296 if err != nil {
297 if errors.IsConflict(err) && updateDeletionOptions &&
298 (originalDeleteOptions.Preconditions == nil || originalDeleteOptions.Preconditions.ResourceVersion == nil) {
299
300
301
302 return nil, createTooManyRequestsError(pdbName)
303 }
304 return nil, err
305 }
306
307
308 return &metav1.Status{Status: metav1.StatusSuccess}, nil
309 }
310
311 func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error {
312 if !dryrun.IsDryRun(options.DryRun) && feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
313 getLatestPod := func(_ context.Context, _, oldObj runtime.Object) (runtime.Object, error) {
314
315
316 latestPod := oldObj.(*api.Pod).DeepCopy()
317 if options.Preconditions != nil {
318 if uid := options.Preconditions.UID; uid != nil && len(*uid) > 0 && *uid != latestPod.UID {
319 return nil, errors.NewConflict(
320 schema.GroupResource{Group: "", Resource: "Pod"},
321 latestPod.Name,
322 fmt.Errorf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *uid, latestPod.UID),
323 )
324 }
325 if rv := options.Preconditions.ResourceVersion; rv != nil && len(*rv) > 0 && *rv != latestPod.ResourceVersion {
326 return nil, errors.NewConflict(
327 schema.GroupResource{Group: "", Resource: "Pod"},
328 latestPod.Name,
329 fmt.Errorf("the ResourceVersion in the precondition (%s) does not match the ResourceVersion in record (%s). The object might have been modified", *rv, latestPod.ResourceVersion),
330 )
331 }
332 }
333 return latestPod, nil
334 }
335
336 conditionAppender := func(_ context.Context, newObj, _ runtime.Object) (runtime.Object, error) {
337 podObj := newObj.(*api.Pod)
338 podutil.UpdatePodCondition(&podObj.Status, &api.PodCondition{
339 Type: api.DisruptionTarget,
340 Status: api.ConditionTrue,
341 Reason: "EvictionByEvictionAPI",
342 Message: "Eviction API: evicting",
343 })
344 return podObj, nil
345 }
346
347 podUpdatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, getLatestPod, conditionAppender)
348
349 updatedPodObject, _, err := r.store.Update(ctx, name, podUpdatedObjectInfo, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
350 if err != nil {
351 return err
352 }
353
354 if !resourceVersionIsUnset(options) {
355 newResourceVersion, err := meta.NewAccessor().ResourceVersion(updatedPodObject)
356 if err != nil {
357 return err
358 }
359
360 options = options.DeepCopy()
361 options.Preconditions.ResourceVersion = &newResourceVersion
362 }
363 }
364 _, _, err := r.store.Delete(ctx, name, rest.ValidateAllObjectFunc, options)
365 return err
366 }
367
368 func getPod(r *EvictionREST, ctx context.Context, name string) (*api.Pod, error) {
369 obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
370 if err != nil {
371 return nil, err
372 }
373 return obj.(*api.Pod), nil
374 }
375
376 func setPreconditionsResourceVersion(deleteOptions *metav1.DeleteOptions, resourceVersion *string) {
377 if deleteOptions.Preconditions == nil {
378 deleteOptions.Preconditions = &metav1.Preconditions{}
379 }
380 deleteOptions.Preconditions.ResourceVersion = resourceVersion
381 }
382
383
384
385 func canIgnorePDB(pod *api.Pod) bool {
386 if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed ||
387 pod.Status.Phase == api.PodPending || !pod.ObjectMeta.DeletionTimestamp.IsZero() {
388 return true
389 }
390 return false
391 }
392
393 func shouldEnforceResourceVersion(pod *api.Pod) bool {
394
395 if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed || !pod.ObjectMeta.DeletionTimestamp.IsZero() {
396 return false
397 }
398
399
400 return true
401 }
402
403 func resourceVersionIsUnset(options *metav1.DeleteOptions) bool {
404 return options.Preconditions == nil || options.Preconditions.ResourceVersion == nil
405 }
406
407 func createTooManyRequestsError(name string) error {
408
409
410
411
412 err := errors.NewTooManyRequests("Cannot evict pod as it would violate the pod's disruption budget.", 10)
413 err.ErrStatus.Details.Causes = append(err.ErrStatus.Details.Causes, metav1.StatusCause{Type: policyv1.DisruptionBudgetCause, Message: fmt.Sprintf("The disruption budget %s is still being processed by the server.", name)})
414 return err
415 }
416
417
418 func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb policyv1.PodDisruptionBudget, dryRun bool) error {
419 if pdb.Status.ObservedGeneration < pdb.Generation {
420
421 return createTooManyRequestsError(pdb.Name)
422 }
423 if pdb.Status.DisruptionsAllowed < 0 {
424 return errors.NewForbidden(policy.Resource("poddisruptionbudget"), pdb.Name, fmt.Errorf("pdb disruptions allowed is negative"))
425 }
426 if len(pdb.Status.DisruptedPods) > MaxDisruptedPodSize {
427 return errors.NewForbidden(policy.Resource("poddisruptionbudget"), pdb.Name, fmt.Errorf("DisruptedPods map too big - too many evictions not confirmed by PDB controller"))
428 }
429 if pdb.Status.DisruptionsAllowed == 0 {
430 err := errors.NewTooManyRequests("Cannot evict pod as it would violate the pod's disruption budget.", 0)
431 err.ErrStatus.Details.Causes = append(err.ErrStatus.Details.Causes, metav1.StatusCause{Type: policyv1.DisruptionBudgetCause, Message: fmt.Sprintf("The disruption budget %s needs %d healthy pods and has %d currently", pdb.Name, pdb.Status.DesiredHealthy, pdb.Status.CurrentHealthy)})
432 return err
433 }
434
435 pdb.Status.DisruptionsAllowed--
436 if pdb.Status.DisruptionsAllowed == 0 {
437 pdbhelper.UpdateDisruptionAllowedCondition(&pdb)
438 }
439
440
441 if dryRun {
442 return nil
443 }
444
445 if pdb.Status.DisruptedPods == nil {
446 pdb.Status.DisruptedPods = make(map[string]metav1.Time)
447 }
448
449
450
451
452
453 pdb.Status.DisruptedPods[podName] = metav1.Time{Time: time.Now()}
454 if _, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(namespace).UpdateStatus(context.TODO(), &pdb, metav1.UpdateOptions{}); err != nil {
455 return err
456 }
457
458 return nil
459 }
460
461
462 func (r *EvictionREST) getPodDisruptionBudgets(ctx context.Context, pod *api.Pod) ([]policyv1.PodDisruptionBudget, error) {
463 pdbList, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(pod.Namespace).List(context.TODO(), metav1.ListOptions{})
464 if err != nil {
465 return nil, err
466 }
467
468 var pdbs []policyv1.PodDisruptionBudget
469 for _, pdb := range pdbList.Items {
470 if pdb.Namespace != pod.Namespace {
471 continue
472 }
473 selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
474 if err != nil {
475
476 continue
477 }
478 if !selector.Matches(labels.Set(pod.Labels)) {
479 continue
480 }
481
482 pdbs = append(pdbs, pdb)
483 }
484
485 return pdbs, nil
486 }
487
View as plain text