...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"time"
    24  
    25  	apps "k8s.io/api/apps/v1"
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	appsinformers "k8s.io/client-go/informers/apps/v1"
    33  	coreinformers "k8s.io/client-go/informers/core/v1"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/kubernetes/scheme"
    36  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    37  	appslisters "k8s.io/client-go/listers/apps/v1"
    38  	corelisters "k8s.io/client-go/listers/core/v1"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/client-go/util/workqueue"
    42  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    43  	"k8s.io/kubernetes/pkg/controller"
    44  	"k8s.io/kubernetes/pkg/controller/history"
    45  
    46  	"k8s.io/klog/v2"
    47  )
    48  
// controllerKind contains the schema.GroupVersionKind for this controller type.
// It is derived from the apps/v1 scheme group-version with Kind "StatefulSet".
// Within this file its Kind is compared against owner references in
// resolveControllerRef, and the whole value is handed to the pod
// ControllerRef manager in getPodsForStatefulSet.
var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
    51  
// StatefulSetController controls statefulsets.
//
// It reacts to Pod and StatefulSet informer events by placing StatefulSet keys
// on a rate-limited work queue; worker goroutines started by Run drain the
// queue and reconcile each StatefulSet through the control field.
type StatefulSetController struct {
	// kubeClient is the API client. In this file it is used for the live
	// StatefulSet read in canAdoptFunc and for the event sink set up in Run.
	kubeClient clientset.Interface
	// control is the interface capable of syncing a stateful set (listing and
	// adopting revisions, updating the set). Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods; it is handed to the pod
	// ControllerRef manager when claiming pods.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the rev shared informer has synced at least once
	revListerSynced cache.InformerSynced
	// queue holds the keys of StatefulSets that need to be synced. Keys are
	// produced by controller.KeyFunc and split back into namespace and name
	// in sync.
	queue workqueue.RateLimitingInterface
	// eventBroadcaster is the core of event processing pipeline. It is started
	// and shut down by Run.
	eventBroadcaster record.EventBroadcaster
}
    78  
    79  // NewStatefulSetController creates a new statefulset controller.
    80  func NewStatefulSetController(
    81  	ctx context.Context,
    82  	podInformer coreinformers.PodInformer,
    83  	setInformer appsinformers.StatefulSetInformer,
    84  	pvcInformer coreinformers.PersistentVolumeClaimInformer,
    85  	revInformer appsinformers.ControllerRevisionInformer,
    86  	kubeClient clientset.Interface,
    87  ) *StatefulSetController {
    88  	logger := klog.FromContext(ctx)
    89  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    90  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
    91  	ssc := &StatefulSetController{
    92  		kubeClient: kubeClient,
    93  		control: NewDefaultStatefulSetControl(
    94  			NewStatefulPodControl(
    95  				kubeClient,
    96  				podInformer.Lister(),
    97  				pvcInformer.Lister(),
    98  				recorder),
    99  			NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
   100  			history.NewHistory(kubeClient, revInformer.Lister()),
   101  			recorder,
   102  		),
   103  		pvcListerSynced: pvcInformer.Informer().HasSynced,
   104  		revListerSynced: revInformer.Informer().HasSynced,
   105  		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
   106  		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
   107  
   108  		eventBroadcaster: eventBroadcaster,
   109  	}
   110  
   111  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   112  		// lookup the statefulset and enqueue
   113  		AddFunc: func(obj interface{}) {
   114  			ssc.addPod(logger, obj)
   115  		},
   116  		// lookup current and old statefulset if labels changed
   117  		UpdateFunc: func(oldObj, newObj interface{}) {
   118  			ssc.updatePod(logger, oldObj, newObj)
   119  		},
   120  		// lookup statefulset accounting for deletion tombstones
   121  		DeleteFunc: func(obj interface{}) {
   122  			ssc.deletePod(logger, obj)
   123  		},
   124  	})
   125  	ssc.podLister = podInformer.Lister()
   126  	ssc.podListerSynced = podInformer.Informer().HasSynced
   127  
   128  	setInformer.Informer().AddEventHandler(
   129  		cache.ResourceEventHandlerFuncs{
   130  			AddFunc: ssc.enqueueStatefulSet,
   131  			UpdateFunc: func(old, cur interface{}) {
   132  				oldPS := old.(*apps.StatefulSet)
   133  				curPS := cur.(*apps.StatefulSet)
   134  				if oldPS.Status.Replicas != curPS.Status.Replicas {
   135  					logger.V(4).Info("Observed updated replica count for StatefulSet", "statefulSet", klog.KObj(curPS), "oldReplicas", oldPS.Status.Replicas, "newReplicas", curPS.Status.Replicas)
   136  				}
   137  				ssc.enqueueStatefulSet(cur)
   138  			},
   139  			DeleteFunc: ssc.enqueueStatefulSet,
   140  		},
   141  	)
   142  	ssc.setLister = setInformer.Lister()
   143  	ssc.setListerSynced = setInformer.Informer().HasSynced
   144  
   145  	// TODO: Watch volumes
   146  	return ssc
   147  }
   148  
   149  // Run runs the statefulset controller.
   150  func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
   151  	defer utilruntime.HandleCrash()
   152  
   153  	// Start events processing pipeline.
   154  	ssc.eventBroadcaster.StartStructuredLogging(3)
   155  	ssc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ssc.kubeClient.CoreV1().Events("")})
   156  	defer ssc.eventBroadcaster.Shutdown()
   157  
   158  	defer ssc.queue.ShutDown()
   159  
   160  	logger := klog.FromContext(ctx)
   161  	logger.Info("Starting stateful set controller")
   162  	defer logger.Info("Shutting down statefulset controller")
   163  
   164  	if !cache.WaitForNamedCacheSync("stateful set", ctx.Done(), ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
   165  		return
   166  	}
   167  
   168  	for i := 0; i < workers; i++ {
   169  		go wait.UntilWithContext(ctx, ssc.worker, time.Second)
   170  	}
   171  
   172  	<-ctx.Done()
   173  }
   174  
   175  // addPod adds the statefulset for the pod to the sync queue
   176  func (ssc *StatefulSetController) addPod(logger klog.Logger, obj interface{}) {
   177  	pod := obj.(*v1.Pod)
   178  
   179  	if pod.DeletionTimestamp != nil {
   180  		// on a restart of the controller manager, it's possible a new pod shows up in a state that
   181  		// is already pending deletion. Prevent the pod from being a creation observation.
   182  		ssc.deletePod(logger, pod)
   183  		return
   184  	}
   185  
   186  	// If it has a ControllerRef, that's all that matters.
   187  	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
   188  		set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
   189  		if set == nil {
   190  			return
   191  		}
   192  		logger.V(4).Info("Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels)
   193  		ssc.enqueueStatefulSet(set)
   194  		return
   195  	}
   196  
   197  	// Otherwise, it's an orphan. Get a list of all matching controllers and sync
   198  	// them to see if anyone wants to adopt it.
   199  	sets := ssc.getStatefulSetsForPod(pod)
   200  	if len(sets) == 0 {
   201  		return
   202  	}
   203  	logger.V(4).Info("Orphan Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels)
   204  	for _, set := range sets {
   205  		ssc.enqueueStatefulSet(set)
   206  	}
   207  }
   208  
   209  // updatePod adds the statefulset for the current and old pods to the sync queue.
   210  func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interface{}) {
   211  	curPod := cur.(*v1.Pod)
   212  	oldPod := old.(*v1.Pod)
   213  	if curPod.ResourceVersion == oldPod.ResourceVersion {
   214  		// In the event of a re-list we may receive update events for all known pods.
   215  		// Two different versions of the same pod will always have different RVs.
   216  		return
   217  	}
   218  
   219  	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
   220  
   221  	curControllerRef := metav1.GetControllerOf(curPod)
   222  	oldControllerRef := metav1.GetControllerOf(oldPod)
   223  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   224  	if controllerRefChanged && oldControllerRef != nil {
   225  		// The ControllerRef was changed. Sync the old controller, if any.
   226  		if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
   227  			ssc.enqueueStatefulSet(set)
   228  		}
   229  	}
   230  
   231  	// If it has a ControllerRef, that's all that matters.
   232  	if curControllerRef != nil {
   233  		set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
   234  		if set == nil {
   235  			return
   236  		}
   237  		logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
   238  		ssc.enqueueStatefulSet(set)
   239  		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
   240  		// the Pod status which in turn will trigger a requeue of the owning replica set thus
   241  		// having its status updated with the newly available replica.
   242  		if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
   243  			logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds)
   244  			// Add a second to avoid milliseconds skew in AddAfter.
   245  			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
   246  			ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
   247  		}
   248  		return
   249  	}
   250  
   251  	// Otherwise, it's an orphan. If anything changed, sync matching controllers
   252  	// to see if anyone wants to adopt it now.
   253  	if labelChanged || controllerRefChanged {
   254  		sets := ssc.getStatefulSetsForPod(curPod)
   255  		if len(sets) == 0 {
   256  			return
   257  		}
   258  		logger.V(4).Info("Orphan Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
   259  		for _, set := range sets {
   260  			ssc.enqueueStatefulSet(set)
   261  		}
   262  	}
   263  }
   264  
   265  // deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
   266  func (ssc *StatefulSetController) deletePod(logger klog.Logger, obj interface{}) {
   267  	pod, ok := obj.(*v1.Pod)
   268  
   269  	// When a delete is dropped, the relist will notice a pod in the store not
   270  	// in the list, leading to the insertion of a tombstone object which contains
   271  	// the deleted key/value. Note that this value might be stale.
   272  	if !ok {
   273  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   274  		if !ok {
   275  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
   276  			return
   277  		}
   278  		pod, ok = tombstone.Obj.(*v1.Pod)
   279  		if !ok {
   280  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
   281  			return
   282  		}
   283  	}
   284  
   285  	controllerRef := metav1.GetControllerOf(pod)
   286  	if controllerRef == nil {
   287  		// No controller should care about orphans being deleted.
   288  		return
   289  	}
   290  	set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
   291  	if set == nil {
   292  		return
   293  	}
   294  	logger.V(4).Info("Pod deleted.", "pod", klog.KObj(pod), "caller", utilruntime.GetCaller())
   295  	ssc.enqueueStatefulSet(set)
   296  }
   297  
   298  // getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
   299  // It also reconciles ControllerRef by adopting/orphaning.
   300  //
   301  // NOTE: Returned Pods are pointers to objects from the cache.
   302  // If you need to modify one, you need to copy it first.
   303  func (ssc *StatefulSetController) getPodsForStatefulSet(ctx context.Context, set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
   304  	// List all pods to include the pods that don't match the selector anymore but
   305  	// has a ControllerRef pointing to this StatefulSet.
   306  	pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
   307  	if err != nil {
   308  		return nil, err
   309  	}
   310  
   311  	filter := func(pod *v1.Pod) bool {
   312  		// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
   313  		return isMemberOf(set, pod)
   314  	}
   315  
   316  	cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(ctx, set))
   317  	return cm.ClaimPods(ctx, pods, filter)
   318  }
   319  
   320  // If any adoptions are attempted, we should first recheck for deletion with
   321  // an uncached quorum read sometime after listing Pods/ControllerRevisions (see #42639).
   322  func (ssc *StatefulSetController) canAdoptFunc(ctx context.Context, set *apps.StatefulSet) func(ctx2 context.Context) error {
   323  	return controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
   324  		fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(ctx, set.Name, metav1.GetOptions{})
   325  		if err != nil {
   326  			return nil, err
   327  		}
   328  		if fresh.UID != set.UID {
   329  			return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
   330  		}
   331  		return fresh, nil
   332  	})
   333  }
   334  
   335  // adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
   336  func (ssc *StatefulSetController) adoptOrphanRevisions(ctx context.Context, set *apps.StatefulSet) error {
   337  	revisions, err := ssc.control.ListRevisions(set)
   338  	if err != nil {
   339  		return err
   340  	}
   341  	orphanRevisions := make([]*apps.ControllerRevision, 0)
   342  	for i := range revisions {
   343  		if metav1.GetControllerOf(revisions[i]) == nil {
   344  			orphanRevisions = append(orphanRevisions, revisions[i])
   345  		}
   346  	}
   347  	if len(orphanRevisions) > 0 {
   348  		canAdoptErr := ssc.canAdoptFunc(ctx, set)(ctx)
   349  		if canAdoptErr != nil {
   350  			return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr)
   351  		}
   352  		return ssc.control.AdoptOrphanRevisions(set, orphanRevisions)
   353  	}
   354  	return nil
   355  }
   356  
   357  // getStatefulSetsForPod returns a list of StatefulSets that potentially match
   358  // a given pod.
   359  func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
   360  	sets, err := ssc.setLister.GetPodStatefulSets(pod)
   361  	if err != nil {
   362  		return nil
   363  	}
   364  	// More than one set is selecting the same Pod
   365  	if len(sets) > 1 {
   366  		// ControllerRef will ensure we don't do anything crazy, but more than one
   367  		// item in this list nevertheless constitutes user error.
   368  		setNames := []string{}
   369  		for _, s := range sets {
   370  			setNames = append(setNames, s.Name)
   371  		}
   372  		utilruntime.HandleError(
   373  			fmt.Errorf(
   374  				"user error: more than one StatefulSet is selecting pods with labels: %+v. Sets: %v",
   375  				pod.Labels, setNames))
   376  	}
   377  	return sets
   378  }
   379  
   380  // resolveControllerRef returns the controller referenced by a ControllerRef,
   381  // or nil if the ControllerRef could not be resolved to a matching controller
   382  // of the correct Kind.
   383  func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
   384  	// We can't look up by UID, so look up by Name and then verify UID.
   385  	// Don't even try to look up by Name if it's the wrong Kind.
   386  	if controllerRef.Kind != controllerKind.Kind {
   387  		return nil
   388  	}
   389  	set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
   390  	if err != nil {
   391  		return nil
   392  	}
   393  	if set.UID != controllerRef.UID {
   394  		// The controller we found with this Name is not the same one that the
   395  		// ControllerRef points to.
   396  		return nil
   397  	}
   398  	return set
   399  }
   400  
   401  // enqueueStatefulSet enqueues the given statefulset in the work queue.
   402  func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
   403  	key, err := controller.KeyFunc(obj)
   404  	if err != nil {
   405  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
   406  		return
   407  	}
   408  	ssc.queue.Add(key)
   409  }
   410  
   411  // enqueueStatefulSet enqueues the given statefulset in the work queue after given time
   412  func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) {
   413  	key, err := controller.KeyFunc(ss)
   414  	if err != nil {
   415  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err))
   416  		return
   417  	}
   418  	ssc.queue.AddAfter(key, duration)
   419  }
   420  
   421  // processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
   422  // invoked concurrently with the same key.
   423  func (ssc *StatefulSetController) processNextWorkItem(ctx context.Context) bool {
   424  	key, quit := ssc.queue.Get()
   425  	if quit {
   426  		return false
   427  	}
   428  	defer ssc.queue.Done(key)
   429  	if err := ssc.sync(ctx, key.(string)); err != nil {
   430  		utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
   431  		ssc.queue.AddRateLimited(key)
   432  	} else {
   433  		ssc.queue.Forget(key)
   434  	}
   435  	return true
   436  }
   437  
   438  // worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
   439  func (ssc *StatefulSetController) worker(ctx context.Context) {
   440  	for ssc.processNextWorkItem(ctx) {
   441  	}
   442  }
   443  
   444  // sync syncs the given statefulset.
   445  func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
   446  	startTime := time.Now()
   447  	logger := klog.FromContext(ctx)
   448  	defer func() {
   449  		logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
   450  	}()
   451  
   452  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   453  	if err != nil {
   454  		return err
   455  	}
   456  	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
   457  	if errors.IsNotFound(err) {
   458  		logger.Info("StatefulSet has been deleted", "key", key)
   459  		return nil
   460  	}
   461  	if err != nil {
   462  		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
   463  		return err
   464  	}
   465  
   466  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   467  	if err != nil {
   468  		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
   469  		// This is a non-transient error, so don't retry.
   470  		return nil
   471  	}
   472  
   473  	if err := ssc.adoptOrphanRevisions(ctx, set); err != nil {
   474  		return err
   475  	}
   476  
   477  	pods, err := ssc.getPodsForStatefulSet(ctx, set, selector)
   478  	if err != nil {
   479  		return err
   480  	}
   481  
   482  	return ssc.syncStatefulSet(ctx, set, pods)
   483  }
   484  
   485  // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
   486  func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) error {
   487  	logger := klog.FromContext(ctx)
   488  	logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
   489  	var status *apps.StatefulSetStatus
   490  	var err error
   491  	status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
   492  	if err != nil {
   493  		return err
   494  	}
   495  	logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
   496  	// One more sync to handle the clock skew. This is also helping in requeuing right after status update
   497  	if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
   498  		ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
   499  	}
   500  
   501  	return nil
   502  }
   503  

View as plain text