...

Source file src/k8s.io/kubectl/pkg/drain/drain.go

Documentation: k8s.io/kubectl/pkg/drain

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package drain
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"io"
    23  	"math"
    24  	"time"
    25  
    26  	corev1 "k8s.io/api/core/v1"
    27  	policyv1 "k8s.io/api/policy/v1"
    28  	policyv1beta1 "k8s.io/api/policy/v1beta1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/fields"
    32  	"k8s.io/apimachinery/pkg/labels"
    33  	"k8s.io/apimachinery/pkg/runtime"
    34  	"k8s.io/apimachinery/pkg/runtime/schema"
    35  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/cli-runtime/pkg/resource"
    38  	"k8s.io/client-go/kubernetes"
    39  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    40  )
    41  
    42  const (
    43  	// EvictionKind represents the kind of evictions object
    44  	EvictionKind = "Eviction"
    45  	// EvictionSubresource represents the kind of evictions object as pod's subresource
    46  	EvictionSubresource = "pods/eviction"
    47  	podSkipMsgTemplate  = "pod %q has DeletionTimestamp older than %v seconds, skipping\n"
    48  )
    49  
// Helper contains the parameters to control the behaviour of drainer.
// Helper has no constructor; callers populate the fields directly.
type Helper struct {
	// Ctx is the context used for API calls made by the helper. When nil,
	// context.Background() is used instead.
	Ctx context.Context
	// Client is the clientset used to list, get, delete and evict pods.
	Client kubernetes.Interface
	// Force is not read in this file.
	// NOTE(review): presumably consulted by the pod filters built by
	// makeFilters — confirm.
	Force bool

	// GracePeriodSeconds is how long to wait for a pod to terminate.
	// IMPORTANT: 0 means "delete immediately"; set to a negative value
	// to use the pod's terminationGracePeriodSeconds.
	GracePeriodSeconds int

	// IgnoreAllDaemonSets is not read in this file.
	// NOTE(review): presumably consulted by the pod filters — confirm.
	IgnoreAllDaemonSets bool
	// Timeout bounds the whole delete/evict-and-wait operation; 0 means no
	// limit.
	Timeout time.Duration
	// DeleteEmptyDirData is not read in this file.
	// NOTE(review): presumably consulted by the pod filters — confirm.
	DeleteEmptyDirData bool
	// Selector is not read in this file.
	// NOTE(review): presumably a node selector used by callers — confirm.
	Selector string
	// PodSelector is a label selector restricting which pods on the node
	// GetPodsForDeletion lists.
	PodSelector string
	// ChunkSize is the page size (ListOptions.Limit) used when listing pods.
	ChunkSize int64

	// DisableEviction forces drain to use delete rather than evict
	DisableEviction bool

	// SkipWaitForDeleteTimeoutSeconds ignores pods that have a
	// DeletionTimestamp > N seconds. It's up to the user to decide when this
	// option is appropriate; examples include the Node is unready and the pods
	// won't drain otherwise
	SkipWaitForDeleteTimeoutSeconds int

	// AdditionalFilters are applied sequentially after base drain filters to
	// exclude pods using custom logic.  Any filter that returns PodDeleteStatus
	// with Delete == false will immediately stop execution of further filters.
	AdditionalFilters []PodFilter

	// Out receives progress messages; ErrOut receives retry/error messages.
	Out    io.Writer
	ErrOut io.Writer

	// DryRunStrategy selects dry-run behaviour. With cmdutil.DryRunServer,
	// delete and eviction requests are sent with DryRun=All.
	DryRunStrategy cmdutil.DryRunStrategy

	// OnPodDeletedOrEvicted is called when a pod is evicted/deleted; for printing progress output.
	// It is only invoked when OnPodDeletionOrEvictionFinished is nil.
	//
	// Deprecated: use OnPodDeletionOrEvictionFinished instead
	OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool)

	// OnPodDeletionOrEvictionFinished is called when waiting for a pod's
	// eviction/deletion finishes; for printing progress output. err is nil
	// once the pod is gone, or the lookup error that aborted the wait.
	OnPodDeletionOrEvictionFinished func(pod *corev1.Pod, usingEviction bool, err error)

	// OnPodDeletionOrEvictionStarted is called when a pod eviction/deletion is
	// started; for printing progress output. For evictions it is invoked before
	// every attempt (including retries) and not at all during a server dry run;
	// for deletions it is invoked after each delete request is issued.
	OnPodDeletionOrEvictionStarted func(pod *corev1.Pod, usingEviction bool)
}
    97  
// waitForDeleteParams bundles the inputs of waitForDelete.
type waitForDeleteParams struct {
	// ctx is checked after each poll; once it is done while pods are still
	// pending, the wait fails with a "global timeout reached" error.
	ctx context.Context
	// pods are the pods to wait for; a pod returned with a different UID
	// counts as gone.
	pods []corev1.Pod
	// interval is the polling period.
	interval time.Duration
	// timeout bounds the polling itself.
	timeout time.Duration
	// usingEviction is passed through to the callbacks.
	usingEviction bool
	// getPodFn fetches a pod by (namespace, name).
	getPodFn func(string, string) (*corev1.Pod, error)
	// onDoneFn is the legacy completion callback, used only when onFinishFn
	// is nil.
	onDoneFn func(pod *corev1.Pod, usingEviction bool)
	// onFinishFn is called with a nil error when a pod is gone, or with the
	// lookup error that aborted the wait.
	onFinishFn func(pod *corev1.Pod, usingEviction bool, err error)
	// globalTimeout is only used to build the timeout error message.
	globalTimeout time.Duration
	// skipWaitForDeleteTimeoutSeconds is the DeletionTimestamp age (seconds)
	// past which a pod is no longer waited on (decided by shouldSkipPod).
	skipWaitForDeleteTimeoutSeconds int
	// out receives the "skipping" progress messages.
	out io.Writer
}
   111  
   112  // CheckEvictionSupport uses Discovery API to find out if the server support
   113  // eviction subresource If support, it will return its groupVersion; Otherwise,
   114  // it will return an empty GroupVersion
   115  func CheckEvictionSupport(clientset kubernetes.Interface) (schema.GroupVersion, error) {
   116  	discoveryClient := clientset.Discovery()
   117  
   118  	// version info available in subresources since v1.8.0 in https://github.com/kubernetes/kubernetes/pull/49971
   119  	resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
   120  	if err != nil {
   121  		return schema.GroupVersion{}, err
   122  	}
   123  	for _, resource := range resourceList.APIResources {
   124  		if resource.Name == EvictionSubresource && resource.Kind == EvictionKind && len(resource.Group) > 0 && len(resource.Version) > 0 {
   125  			return schema.GroupVersion{Group: resource.Group, Version: resource.Version}, nil
   126  		}
   127  	}
   128  	return schema.GroupVersion{}, nil
   129  }
   130  
   131  func (d *Helper) makeDeleteOptions() metav1.DeleteOptions {
   132  	deleteOptions := metav1.DeleteOptions{}
   133  	if d.GracePeriodSeconds >= 0 {
   134  		gracePeriodSeconds := int64(d.GracePeriodSeconds)
   135  		deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
   136  	}
   137  	if d.DryRunStrategy == cmdutil.DryRunServer {
   138  		deleteOptions.DryRun = []string{metav1.DryRunAll}
   139  	}
   140  	return deleteOptions
   141  }
   142  
   143  // DeletePod will delete the given pod, or return an error if it couldn't
   144  func (d *Helper) DeletePod(pod corev1.Pod) error {
   145  	return d.Client.CoreV1().Pods(pod.Namespace).Delete(d.getContext(), pod.Name, d.makeDeleteOptions())
   146  }
   147  
   148  // EvictPod will evict the given pod, or return an error if it couldn't
   149  func (d *Helper) EvictPod(pod corev1.Pod, evictionGroupVersion schema.GroupVersion) error {
   150  	delOpts := d.makeDeleteOptions()
   151  
   152  	switch evictionGroupVersion {
   153  	case policyv1.SchemeGroupVersion:
   154  		// send policy/v1 if the server supports it
   155  		eviction := &policyv1.Eviction{
   156  			ObjectMeta: metav1.ObjectMeta{
   157  				Name:      pod.Name,
   158  				Namespace: pod.Namespace,
   159  			},
   160  			DeleteOptions: &delOpts,
   161  		}
   162  		return d.Client.PolicyV1().Evictions(eviction.Namespace).Evict(context.TODO(), eviction)
   163  
   164  	default:
   165  		// otherwise, fall back to policy/v1beta1, supported by all servers that support the eviction subresource
   166  		eviction := &policyv1beta1.Eviction{
   167  			ObjectMeta: metav1.ObjectMeta{
   168  				Name:      pod.Name,
   169  				Namespace: pod.Namespace,
   170  			},
   171  			DeleteOptions: &delOpts,
   172  		}
   173  		return d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(context.TODO(), eviction)
   174  	}
   175  }
   176  
   177  // GetPodsForDeletion receives resource info for a node, and returns those pods as PodDeleteList,
   178  // or error if it cannot list pods. All pods that are ready to be deleted can be obtained with .Pods(),
   179  // and string with all warning can be obtained with .Warnings(), and .Errors() for all errors that
   180  // occurred during deletion.
   181  func (d *Helper) GetPodsForDeletion(nodeName string) (*PodDeleteList, []error) {
   182  	labelSelector, err := labels.Parse(d.PodSelector)
   183  	if err != nil {
   184  		return nil, []error{err}
   185  	}
   186  
   187  	podList := &corev1.PodList{}
   188  	initialOpts := &metav1.ListOptions{
   189  		LabelSelector: labelSelector.String(),
   190  		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String(),
   191  		Limit:         d.ChunkSize,
   192  	}
   193  
   194  	err = resource.FollowContinue(initialOpts, func(options metav1.ListOptions) (runtime.Object, error) {
   195  		newPods, err := d.Client.CoreV1().Pods(metav1.NamespaceAll).List(d.getContext(), options)
   196  		if err != nil {
   197  			podR := corev1.SchemeGroupVersion.WithResource(corev1.ResourcePods.String())
   198  			return nil, resource.EnhanceListError(err, options, podR.String())
   199  		}
   200  		podList.Items = append(podList.Items, newPods.Items...)
   201  		return newPods, nil
   202  	})
   203  
   204  	if err != nil {
   205  		return nil, []error{err}
   206  	}
   207  
   208  	list := filterPods(podList, d.makeFilters())
   209  	if errs := list.errors(); len(errs) > 0 {
   210  		return list, errs
   211  	}
   212  
   213  	return list, nil
   214  }
   215  
   216  func filterPods(podList *corev1.PodList, filters []PodFilter) *PodDeleteList {
   217  	pods := []PodDelete{}
   218  	for _, pod := range podList.Items {
   219  		var status PodDeleteStatus
   220  		for _, filter := range filters {
   221  			status = filter(pod)
   222  			if !status.Delete {
   223  				// short-circuit as soon as pod is filtered out
   224  				// at that point, there is no reason to run pod
   225  				// through any additional filters
   226  				break
   227  			}
   228  		}
   229  		// Add the pod to PodDeleteList no matter what PodDeleteStatus is,
   230  		// those pods whose PodDeleteStatus is false like DaemonSet will
   231  		// be catched by list.errors()
   232  		pod.Kind = "Pod"
   233  		pod.APIVersion = "v1"
   234  		pods = append(pods, PodDelete{
   235  			Pod:    pod,
   236  			Status: status,
   237  		})
   238  	}
   239  	list := &PodDeleteList{items: pods}
   240  	return list
   241  }
   242  
   243  // DeleteOrEvictPods deletes or evicts the pods on the api server
   244  func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
   245  	if len(pods) == 0 {
   246  		return nil
   247  	}
   248  
   249  	// TODO(justinsb): unnecessary?
   250  	getPodFn := func(namespace, name string) (*corev1.Pod, error) {
   251  		return d.Client.CoreV1().Pods(namespace).Get(d.getContext(), name, metav1.GetOptions{})
   252  	}
   253  
   254  	if !d.DisableEviction {
   255  		evictionGroupVersion, err := CheckEvictionSupport(d.Client)
   256  		if err != nil {
   257  			return err
   258  		}
   259  
   260  		if !evictionGroupVersion.Empty() {
   261  			return d.evictPods(pods, evictionGroupVersion, getPodFn)
   262  		}
   263  	}
   264  
   265  	return d.deletePods(pods, getPodFn)
   266  }
   267  
   268  func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupVersion, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
   269  	returnCh := make(chan error, 1)
   270  	// 0 timeout means infinite, we use MaxInt64 to represent it.
   271  	var globalTimeout time.Duration
   272  	if d.Timeout == 0 {
   273  		globalTimeout = time.Duration(math.MaxInt64)
   274  	} else {
   275  		globalTimeout = d.Timeout
   276  	}
   277  	ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout)
   278  	defer cancel()
   279  	for _, pod := range pods {
   280  		go func(pod corev1.Pod, returnCh chan error) {
   281  			refreshPod := false
   282  			for {
   283  				switch d.DryRunStrategy {
   284  				case cmdutil.DryRunServer:
   285  					fmt.Fprintf(d.Out, "evicting pod %s/%s (server dry run)\n", pod.Namespace, pod.Name)
   286  				default:
   287  					if d.OnPodDeletionOrEvictionStarted != nil {
   288  						d.OnPodDeletionOrEvictionStarted(&pod, true)
   289  					}
   290  					fmt.Fprintf(d.Out, "evicting pod %s/%s\n", pod.Namespace, pod.Name)
   291  				}
   292  				select {
   293  				case <-ctx.Done():
   294  					// return here or we'll leak a goroutine.
   295  					returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: global timeout reached: %v", pod.Name, pod.Namespace, globalTimeout)
   296  					return
   297  				default:
   298  				}
   299  
   300  				// Create a temporary pod so we don't mutate the pod in the loop.
   301  				activePod := pod
   302  				if refreshPod {
   303  					freshPod, err := getPodFn(pod.Namespace, pod.Name)
   304  					// We ignore errors and let eviction sort it out with
   305  					// the original pod.
   306  					if err == nil {
   307  						activePod = *freshPod
   308  					}
   309  					refreshPod = false
   310  				}
   311  
   312  				err := d.EvictPod(activePod, evictionGroupVersion)
   313  				if err == nil {
   314  					break
   315  				} else if apierrors.IsNotFound(err) {
   316  					returnCh <- nil
   317  					return
   318  				} else if apierrors.IsTooManyRequests(err) {
   319  					fmt.Fprintf(d.ErrOut, "error when evicting pods/%q -n %q (will retry after 5s): %v\n", activePod.Name, activePod.Namespace, err)
   320  					time.Sleep(5 * time.Second)
   321  				} else if !activePod.ObjectMeta.DeletionTimestamp.IsZero() && apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
   322  					// an eviction request in a deleting namespace will throw a forbidden error,
   323  					// if the pod is already marked deleted, we can ignore this error, an eviction
   324  					// request will never succeed, but we will waitForDelete for this pod.
   325  					break
   326  				} else if apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
   327  					// an eviction request in a deleting namespace will throw a forbidden error,
   328  					// if the pod is not marked deleted, we retry until it is.
   329  					fmt.Fprintf(d.ErrOut, "error when evicting pod %q from terminating namespace %q (will retry after 5s): %v\n", activePod.Name, activePod.Namespace, err)
   330  					time.Sleep(5 * time.Second)
   331  				} else {
   332  					returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: %v", activePod.Name, activePod.Namespace, err)
   333  					return
   334  				}
   335  			}
   336  			if d.DryRunStrategy == cmdutil.DryRunServer {
   337  				returnCh <- nil
   338  				return
   339  			}
   340  			params := waitForDeleteParams{
   341  				ctx:                             ctx,
   342  				pods:                            []corev1.Pod{pod},
   343  				interval:                        1 * time.Second,
   344  				timeout:                         time.Duration(math.MaxInt64),
   345  				usingEviction:                   true,
   346  				getPodFn:                        getPodFn,
   347  				onDoneFn:                        d.OnPodDeletedOrEvicted,
   348  				onFinishFn:                      d.OnPodDeletionOrEvictionFinished,
   349  				globalTimeout:                   globalTimeout,
   350  				skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
   351  				out:                             d.Out,
   352  			}
   353  			_, err := waitForDelete(params)
   354  			if err == nil {
   355  				returnCh <- nil
   356  			} else {
   357  				returnCh <- fmt.Errorf("error when waiting for pod %q in namespace %q to terminate: %v", pod.Name, pod.Namespace, err)
   358  			}
   359  		}(pod, returnCh)
   360  	}
   361  
   362  	doneCount := 0
   363  	var errors []error
   364  
   365  	numPods := len(pods)
   366  	for doneCount < numPods {
   367  		select {
   368  		case err := <-returnCh:
   369  			doneCount++
   370  			if err != nil {
   371  				errors = append(errors, err)
   372  			}
   373  		}
   374  	}
   375  
   376  	return utilerrors.NewAggregate(errors)
   377  }
   378  
   379  func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
   380  	// 0 timeout means infinite, we use MaxInt64 to represent it.
   381  	var globalTimeout time.Duration
   382  	if d.Timeout == 0 {
   383  		globalTimeout = time.Duration(math.MaxInt64)
   384  	} else {
   385  		globalTimeout = d.Timeout
   386  	}
   387  	for _, pod := range pods {
   388  		err := d.DeletePod(pod)
   389  		if err != nil && !apierrors.IsNotFound(err) {
   390  			return err
   391  		}
   392  		if d.OnPodDeletionOrEvictionStarted != nil {
   393  			d.OnPodDeletionOrEvictionStarted(&pod, false)
   394  		}
   395  	}
   396  	ctx := d.getContext()
   397  	params := waitForDeleteParams{
   398  		ctx:                             ctx,
   399  		pods:                            pods,
   400  		interval:                        1 * time.Second,
   401  		timeout:                         globalTimeout,
   402  		usingEviction:                   false,
   403  		getPodFn:                        getPodFn,
   404  		onDoneFn:                        d.OnPodDeletedOrEvicted,
   405  		onFinishFn:                      d.OnPodDeletionOrEvictionFinished,
   406  		globalTimeout:                   globalTimeout,
   407  		skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds,
   408  		out:                             d.Out,
   409  	}
   410  	_, err := waitForDelete(params)
   411  	return err
   412  }
   413  
   414  func waitForDelete(params waitForDeleteParams) ([]corev1.Pod, error) {
   415  	pods := params.pods
   416  	err := wait.PollImmediate(params.interval, params.timeout, func() (bool, error) {
   417  		pendingPods := []corev1.Pod{}
   418  		for i, pod := range pods {
   419  			p, err := params.getPodFn(pod.Namespace, pod.Name)
   420  			// The implementation of getPodFn that uses client-go returns an empty Pod struct when there is an error,
   421  			// so we need to check that err == nil and p != nil to know that a pod was found successfully.
   422  			if apierrors.IsNotFound(err) || (err == nil && p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
   423  				if params.onFinishFn != nil {
   424  					params.onFinishFn(&pod, params.usingEviction, nil)
   425  				} else if params.onDoneFn != nil {
   426  					params.onDoneFn(&pod, params.usingEviction)
   427  				}
   428  				continue
   429  			} else if err != nil {
   430  				if params.onFinishFn != nil {
   431  					params.onFinishFn(&pod, params.usingEviction, err)
   432  				}
   433  				return false, err
   434  			} else {
   435  				if shouldSkipPod(*p, params.skipWaitForDeleteTimeoutSeconds) {
   436  					fmt.Fprintf(params.out, podSkipMsgTemplate, pod.Name, params.skipWaitForDeleteTimeoutSeconds)
   437  					continue
   438  				}
   439  				pendingPods = append(pendingPods, pods[i])
   440  			}
   441  		}
   442  		pods = pendingPods
   443  		if len(pendingPods) > 0 {
   444  			select {
   445  			case <-params.ctx.Done():
   446  				return false, fmt.Errorf("global timeout reached: %v", params.globalTimeout)
   447  			default:
   448  				return false, nil
   449  			}
   450  		}
   451  		return true, nil
   452  	})
   453  	return pods, err
   454  }
   455  
   456  // Since Helper does not have a constructor, we can't enforce Helper.Ctx != nil
   457  // Multiple public methods prevent us from initializing the context in a single
   458  // place as well.
   459  func (d *Helper) getContext() context.Context {
   460  	if d.Ctx != nil {
   461  		return d.Ctx
   462  	}
   463  	return context.Background()
   464  }
   465  

View as plain text