1
16
17 package drain
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "math"
24 "time"
25
26 corev1 "k8s.io/api/core/v1"
27 policyv1 "k8s.io/api/policy/v1"
28 policyv1beta1 "k8s.io/api/policy/v1beta1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/fields"
32 "k8s.io/apimachinery/pkg/labels"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 utilerrors "k8s.io/apimachinery/pkg/util/errors"
36 "k8s.io/apimachinery/pkg/util/wait"
37 "k8s.io/cli-runtime/pkg/resource"
38 "k8s.io/client-go/kubernetes"
39 cmdutil "k8s.io/kubectl/pkg/cmd/util"
40 )
41
42 const (
43
44 EvictionKind = "Eviction"
45
46 EvictionSubresource = "pods/eviction"
47 podSkipMsgTemplate = "pod %q has DeletionTimestamp older than %v seconds, skipping\n"
48 )
49
50
// Helper holds the configuration and callbacks used to drain pods from a
// node. GetPodsForDeletion lists and filters them; DeleteOrEvictPods deletes
// or evicts them and waits for them to go away.
type Helper struct {
	// Ctx, when non-nil, is the context getContext returns for API calls.
	// A nil Ctx falls back to context.Background().
	Ctx context.Context
	// Client is the clientset used for discovery and for listing, getting,
	// deleting and evicting pods.
	Client kubernetes.Interface
	// Force is not read in this part of the file. NOTE(review): presumably
	// consumed by the pod filters built in makeFilters — confirm.
	Force bool

	// GracePeriodSeconds is copied into DeleteOptions.GracePeriodSeconds when
	// it is >= 0, so 0 is sent explicitly. A negative value leaves the field
	// unset. NOTE(review): presumably this lets the pod's own
	// terminationGracePeriodSeconds apply — confirm.
	GracePeriodSeconds int

	// IgnoreAllDaemonSets is not read here. Presumably it is used by the pod
	// filters — confirm.
	IgnoreAllDaemonSets bool
	// Timeout bounds the delete/evict-and-wait operation in evictPods and
	// deletePods. 0 means no limit.
	Timeout time.Duration
	// DeleteEmptyDirData is not read here. Presumably it is used by the pod
	// filters — confirm.
	DeleteEmptyDirData bool
	// Selector is not read in this part of the file. NOTE(review): purpose
	// not visible here — confirm with callers.
	Selector string
	// PodSelector is a label selector restricting which of the node's pods
	// GetPodsForDeletion lists.
	PodSelector string
	// ChunkSize is the page size (ListOptions.Limit) used when listing pods.
	ChunkSize int64

	// DisableEviction makes DeleteOrEvictPods delete pods directly instead of
	// going through the Eviction API.
	DisableEviction bool

	// SkipWaitForDeleteTimeoutSeconds is passed to waitForDelete. Pods for which
	// shouldSkipPod reports true are no longer waited on; the printed message
	// describes them as having a DeletionTimestamp older than this many seconds.
	SkipWaitForDeleteTimeoutSeconds int

	// AdditionalFilters are extra pod filters. NOTE(review): presumably
	// appended to the built-in filters by makeFilters — confirm.
	AdditionalFilters []PodFilter

	// Out receives progress messages; ErrOut receives retry/error messages.
	Out    io.Writer
	ErrOut io.Writer

	// DryRunStrategy set to cmdutil.DryRunServer sends DryRun=All with delete
	// and eviction requests.
	DryRunStrategy cmdutil.DryRunStrategy

	// OnPodDeletedOrEvicted is called by waitForDelete when a pod is gone, but
	// only when OnPodDeletionOrEvictionFinished is nil (that callback takes
	// precedence).
	OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool)

	// OnPodDeletionOrEvictionFinished is called by waitForDelete with a nil
	// error when a pod is gone, or with the lookup error when fetching the pod
	// fails.
	OnPodDeletionOrEvictionFinished func(pod *corev1.Pod, usingEviction bool, err error)

	// OnPodDeletionOrEvictionStarted is called when a pod deletion or eviction
	// is issued. usingEviction reports which path was taken.
	OnPodDeletionOrEvictionStarted func(pod *corev1.Pod, usingEviction bool)
}
97
// waitForDeleteParams bundles the inputs of waitForDelete.
type waitForDeleteParams struct {
	// ctx is checked after every poll that still has pending pods; once it is
	// done, waitForDelete fails with a "global timeout reached" error.
	ctx context.Context
	// pods are the pods to wait for; identity is checked by UID.
	pods []corev1.Pod
	// interval and timeout are the polling period and overall polling limit
	// handed to wait.PollImmediate.
	interval time.Duration
	timeout  time.Duration
	// usingEviction is forwarded to the callbacks.
	usingEviction bool
	// getPodFn fetches a pod by namespace and name.
	getPodFn func(string, string) (*corev1.Pod, error)
	// onDoneFn is called for a pod that is gone, but only if onFinishFn is nil.
	onDoneFn func(pod *corev1.Pod, usingEviction bool)
	// onFinishFn is called with nil for a pod that is gone, or with the error
	// when getPodFn fails.
	onFinishFn func(pod *corev1.Pod, usingEviction bool, err error)
	// globalTimeout is only used in the timeout error message.
	globalTimeout time.Duration
	// skipWaitForDeleteTimeoutSeconds is passed to shouldSkipPod and printed
	// in the skip message.
	skipWaitForDeleteTimeoutSeconds int
	// out receives the skip messages.
	out io.Writer
}
111
112
113
114
115 func CheckEvictionSupport(clientset kubernetes.Interface) (schema.GroupVersion, error) {
116 discoveryClient := clientset.Discovery()
117
118
119 resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
120 if err != nil {
121 return schema.GroupVersion{}, err
122 }
123 for _, resource := range resourceList.APIResources {
124 if resource.Name == EvictionSubresource && resource.Kind == EvictionKind && len(resource.Group) > 0 && len(resource.Version) > 0 {
125 return schema.GroupVersion{Group: resource.Group, Version: resource.Version}, nil
126 }
127 }
128 return schema.GroupVersion{}, nil
129 }
130
131 func (d *Helper) makeDeleteOptions() metav1.DeleteOptions {
132 deleteOptions := metav1.DeleteOptions{}
133 if d.GracePeriodSeconds >= 0 {
134 gracePeriodSeconds := int64(d.GracePeriodSeconds)
135 deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
136 }
137 if d.DryRunStrategy == cmdutil.DryRunServer {
138 deleteOptions.DryRun = []string{metav1.DryRunAll}
139 }
140 return deleteOptions
141 }
142
143
144 func (d *Helper) DeletePod(pod corev1.Pod) error {
145 return d.Client.CoreV1().Pods(pod.Namespace).Delete(d.getContext(), pod.Name, d.makeDeleteOptions())
146 }
147
148
149 func (d *Helper) EvictPod(pod corev1.Pod, evictionGroupVersion schema.GroupVersion) error {
150 delOpts := d.makeDeleteOptions()
151
152 switch evictionGroupVersion {
153 case policyv1.SchemeGroupVersion:
154
155 eviction := &policyv1.Eviction{
156 ObjectMeta: metav1.ObjectMeta{
157 Name: pod.Name,
158 Namespace: pod.Namespace,
159 },
160 DeleteOptions: &delOpts,
161 }
162 return d.Client.PolicyV1().Evictions(eviction.Namespace).Evict(context.TODO(), eviction)
163
164 default:
165
166 eviction := &policyv1beta1.Eviction{
167 ObjectMeta: metav1.ObjectMeta{
168 Name: pod.Name,
169 Namespace: pod.Namespace,
170 },
171 DeleteOptions: &delOpts,
172 }
173 return d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(context.TODO(), eviction)
174 }
175 }
176
177
178
179
180
181 func (d *Helper) GetPodsForDeletion(nodeName string) (*PodDeleteList, []error) {
182 labelSelector, err := labels.Parse(d.PodSelector)
183 if err != nil {
184 return nil, []error{err}
185 }
186
187 podList := &corev1.PodList{}
188 initialOpts := &metav1.ListOptions{
189 LabelSelector: labelSelector.String(),
190 FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String(),
191 Limit: d.ChunkSize,
192 }
193
194 err = resource.FollowContinue(initialOpts, func(options metav1.ListOptions) (runtime.Object, error) {
195 newPods, err := d.Client.CoreV1().Pods(metav1.NamespaceAll).List(d.getContext(), options)
196 if err != nil {
197 podR := corev1.SchemeGroupVersion.WithResource(corev1.ResourcePods.String())
198 return nil, resource.EnhanceListError(err, options, podR.String())
199 }
200 podList.Items = append(podList.Items, newPods.Items...)
201 return newPods, nil
202 })
203
204 if err != nil {
205 return nil, []error{err}
206 }
207
208 list := filterPods(podList, d.makeFilters())
209 if errs := list.errors(); len(errs) > 0 {
210 return list, errs
211 }
212
213 return list, nil
214 }
215
216 func filterPods(podList *corev1.PodList, filters []PodFilter) *PodDeleteList {
217 pods := []PodDelete{}
218 for _, pod := range podList.Items {
219 var status PodDeleteStatus
220 for _, filter := range filters {
221 status = filter(pod)
222 if !status.Delete {
223
224
225
226 break
227 }
228 }
229
230
231
232 pod.Kind = "Pod"
233 pod.APIVersion = "v1"
234 pods = append(pods, PodDelete{
235 Pod: pod,
236 Status: status,
237 })
238 }
239 list := &PodDeleteList{items: pods}
240 return list
241 }
242
243
244 func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
245 if len(pods) == 0 {
246 return nil
247 }
248
249
250 getPodFn := func(namespace, name string) (*corev1.Pod, error) {
251 return d.Client.CoreV1().Pods(namespace).Get(d.getContext(), name, metav1.GetOptions{})
252 }
253
254 if !d.DisableEviction {
255 evictionGroupVersion, err := CheckEvictionSupport(d.Client)
256 if err != nil {
257 return err
258 }
259
260 if !evictionGroupVersion.Empty() {
261 return d.evictPods(pods, evictionGroupVersion, getPodFn)
262 }
263 }
264
265 return d.deletePods(pods, getPodFn)
266 }
267
268 func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupVersion, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
269 returnCh := make(chan error, 1)
270
271 var globalTimeout time.Duration
272 if d.Timeout == 0 {
273 globalTimeout = time.Duration(math.MaxInt64)
274 } else {
275 globalTimeout = d.Timeout
276 }
277 ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout)
278 defer cancel()
279 for _, pod := range pods {
280 go func(pod corev1.Pod, returnCh chan error) {
281 refreshPod := false
282 for {
283 switch d.DryRunStrategy {
284 case cmdutil.DryRunServer:
285 fmt.Fprintf(d.Out, "evicting pod %s/%s (server dry run)\n", pod.Namespace, pod.Name)
286 default:
287 if d.OnPodDeletionOrEvictionStarted != nil {
288 d.OnPodDeletionOrEvictionStarted(&pod, true)
289 }
290 fmt.Fprintf(d.Out, "evicting pod %s/%s\n", pod.Namespace, pod.Name)
291 }
292 select {
293 case <-ctx.Done():
294
295 returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: global timeout reached: %v", pod.Name, pod.Namespace, globalTimeout)
296 return
297 default:
298 }
299
300
301 activePod := pod
302 if refreshPod {
303 freshPod, err := getPodFn(pod.Namespace, pod.Name)
304
305
306 if err == nil {
307 activePod = *freshPod
308 }
309 refreshPod = false
310 }
311
312 err := d.EvictPod(activePod, evictionGroupVersion)
313 if err == nil {
314 break
315 } else if apierrors.IsNotFound(err) {
316 returnCh <- nil
317 return
318 } else if apierrors.IsTooManyRequests(err) {
319 fmt.Fprintf(d.ErrOut, "error when evicting pods/%q -n %q (will retry after 5s): %v\n", activePod.Name, activePod.Namespace, err)
320 time.Sleep(5 * time.Second)
321 } else if !activePod.ObjectMeta.DeletionTimestamp.IsZero() && apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
322
323
324
325 break
326 } else if apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
327
328
329 fmt.Fprintf(d.ErrOut, "error when evicting pod %q from terminating namespace %q (will retry after 5s): %v\n", activePod.Name, activePod.Namespace, err)
330 time.Sleep(5 * time.Second)
331 } else {
332 returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: %v", activePod.Name, activePod.Namespace, err)
333 return
334 }
335 }
336 if d.DryRunStrategy == cmdutil.DryRunServer {
337 returnCh <- nil
338 return
339 }
340 params := waitForDeleteParams{
341 ctx: ctx,
342 pods: []corev1.Pod{pod},
343 interval: 1 * time.Second,
344 timeout: time.Duration(math.MaxInt64),
345 usingEviction: true,
346 getPodFn: getPodFn,
347 onDoneFn: d.OnPodDeletedOrEvicted,
348 onFinishFn: d.OnPodDeletionOrEvictionFinished,
349 globalTimeout: globalTimeout,
350 skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
351 out: d.Out,
352 }
353 _, err := waitForDelete(params)
354 if err == nil {
355 returnCh <- nil
356 } else {
357 returnCh <- fmt.Errorf("error when waiting for pod %q in namespace %q to terminate: %v", pod.Name, pod.Namespace, err)
358 }
359 }(pod, returnCh)
360 }
361
362 doneCount := 0
363 var errors []error
364
365 numPods := len(pods)
366 for doneCount < numPods {
367 select {
368 case err := <-returnCh:
369 doneCount++
370 if err != nil {
371 errors = append(errors, err)
372 }
373 }
374 }
375
376 return utilerrors.NewAggregate(errors)
377 }
378
379 func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
380
381 var globalTimeout time.Duration
382 if d.Timeout == 0 {
383 globalTimeout = time.Duration(math.MaxInt64)
384 } else {
385 globalTimeout = d.Timeout
386 }
387 for _, pod := range pods {
388 err := d.DeletePod(pod)
389 if err != nil && !apierrors.IsNotFound(err) {
390 return err
391 }
392 if d.OnPodDeletionOrEvictionStarted != nil {
393 d.OnPodDeletionOrEvictionStarted(&pod, false)
394 }
395 }
396 ctx := d.getContext()
397 params := waitForDeleteParams{
398 ctx: ctx,
399 pods: pods,
400 interval: 1 * time.Second,
401 timeout: globalTimeout,
402 usingEviction: false,
403 getPodFn: getPodFn,
404 onDoneFn: d.OnPodDeletedOrEvicted,
405 onFinishFn: d.OnPodDeletionOrEvictionFinished,
406 globalTimeout: globalTimeout,
407 skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
408 out: d.Out,
409 }
410 _, err := waitForDelete(params)
411 return err
412 }
413
414 func waitForDelete(params waitForDeleteParams) ([]corev1.Pod, error) {
415 pods := params.pods
416 err := wait.PollImmediate(params.interval, params.timeout, func() (bool, error) {
417 pendingPods := []corev1.Pod{}
418 for i, pod := range pods {
419 p, err := params.getPodFn(pod.Namespace, pod.Name)
420
421
422 if apierrors.IsNotFound(err) || (err == nil && p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
423 if params.onFinishFn != nil {
424 params.onFinishFn(&pod, params.usingEviction, nil)
425 } else if params.onDoneFn != nil {
426 params.onDoneFn(&pod, params.usingEviction)
427 }
428 continue
429 } else if err != nil {
430 if params.onFinishFn != nil {
431 params.onFinishFn(&pod, params.usingEviction, err)
432 }
433 return false, err
434 } else {
435 if shouldSkipPod(*p, params.skipWaitForDeleteTimeoutSeconds) {
436 fmt.Fprintf(params.out, podSkipMsgTemplate, pod.Name, params.skipWaitForDeleteTimeoutSeconds)
437 continue
438 }
439 pendingPods = append(pendingPods, pods[i])
440 }
441 }
442 pods = pendingPods
443 if len(pendingPods) > 0 {
444 select {
445 case <-params.ctx.Done():
446 return false, fmt.Errorf("global timeout reached: %v", params.globalTimeout)
447 default:
448 return false, nil
449 }
450 }
451 return true, nil
452 })
453 return pods, err
454 }
455
456
457
458
459 func (d *Helper) getContext() context.Context {
460 if d.Ctx != nil {
461 return d.Ctx
462 }
463 return context.Background()
464 }
465
View as plain text