
Source file src/k8s.io/kubernetes/pkg/controller/disruption/disruption.go

Documentation: k8s.io/kubernetes/pkg/controller/disruption

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package disruption
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	apps "k8s.io/api/apps/v1beta1"
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/api/extensions/v1beta1"
    27  	policy "k8s.io/api/policy/v1"
    28  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    29  	"k8s.io/apimachinery/pkg/api/errors"
    30  	apimeta "k8s.io/apimachinery/pkg/api/meta"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/intstr"
    35  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/client-go/discovery"
    38  	appsv1informers "k8s.io/client-go/informers/apps/v1"
    39  	coreinformers "k8s.io/client-go/informers/core/v1"
    40  	policyinformers "k8s.io/client-go/informers/policy/v1"
    41  	clientset "k8s.io/client-go/kubernetes"
    42  	"k8s.io/client-go/kubernetes/scheme"
    43  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    44  	appsv1listers "k8s.io/client-go/listers/apps/v1"
    45  	corelisters "k8s.io/client-go/listers/core/v1"
    46  	policylisters "k8s.io/client-go/listers/policy/v1"
    47  	scaleclient "k8s.io/client-go/scale"
    48  	"k8s.io/client-go/tools/cache"
    49  	"k8s.io/client-go/tools/record"
    50  	"k8s.io/client-go/util/workqueue"
    51  	pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
    52  	"k8s.io/klog/v2"
    53  	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
    54  	"k8s.io/kubernetes/pkg/controller"
    55  	"k8s.io/utils/clock"
    56  )
    57  
    58  const (
     59  	// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
     60  	// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
     61  	// If the pod was not marked for deletion during that time, it is assumed that it won't be deleted at
     62  	// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
     63  	// pod/pdb apiserver-to-controller latency is relatively small (1-2 seconds), so the value below should
     64  	// be more than enough.
     65  	// If the controller is running on a different node, it is important that the two nodes have synchronized
     66  	// clocks (via NTP, for example). Otherwise the PodDisruptionBudget controller may not provide enough
     67  	// protection against unwanted pod disruptions.
    68  	DeletionTimeout = 2 * time.Minute
    69  
    70  	// stalePodDisruptionTimeout sets the maximum time a pod can have a stale
    71  	// DisruptionTarget condition (the condition is present, but the Pod doesn't
    72  	// have a DeletionTimestamp).
    73  	// Once the timeout is reached, this controller attempts to set the status
    74  	// of the condition to False.
    75  	stalePodDisruptionTimeout = 2 * time.Minute
    76  )
    77  
    78  type updater func(context.Context, *policy.PodDisruptionBudget) error
    79  
    80  type DisruptionController struct {
    81  	kubeClient clientset.Interface
    82  	mapper     apimeta.RESTMapper
    83  
    84  	scaleNamespacer scaleclient.ScalesGetter
    85  	discoveryClient discovery.DiscoveryInterface
    86  
    87  	pdbLister       policylisters.PodDisruptionBudgetLister
    88  	pdbListerSynced cache.InformerSynced
    89  
    90  	podLister       corelisters.PodLister
    91  	podListerSynced cache.InformerSynced
    92  
    93  	rcLister       corelisters.ReplicationControllerLister
    94  	rcListerSynced cache.InformerSynced
    95  
    96  	rsLister       appsv1listers.ReplicaSetLister
    97  	rsListerSynced cache.InformerSynced
    98  
    99  	dLister       appsv1listers.DeploymentLister
   100  	dListerSynced cache.InformerSynced
   101  
   102  	ssLister       appsv1listers.StatefulSetLister
   103  	ssListerSynced cache.InformerSynced
   104  
   105  	// PodDisruptionBudget keys that need to be synced.
   106  	queue        workqueue.RateLimitingInterface
   107  	recheckQueue workqueue.DelayingInterface
   108  
   109  	// pod keys that need to be synced due to a stale DisruptionTarget condition.
   110  	stalePodDisruptionQueue   workqueue.RateLimitingInterface
   111  	stalePodDisruptionTimeout time.Duration
   112  
   113  	broadcaster record.EventBroadcaster
   114  	recorder    record.EventRecorder
   115  
   116  	getUpdater func() updater
   117  
   118  	clock clock.Clock
   119  }
   120  
   121  // controllerAndScale is used to return (controller, scale) pairs from the
   122  // controller finder functions.
   123  type controllerAndScale struct {
   124  	types.UID
   125  	scale int32
   126  }
   127  
   128  // podControllerFinder is a function type that maps a pod to a list of
   129  // controllers and their scale.
   130  type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
   131  
   132  func NewDisruptionController(
   133  	ctx context.Context,
   134  	podInformer coreinformers.PodInformer,
   135  	pdbInformer policyinformers.PodDisruptionBudgetInformer,
   136  	rcInformer coreinformers.ReplicationControllerInformer,
   137  	rsInformer appsv1informers.ReplicaSetInformer,
   138  	dInformer appsv1informers.DeploymentInformer,
   139  	ssInformer appsv1informers.StatefulSetInformer,
   140  	kubeClient clientset.Interface,
   141  	restMapper apimeta.RESTMapper,
   142  	scaleNamespacer scaleclient.ScalesGetter,
   143  	discoveryClient discovery.DiscoveryInterface,
   144  ) *DisruptionController {
   145  	return NewDisruptionControllerInternal(
   146  		ctx,
   147  		podInformer,
   148  		pdbInformer,
   149  		rcInformer,
   150  		rsInformer,
   151  		dInformer,
   152  		ssInformer,
   153  		kubeClient,
   154  		restMapper,
   155  		scaleNamespacer,
   156  		discoveryClient,
   157  		clock.RealClock{},
   158  		stalePodDisruptionTimeout)
   159  }
   160  
    161  // NewDisruptionControllerInternal allows setting a clock and a
    162  // stalePodDisruptionTimeout.
    163  // It is only supposed to be used by tests.
   164  func NewDisruptionControllerInternal(ctx context.Context,
   165  	podInformer coreinformers.PodInformer,
   166  	pdbInformer policyinformers.PodDisruptionBudgetInformer,
   167  	rcInformer coreinformers.ReplicationControllerInformer,
   168  	rsInformer appsv1informers.ReplicaSetInformer,
   169  	dInformer appsv1informers.DeploymentInformer,
   170  	ssInformer appsv1informers.StatefulSetInformer,
   171  	kubeClient clientset.Interface,
   172  	restMapper apimeta.RESTMapper,
   173  	scaleNamespacer scaleclient.ScalesGetter,
   174  	discoveryClient discovery.DiscoveryInterface,
   175  	clock clock.WithTicker,
   176  	stalePodDisruptionTimeout time.Duration,
   177  ) *DisruptionController {
   178  	logger := klog.FromContext(ctx)
   179  	dc := &DisruptionController{
   180  		kubeClient:                kubeClient,
   181  		queue:                     workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
   182  		recheckQueue:              workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
   183  		stalePodDisruptionQueue:   workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "stale_pod_disruption"), workqueue.DefaultControllerRateLimiter()),
   184  		broadcaster:               record.NewBroadcaster(record.WithContext(ctx)),
   185  		stalePodDisruptionTimeout: stalePodDisruptionTimeout,
   186  	}
   187  	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})
   188  
   189  	dc.getUpdater = func() updater { return dc.writePdbStatus }
   190  
   191  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   192  		AddFunc: func(obj interface{}) {
   193  			dc.addPod(logger, obj)
   194  		},
   195  		UpdateFunc: func(oldObj, newObj interface{}) {
   196  			dc.updatePod(logger, oldObj, newObj)
   197  		},
   198  		DeleteFunc: func(obj interface{}) {
   199  			dc.deletePod(logger, obj)
   200  		},
   201  	})
   202  	dc.podLister = podInformer.Lister()
   203  	dc.podListerSynced = podInformer.Informer().HasSynced
   204  
   205  	pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   206  		AddFunc: func(obj interface{}) {
   207  			dc.addDB(logger, obj)
   208  		},
   209  		UpdateFunc: func(oldObj, newObj interface{}) {
   210  			dc.updateDB(logger, oldObj, newObj)
   211  		},
   212  		DeleteFunc: func(obj interface{}) {
   213  			dc.removeDB(logger, obj)
   214  		},
   215  	})
   216  	dc.pdbLister = pdbInformer.Lister()
   217  	dc.pdbListerSynced = pdbInformer.Informer().HasSynced
   218  
   219  	dc.rcLister = rcInformer.Lister()
   220  	dc.rcListerSynced = rcInformer.Informer().HasSynced
   221  
   222  	dc.rsLister = rsInformer.Lister()
   223  	dc.rsListerSynced = rsInformer.Informer().HasSynced
   224  
   225  	dc.dLister = dInformer.Lister()
   226  	dc.dListerSynced = dInformer.Informer().HasSynced
   227  
   228  	dc.ssLister = ssInformer.Lister()
   229  	dc.ssListerSynced = ssInformer.Informer().HasSynced
   230  
   231  	dc.mapper = restMapper
   232  	dc.scaleNamespacer = scaleNamespacer
   233  	dc.discoveryClient = discoveryClient
   234  
   235  	dc.clock = clock
   236  
   237  	return dc
   238  }
   239  
   240  // The workload resources do implement the scale subresource, so it would
   241  // be possible to only check the scale subresource here. But since there is no
   242  // way to take advantage of listers with scale subresources, we use the workload
   243  // resources directly and only fall back to the scale subresource when needed.
   244  func (dc *DisruptionController) finders() []podControllerFinder {
   245  	return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
   246  		dc.getPodStatefulSet, dc.getScaleController}
   247  }
   248  
   249  var (
   250  	controllerKindRS  = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
   251  	controllerKindSS  = apps.SchemeGroupVersion.WithKind("StatefulSet")
   252  	controllerKindRC  = v1.SchemeGroupVersion.WithKind("ReplicationController")
   253  	controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
   254  )
   255  
   256  // getPodReplicaSet finds a replicaset which has no matching deployments.
   257  func (dc *DisruptionController) getPodReplicaSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
   258  	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
   259  	if !ok || err != nil {
   260  		return nil, err
   261  	}
   262  	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
   263  	if err != nil {
   264  		// The only possible error is NotFound, which is ok here.
   265  		return nil, nil
   266  	}
   267  	if rs.UID != controllerRef.UID {
   268  		return nil, nil
   269  	}
   270  	controllerRef = metav1.GetControllerOf(rs)
   271  	if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
   272  		// Skip RS if it's controlled by a Deployment.
   273  		return nil, nil
   274  	}
   275  	return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
   276  }
   277  
   278  // getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
   279  func (dc *DisruptionController) getPodStatefulSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
   280  	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
   281  	if !ok || err != nil {
   282  		return nil, err
   283  	}
   284  	ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
   285  	if err != nil {
   286  		// The only possible error is NotFound, which is ok here.
   287  		return nil, nil
   288  	}
   289  	if ss.UID != controllerRef.UID {
   290  		return nil, nil
   291  	}
   292  
   293  	return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
   294  }
   295  
    296  // getPodDeployment finds the deployment for a replicaset which is being managed by a deployment.
   297  func (dc *DisruptionController) getPodDeployment(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
   298  	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
   299  	if !ok || err != nil {
   300  		return nil, err
   301  	}
   302  	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
   303  	if err != nil {
   304  		// The only possible error is NotFound, which is ok here.
   305  		return nil, nil
   306  	}
   307  	if rs.UID != controllerRef.UID {
   308  		return nil, nil
   309  	}
   310  	controllerRef = metav1.GetControllerOf(rs)
   311  	if controllerRef == nil {
   312  		return nil, nil
   313  	}
   314  
   315  	ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
   316  	if !ok || err != nil {
   317  		return nil, err
   318  	}
   319  	deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
   320  	if err != nil {
   321  		// The only possible error is NotFound, which is ok here.
   322  		return nil, nil
   323  	}
   324  	if deployment.UID != controllerRef.UID {
   325  		return nil, nil
   326  	}
   327  	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
   328  }
   329  
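         // getPodReplicationController returns the ReplicationController referenced by the
         // provided controllerRef along with its desired scale, or nil if it cannot be found.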
   330  func (dc *DisruptionController) getPodReplicationController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
   331  	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
   332  	if !ok || err != nil {
   333  		return nil, err
   334  	}
   335  	rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
   336  	if err != nil {
   337  		// The only possible error is NotFound, which is ok here.
   338  		return nil, nil
   339  	}
   340  	if rc.UID != controllerRef.UID {
   341  		return nil, nil
   342  	}
   343  	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
   344  }
   345  
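         // getScaleController resolves an arbitrary controller via the RESTMapper and reads its
         // scale subresource, returning the controller's UID and desired replica count.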
   346  func (dc *DisruptionController) getScaleController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
   347  	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
   348  	if err != nil {
   349  		return nil, err
   350  	}
   351  
   352  	gk := schema.GroupKind{
   353  		Group: gv.Group,
   354  		Kind:  controllerRef.Kind,
   355  	}
   356  
   357  	mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
   358  	if err != nil {
   359  		return nil, err
   360  	}
   361  	gr := mapping.Resource.GroupResource()
   362  	scale, err := dc.scaleNamespacer.Scales(namespace).Get(ctx, gr, controllerRef.Name, metav1.GetOptions{})
   363  	if err != nil {
   364  		if errors.IsNotFound(err) {
   365  			// The IsNotFound error can mean either that the resource does not exist,
    366  			// or that it exists but doesn't implement the scale subresource. We check which
   367  			// situation we are facing so we can give an appropriate error message.
   368  			isScale, err := dc.implementsScale(mapping.Resource)
   369  			if err != nil {
   370  				return nil, err
   371  			}
   372  			if !isScale {
   373  				return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String())
   374  			}
   375  			return nil, nil
   376  		}
   377  		return nil, err
   378  	}
   379  	if scale.UID != controllerRef.UID {
   380  		return nil, nil
   381  	}
   382  	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
   383  }
   384  
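         // implementsScale uses the discovery client to check whether the given resource
         // exposes a scale subresource in one of the supported Scale API versions.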
   385  func (dc *DisruptionController) implementsScale(gvr schema.GroupVersionResource) (bool, error) {
   386  	resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String())
   387  	if err != nil {
   388  		return false, err
   389  	}
   390  
   391  	scaleSubresourceName := fmt.Sprintf("%s/scale", gvr.Resource)
   392  	for _, resource := range resourceList.APIResources {
   393  		if resource.Name != scaleSubresourceName {
   394  			continue
   395  		}
   396  
   397  		for _, scaleGv := range scaleclient.NewScaleConverter().ScaleVersions() {
   398  			if resource.Group == scaleGv.Group &&
   399  				resource.Version == scaleGv.Version &&
   400  				resource.Kind == "Scale" {
   401  				return true, nil
   402  			}
   403  		}
   404  	}
   405  	return false, nil
   406  }
   407  
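         // verifyGroupKind checks whether the controllerRef's kind matches expectedKind and
         // whether its API group is one of expectedGroups.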
   408  func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
   409  	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
   410  	if err != nil {
   411  		return false, err
   412  	}
   413  
   414  	if controllerRef.Kind != expectedKind {
   415  		return false, nil
   416  	}
   417  
   418  	for _, group := range expectedGroups {
   419  		if group == gv.Group {
   420  			return true, nil
   421  		}
   422  	}
   423  
   424  	return false, nil
   425  }
   426  
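         // Run starts the event broadcaster, waits for the informer caches to sync, and then
         // runs the sync, recheck and stale-pod-disruption workers until the context is cancelled.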
   427  func (dc *DisruptionController) Run(ctx context.Context) {
   428  	defer utilruntime.HandleCrash()
   429  
   430  	logger := klog.FromContext(ctx)
   431  	// Start events processing pipeline.
   432  	if dc.kubeClient != nil {
   433  		logger.Info("Sending events to api server.")
   434  		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
   435  	} else {
   436  		logger.Info("No api server defined - no events will be sent to API server.")
   437  	}
   438  	defer dc.broadcaster.Shutdown()
   439  
   440  	defer dc.queue.ShutDown()
   441  	defer dc.recheckQueue.ShutDown()
   442  	defer dc.stalePodDisruptionQueue.ShutDown()
   443  
   444  	logger.Info("Starting disruption controller")
   445  	defer logger.Info("Shutting down disruption controller")
   446  
   447  	if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
   448  		return
   449  	}
   450  
   451  	go wait.UntilWithContext(ctx, dc.worker, time.Second)
   452  	go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
   453  	go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)
   454  
   455  	<-ctx.Done()
   456  }
   457  
   458  func (dc *DisruptionController) addDB(logger klog.Logger, obj interface{}) {
   459  	pdb := obj.(*policy.PodDisruptionBudget)
   460  	logger.V(4).Info("Add DB", "podDisruptionBudget", klog.KObj(pdb))
   461  	dc.enqueuePdb(logger, pdb)
   462  }
   463  
   464  func (dc *DisruptionController) updateDB(logger klog.Logger, old, cur interface{}) {
   465  	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
   466  	pdb := cur.(*policy.PodDisruptionBudget)
   467  	logger.V(4).Info("Update DB", "podDisruptionBudget", klog.KObj(pdb))
   468  	dc.enqueuePdb(logger, pdb)
   469  }
   470  
   471  func (dc *DisruptionController) removeDB(logger klog.Logger, obj interface{}) {
   472  	pdb, ok := obj.(*policy.PodDisruptionBudget)
   473  	if !ok {
   474  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   475  		if !ok {
   476  			logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
   477  			return
   478  		}
   479  		pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
   480  		if !ok {
   481  			logger.Error(nil, "Tombstone contained object that is not a PDB", "obj", obj)
   482  			return
   483  		}
   484  	}
   485  	logger.V(4).Info("Remove DB", "podDisruptionBudget", klog.KObj(pdb))
   486  	dc.enqueuePdb(logger, pdb)
   487  }
   488  
   489  func (dc *DisruptionController) addPod(logger klog.Logger, obj interface{}) {
   490  	pod := obj.(*v1.Pod)
   491  	logger.V(4).Info("AddPod called on pod", "pod", klog.KObj(pod))
   492  	pdb := dc.getPdbForPod(logger, pod)
   493  	if pdb == nil {
   494  		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
   495  	} else {
   496  		logger.V(4).Info("addPod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
   497  		dc.enqueuePdb(logger, pdb)
   498  	}
   499  	if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
   500  		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
   501  	}
   502  }
   503  
   504  func (dc *DisruptionController) updatePod(logger klog.Logger, _, cur interface{}) {
   505  	pod := cur.(*v1.Pod)
   506  	logger.V(4).Info("UpdatePod called on pod", "pod", klog.KObj(pod))
   507  	pdb := dc.getPdbForPod(logger, pod)
   508  	if pdb == nil {
   509  		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
   510  	} else {
   511  		logger.V(4).Info("updatePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
   512  		dc.enqueuePdb(logger, pdb)
   513  	}
   514  	if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
   515  		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
   516  	}
   517  }
   518  
   519  func (dc *DisruptionController) deletePod(logger klog.Logger, obj interface{}) {
   520  	pod, ok := obj.(*v1.Pod)
   521  	// When a delete is dropped, the relist will notice a pod in the store not
   522  	// in the list, leading to the insertion of a tombstone object which contains
   523  	// the deleted key/value. Note that this value might be stale. If the pod
   524  	// changed labels the new ReplicaSet will not be woken up till the periodic
   525  	// resync.
   526  	if !ok {
   527  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   528  		if !ok {
   529  			logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
   530  			return
   531  		}
   532  		pod, ok = tombstone.Obj.(*v1.Pod)
   533  		if !ok {
   534  			logger.Error(nil, "Tombstone contained object that is not a pod", "obj", obj)
   535  			return
   536  		}
   537  	}
   538  	logger.V(4).Info("DeletePod called on pod", "pod", klog.KObj(pod))
   539  	pdb := dc.getPdbForPod(logger, pod)
   540  	if pdb == nil {
   541  		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
   542  		return
   543  	}
   544  	logger.V(4).Info("DeletePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
   545  	dc.enqueuePdb(logger, pdb)
   546  }
   547  
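         // enqueuePdb adds the PDB's key to the main work queue.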
   548  func (dc *DisruptionController) enqueuePdb(logger klog.Logger, pdb *policy.PodDisruptionBudget) {
   549  	key, err := controller.KeyFunc(pdb)
   550  	if err != nil {
   551  		logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
   552  		return
   553  	}
   554  	dc.queue.Add(key)
   555  }
   556  
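         // enqueuePdbForRecheck adds the PDB's key to the recheck queue after the given delay.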
   557  func (dc *DisruptionController) enqueuePdbForRecheck(logger klog.Logger, pdb *policy.PodDisruptionBudget, delay time.Duration) {
   558  	key, err := controller.KeyFunc(pdb)
   559  	if err != nil {
   560  		logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
   561  		return
   562  	}
   563  	dc.recheckQueue.AddAfter(key, delay)
   564  }
   565  
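         // enqueueStalePodDisruptionCleanup schedules a pod for clearing of its stale
         // DisruptionTarget condition after the given delay.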
   566  func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(logger klog.Logger, pod *v1.Pod, d time.Duration) {
   567  	key, err := controller.KeyFunc(pod)
   568  	if err != nil {
   569  		logger.Error(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
   570  		return
   571  	}
   572  	dc.stalePodDisruptionQueue.AddAfter(key, d)
   573  	logger.V(4).Info("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
   574  }
   575  
   576  func (dc *DisruptionController) getPdbForPod(logger klog.Logger, pod *v1.Pod) *policy.PodDisruptionBudget {
   577  	// GetPodPodDisruptionBudgets returns an error only if no
   578  	// PodDisruptionBudgets are found.  We don't return that as an error to the
   579  	// caller.
   580  	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
   581  	if err != nil {
   582  		logger.V(4).Info("No PodDisruptionBudgets found for pod, PodDisruptionBudget controller will avoid syncing.", "pod", klog.KObj(pod))
   583  		return nil
   584  	}
   585  
   586  	if len(pdbs) > 1 {
   587  		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets.  Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
   588  		logger.Info(msg)
   589  		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
   590  	}
   591  	return pdbs[0]
   592  }
   593  
    594  // getPodsForPdb returns the pods selected by the PodDisruptionBudget object.
    595  // IMPORTANT NOTE: the returned pods should NOT be modified.
   596  func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
   597  	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
   598  	if err != nil {
   599  		return []*v1.Pod{}, err
   600  	}
   601  	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
   602  	if err != nil {
   603  		return []*v1.Pod{}, err
   604  	}
   605  	return pods, nil
   606  }
   607  
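         // worker processes items from the main queue until the queue is shut down.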
   608  func (dc *DisruptionController) worker(ctx context.Context) {
   609  	for dc.processNextWorkItem(ctx) {
   610  	}
   611  }
   612  
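         // processNextWorkItem syncs a single PDB key from the queue, re-queuing it with
         // rate limiting if the sync fails.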
   613  func (dc *DisruptionController) processNextWorkItem(ctx context.Context) bool {
   614  	dKey, quit := dc.queue.Get()
   615  	if quit {
   616  		return false
   617  	}
   618  	defer dc.queue.Done(dKey)
   619  
   620  	err := dc.sync(ctx, dKey.(string))
   621  	if err == nil {
   622  		dc.queue.Forget(dKey)
   623  		return true
   624  	}
   625  
   626  	utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
   627  	dc.queue.AddRateLimited(dKey)
   628  
   629  	return true
   630  }
   631  
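         // recheckWorker moves items that are due from the recheck queue back onto the main queue.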
   632  func (dc *DisruptionController) recheckWorker() {
   633  	for dc.processNextRecheckWorkItem() {
   634  	}
   635  }
   636  
   637  func (dc *DisruptionController) processNextRecheckWorkItem() bool {
   638  	dKey, quit := dc.recheckQueue.Get()
   639  	if quit {
   640  		return false
   641  	}
   642  	defer dc.recheckQueue.Done(dKey)
   643  	dc.queue.AddRateLimited(dKey)
   644  	return true
   645  }
   646  
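         // stalePodDisruptionWorker processes items from the stale pod disruption queue until it is shut down.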
   647  func (dc *DisruptionController) stalePodDisruptionWorker(ctx context.Context) {
   648  	for dc.processNextStalePodDisruptionWorkItem(ctx) {
   649  	}
   650  }
   651  
   652  func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx context.Context) bool {
   653  	key, quit := dc.stalePodDisruptionQueue.Get()
   654  	if quit {
   655  		return false
   656  	}
   657  	defer dc.stalePodDisruptionQueue.Done(key)
   658  	err := dc.syncStalePodDisruption(ctx, key.(string))
   659  	if err == nil {
   660  		dc.stalePodDisruptionQueue.Forget(key)
   661  		return true
   662  	}
   663  	utilruntime.HandleError(fmt.Errorf("error syncing Pod %v to clear DisruptionTarget condition, requeueing: %v", key.(string), err))
   664  	dc.stalePodDisruptionQueue.AddRateLimited(key)
   665  	return true
   666  }
   667  
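         // sync fetches the PDB for the given key and reconciles its status, falling back to
         // failSafe (DisruptionsAllowed=0) if a non-conflict error occurs.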
   668  func (dc *DisruptionController) sync(ctx context.Context, key string) error {
   669  	logger := klog.FromContext(ctx)
   670  	startTime := dc.clock.Now()
   671  	defer func() {
   672  		logger.V(4).Info("Finished syncing PodDisruptionBudget", "key", key, "duration", dc.clock.Since(startTime))
   673  	}()
   674  
   675  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   676  	if err != nil {
   677  		return err
   678  	}
   679  	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
   680  	if errors.IsNotFound(err) {
   681  		logger.V(4).Info("podDisruptionBudget has been deleted", "key", key)
   682  		return nil
   683  	}
   684  	if err != nil {
   685  		return err
   686  	}
   687  
   688  	err = dc.trySync(ctx, pdb)
   689  	// If the reason for failure was a conflict, then allow this PDB update to be
   690  	// requeued without triggering the failSafe logic.
   691  	if errors.IsConflict(err) {
   692  		return err
   693  	}
   694  	if err != nil {
   695  		logger.Error(err, "Failed to sync PDB", "podDisruptionBudget", klog.KRef(namespace, name))
   696  		return dc.failSafe(ctx, pdb, err)
   697  	}
   698  
   699  	return nil
   700  }
   701  
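         // trySync computes the current and desired healthy pod counts for the PDB, updates its
         // status, and schedules a recheck if any disrupted pods are still awaiting deletion.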
   702  func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
   703  	logger := klog.FromContext(ctx)
   704  	pods, err := dc.getPodsForPdb(pdb)
   705  	if err != nil {
   706  		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
   707  		return err
   708  	}
   709  	if len(pods) == 0 {
   710  		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
   711  	}
   712  
   713  	expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(ctx, pdb, pods)
   714  	if err != nil {
   715  		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
   716  		return err
   717  	}
    718  	// We have unmanaged pods; instead of erroring and hot-looping in the disruption controller, log and continue.
   719  	if len(unmanagedPods) > 0 {
   720  		logger.V(4).Info("Found unmanaged pods associated with this PDB", "pods", unmanagedPods)
   721  		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "UnmanagedPods", "Pods selected by this PodDisruptionBudget (selector: %v) were found "+
   722  			"to be unmanaged. As a result, the status of the PDB cannot be calculated correctly, which may result in undefined behavior. "+
   723  			"To account for these pods please set \".spec.minAvailable\" "+
   724  			"field of the PDB to an integer value.", pdb.Spec.Selector)
   725  	}
   726  
   727  	currentTime := dc.clock.Now()
   728  	disruptedPods, recheckTime := dc.buildDisruptedPodMap(logger, pods, pdb, currentTime)
   729  	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
   730  	err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
   731  
   732  	if err == nil && recheckTime != nil {
   733  		// There is always at most one PDB waiting with a particular name in the queue,
   734  		// and each PDB in the queue is associated with the lowest timestamp
   735  		// that was supplied when a PDB with that name was added.
   736  		dc.enqueuePdbForRecheck(logger, pdb, recheckTime.Sub(currentTime))
   737  	}
   738  	return err
   739  }
   740  
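         // syncStalePodDisruption clears a stale DisruptionTarget condition (sets it to False) on a
         // pod that has not actually been marked for deletion within stalePodDisruptionTimeout.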
   741  func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
   742  	logger := klog.FromContext(ctx)
   743  	startTime := dc.clock.Now()
   744  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   745  	if err != nil {
   746  		return err
   747  	}
   748  	defer func() {
   749  		logger.V(4).Info("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
   750  	}()
   751  	pod, err := dc.podLister.Pods(namespace).Get(name)
   752  	if errors.IsNotFound(err) {
   753  		logger.V(4).Info("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod))
   754  		return nil
   755  	}
   756  	if err != nil {
   757  		return err
   758  	}
   759  
   760  	hasCond, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod)
   761  	if !hasCond {
   762  		return nil
   763  	}
   764  	if cleanAfter > 0 {
   765  		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
   766  		return nil
   767  	}
   768  
   769  	newPod := pod.DeepCopy()
   770  	updated := apipod.UpdatePodCondition(&newPod.Status, &v1.PodCondition{
   771  		Type:   v1.DisruptionTarget,
   772  		Status: v1.ConditionFalse,
   773  	})
   774  	if !updated {
   775  		return nil
   776  	}
   777  	if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, newPod, metav1.UpdateOptions{}); err != nil {
   778  		return err
   779  	}
   780  	logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
   781  	return nil
   782  }
   783  
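         // getExpectedPodCount derives the expected number of pods and the desired number of
         // healthy pods (plus any unmanaged pods) from the PDB's maxUnavailable or minAvailable setting.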
   784  func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
   785  	err = nil
    786  	// TODO(davidopp): consider unifying how expectedCount and the rules about
    787  	// permitted controller configurations (specifically, considering it an error
    788  	// if a pod covered by a PDB has 0 controllers or > 1 controller) are
    789  	// handled for integer and percentage minAvailable.
   790  
   791  	if pdb.Spec.MaxUnavailable != nil {
   792  		expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
   793  		if err != nil {
   794  			return
   795  		}
   796  		var maxUnavailable int
   797  		maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, int(expectedCount), true)
   798  		if err != nil {
   799  			return
   800  		}
   801  		desiredHealthy = expectedCount - int32(maxUnavailable)
   802  		if desiredHealthy < 0 {
   803  			desiredHealthy = 0
   804  		}
   805  	} else if pdb.Spec.MinAvailable != nil {
   806  		if pdb.Spec.MinAvailable.Type == intstr.Int {
   807  			desiredHealthy = pdb.Spec.MinAvailable.IntVal
   808  			expectedCount = int32(len(pods))
   809  		} else if pdb.Spec.MinAvailable.Type == intstr.String {
   810  			expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
   811  			if err != nil {
   812  				return
   813  			}
   814  
   815  			var minAvailable int
   816  			minAvailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MinAvailable, int(expectedCount), true)
   817  			if err != nil {
   818  				return
   819  			}
   820  			desiredHealthy = int32(minAvailable)
   821  		}
   822  	}
   823  	return
   824  }
   825  
   826  func (dc *DisruptionController) getExpectedScale(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
   827  	// When the user specifies a fraction of pods that must be available, we
   828  	// use as the fraction's denominator
   829  	// SUM_{all c in C} scale(c)
   830  	// where C is the union of C_p1, C_p2, ..., C_pN
   831  	// and each C_pi is the set of controllers controlling the pod pi
   832  
    833  	// k8s only defines what happens when 0 or 1 controllers control a
   834  	// given pod.  We explicitly exclude the 0 controllers case here, and we
   835  	// report an error if we find a pod with more than 1 controller.  Thus in
   836  	// practice each C_pi is a set of exactly 1 controller.
   837  
   838  	// A mapping from controllers to their scale.
   839  	controllerScale := map[types.UID]int32{}
   840  
   841  	// 1. Find the controller for each pod.
   842  
    843  	// As of now, we allow PDBs to be applied to pods via selectors, so there
    844  	// can be unmanaged pods (pods that don't have backing controllers) that still have PDBs associated with them.
    845  	// Such pods are collected, and the PDB backing them is enqueued instead of immediately throwing
    846  	// a sync error. This ensures the disruption controller is not frequently updating the status subresource, thus
    847  	// preventing excessive and expensive writes to etcd.
   848  	// With ControllerRef, a pod can only have 1 controller.
   849  	for _, pod := range pods {
   850  		controllerRef := metav1.GetControllerOf(pod)
   851  		if controllerRef == nil {
   852  			unmanagedPods = append(unmanagedPods, pod.Name)
   853  			continue
   854  		}
   855  
   856  		// If we already know the scale of the controller there is no need to do anything.
   857  		if _, found := controllerScale[controllerRef.UID]; found {
   858  			continue
   859  		}
   860  
   861  		// Check all the supported controllers to find the desired scale.
   862  		foundController := false
   863  		for _, finder := range dc.finders() {
   864  			var controllerNScale *controllerAndScale
   865  			controllerNScale, err = finder(ctx, controllerRef, pod.Namespace)
   866  			if err != nil {
   867  				return
   868  			}
   869  			if controllerNScale != nil {
   870  				controllerScale[controllerNScale.UID] = controllerNScale.scale
   871  				foundController = true
   872  				break
   873  			}
   874  		}
   875  		if !foundController {
   876  			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
   877  			return
   878  		}
   879  	}
   880  
   881  	// 2. Add up all the controllers.
   882  	expectedCount = 0
   883  	for _, count := range controllerScale {
   884  		expectedCount += count
   885  	}
   886  
   887  	return
   888  }
   889  
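         // countHealthyPods counts the ready pods, excluding pods that are being deleted or are
         // expected to be deleted soon according to the disruptedPods map.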
   890  func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy int32) {
   891  	for _, pod := range pods {
   892  		// Pod is being deleted.
   893  		if pod.DeletionTimestamp != nil {
   894  			continue
   895  		}
   896  		// Pod is expected to be deleted soon.
   897  		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
   898  			continue
   899  		}
   900  		if apipod.IsPodReady(pod) {
   901  			currentHealthy++
   902  		}
   903  	}
   904  
   905  	return
   906  }
   907  
    908  // buildDisruptedPodMap builds a new DisruptedPods map, possibly removing entries that refer to pods that
    909  // no longer exist, are already being deleted, or were not deleted at all. It also returns the time when this check should be repeated.
   910  func (dc *DisruptionController) buildDisruptedPodMap(logger klog.Logger, pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
   911  	disruptedPods := pdb.Status.DisruptedPods
   912  	result := make(map[string]metav1.Time)
   913  	var recheckTime *time.Time
   914  
   915  	if disruptedPods == nil {
   916  		return result, recheckTime
   917  	}
   918  	for _, pod := range pods {
   919  		if pod.DeletionTimestamp != nil {
   920  			// Already being deleted.
   921  			continue
   922  		}
   923  		disruptionTime, found := disruptedPods[pod.Name]
   924  		if !found {
   925  			// Pod not on the list.
   926  			continue
   927  		}
   928  		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
   929  		if expectedDeletion.Before(currentTime) {
   930  			logger.V(1).Info("pod was expected to be deleted but it wasn't, updating PDB",
   931  				"pod", klog.KObj(pod), "deletionTime", disruptionTime, "podDisruptionBudget", klog.KObj(pdb))
   932  			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
    933  				pdb.Namespace, pdb.Name)
   934  		} else {
   935  			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
   936  				recheckTime = &expectedDeletion
   937  			}
   938  			result[pod.Name] = disruptionTime
   939  		}
   940  	}
   941  	return result, recheckTime
   942  }
   943  
   944  // failSafe is an attempt to at least update the DisruptionsAllowed field to
   945  // 0 if everything else has failed.  This is one place we
   946  // implement the  "fail open" part of the design since if we manage to update
   947  // this field correctly, we will prevent the /evict handler from approving an
   948  // eviction when it may be unsafe to do so.
   949  func (dc *DisruptionController) failSafe(ctx context.Context, pdb *policy.PodDisruptionBudget, err error) error {
   950  	newPdb := pdb.DeepCopy()
   951  	newPdb.Status.DisruptionsAllowed = 0
   952  
   953  	if newPdb.Status.Conditions == nil {
   954  		newPdb.Status.Conditions = make([]metav1.Condition, 0)
   955  	}
   956  	apimeta.SetStatusCondition(&newPdb.Status.Conditions, metav1.Condition{
   957  		Type:               policy.DisruptionAllowedCondition,
   958  		Status:             metav1.ConditionFalse,
   959  		Reason:             policy.SyncFailedReason,
   960  		Message:            err.Error(),
   961  		ObservedGeneration: newPdb.Status.ObservedGeneration,
   962  	})
   963  
   964  	return dc.getUpdater()(ctx, newPdb)
   965  }
   966  
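         // updatePdbStatus computes DisruptionsAllowed and writes the PDB status if any field has
         // changed; otherwise it is a no-op.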
   967  func (dc *DisruptionController) updatePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
   968  	disruptedPods map[string]metav1.Time) error {
   969  
   970  	// We require expectedCount to be > 0 so that PDBs which currently match no
   971  	// pods are in a safe state when their first pods appear but this controller
   972  	// has not updated their status yet.  This isn't the only race, but it's a
   973  	// common one that's easy to detect.
   974  	disruptionsAllowed := currentHealthy - desiredHealthy
   975  	if expectedCount <= 0 || disruptionsAllowed <= 0 {
   976  		disruptionsAllowed = 0
   977  	}
   978  
   979  	if pdb.Status.CurrentHealthy == currentHealthy &&
   980  		pdb.Status.DesiredHealthy == desiredHealthy &&
   981  		pdb.Status.ExpectedPods == expectedCount &&
   982  		pdb.Status.DisruptionsAllowed == disruptionsAllowed &&
   983  		apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
   984  		pdb.Status.ObservedGeneration == pdb.Generation &&
   985  		pdbhelper.ConditionsAreUpToDate(pdb) {
   986  		return nil
   987  	}
   988  
   989  	newPdb := pdb.DeepCopy()
   990  	newPdb.Status = policy.PodDisruptionBudgetStatus{
   991  		CurrentHealthy:     currentHealthy,
   992  		DesiredHealthy:     desiredHealthy,
   993  		ExpectedPods:       expectedCount,
   994  		DisruptionsAllowed: disruptionsAllowed,
   995  		DisruptedPods:      disruptedPods,
   996  		ObservedGeneration: pdb.Generation,
   997  		Conditions:         newPdb.Status.Conditions,
   998  	}
   999  
  1000  	pdbhelper.UpdateDisruptionAllowedCondition(newPdb)
  1001  
  1002  	return dc.getUpdater()(ctx, newPdb)
  1003  }
  1004  
  1005  func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
  1006  	// If this update fails, don't retry it. Allow the failure to get handled &
  1007  	// retried in `processNextWorkItem()`.
  1008  	_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
  1009  	return err
  1010  }
  1011  
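         // nonTerminatingPodHasStaleDisruptionCondition reports whether a non-terminating pod has a
         // DisruptionTarget condition that may be stale, and how long to wait before clearing it.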
  1012  func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod *v1.Pod) (bool, time.Duration) {
  1013  	if pod.DeletionTimestamp != nil {
  1014  		return false, 0
  1015  	}
  1016  	_, cond := apipod.GetPodCondition(&pod.Status, v1.DisruptionTarget)
  1017  	// Pod disruption conditions added by kubelet are never considered stale because the condition might take
  1018  	// arbitrarily long before the pod is terminating (has deletion timestamp). Also, pod conditions present
  1019  	// on pods in terminal phase are not stale to avoid unnecessary status updates.
  1020  	if cond == nil || cond.Status != v1.ConditionTrue || cond.Reason == v1.PodReasonTerminationByKubelet || apipod.IsPodPhaseTerminal(pod.Status.Phase) {
  1021  		return false, 0
  1022  	}
  1023  	waitFor := dc.stalePodDisruptionTimeout - dc.clock.Since(cond.LastTransitionTime.Time)
  1024  	if waitFor < 0 {
  1025  		waitFor = 0
  1026  	}
  1027  	return true, waitFor
  1028  }
  1029  
