1
16
17 package statefulset
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "time"
24
25 apps "k8s.io/api/apps/v1"
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 appsinformers "k8s.io/client-go/informers/apps/v1"
33 coreinformers "k8s.io/client-go/informers/core/v1"
34 clientset "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/kubernetes/scheme"
36 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
37 appslisters "k8s.io/client-go/listers/apps/v1"
38 corelisters "k8s.io/client-go/listers/core/v1"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/tools/record"
41 "k8s.io/client-go/util/workqueue"
42 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
43 "k8s.io/kubernetes/pkg/controller"
44 "k8s.io/kubernetes/pkg/controller/history"
45
46 "k8s.io/klog/v2"
47 )
48
49
50 var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
51
52
53 type StatefulSetController struct {
54
55 kubeClient clientset.Interface
56
57
58 control StatefulSetControlInterface
59
60 podControl controller.PodControlInterface
61
62 podLister corelisters.PodLister
63
64 podListerSynced cache.InformerSynced
65
66 setLister appslisters.StatefulSetLister
67
68 setListerSynced cache.InformerSynced
69
70 pvcListerSynced cache.InformerSynced
71
72 revListerSynced cache.InformerSynced
73
74 queue workqueue.RateLimitingInterface
75
76 eventBroadcaster record.EventBroadcaster
77 }
78
79
80 func NewStatefulSetController(
81 ctx context.Context,
82 podInformer coreinformers.PodInformer,
83 setInformer appsinformers.StatefulSetInformer,
84 pvcInformer coreinformers.PersistentVolumeClaimInformer,
85 revInformer appsinformers.ControllerRevisionInformer,
86 kubeClient clientset.Interface,
87 ) *StatefulSetController {
88 logger := klog.FromContext(ctx)
89 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
90 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
91 ssc := &StatefulSetController{
92 kubeClient: kubeClient,
93 control: NewDefaultStatefulSetControl(
94 NewStatefulPodControl(
95 kubeClient,
96 podInformer.Lister(),
97 pvcInformer.Lister(),
98 recorder),
99 NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
100 history.NewHistory(kubeClient, revInformer.Lister()),
101 recorder,
102 ),
103 pvcListerSynced: pvcInformer.Informer().HasSynced,
104 revListerSynced: revInformer.Informer().HasSynced,
105 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
106 podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
107
108 eventBroadcaster: eventBroadcaster,
109 }
110
111 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
112
113 AddFunc: func(obj interface{}) {
114 ssc.addPod(logger, obj)
115 },
116
117 UpdateFunc: func(oldObj, newObj interface{}) {
118 ssc.updatePod(logger, oldObj, newObj)
119 },
120
121 DeleteFunc: func(obj interface{}) {
122 ssc.deletePod(logger, obj)
123 },
124 })
125 ssc.podLister = podInformer.Lister()
126 ssc.podListerSynced = podInformer.Informer().HasSynced
127
128 setInformer.Informer().AddEventHandler(
129 cache.ResourceEventHandlerFuncs{
130 AddFunc: ssc.enqueueStatefulSet,
131 UpdateFunc: func(old, cur interface{}) {
132 oldPS := old.(*apps.StatefulSet)
133 curPS := cur.(*apps.StatefulSet)
134 if oldPS.Status.Replicas != curPS.Status.Replicas {
135 logger.V(4).Info("Observed updated replica count for StatefulSet", "statefulSet", klog.KObj(curPS), "oldReplicas", oldPS.Status.Replicas, "newReplicas", curPS.Status.Replicas)
136 }
137 ssc.enqueueStatefulSet(cur)
138 },
139 DeleteFunc: ssc.enqueueStatefulSet,
140 },
141 )
142 ssc.setLister = setInformer.Lister()
143 ssc.setListerSynced = setInformer.Informer().HasSynced
144
145
146 return ssc
147 }
148
149
150 func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
151 defer utilruntime.HandleCrash()
152
153
154 ssc.eventBroadcaster.StartStructuredLogging(3)
155 ssc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ssc.kubeClient.CoreV1().Events("")})
156 defer ssc.eventBroadcaster.Shutdown()
157
158 defer ssc.queue.ShutDown()
159
160 logger := klog.FromContext(ctx)
161 logger.Info("Starting stateful set controller")
162 defer logger.Info("Shutting down statefulset controller")
163
164 if !cache.WaitForNamedCacheSync("stateful set", ctx.Done(), ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
165 return
166 }
167
168 for i := 0; i < workers; i++ {
169 go wait.UntilWithContext(ctx, ssc.worker, time.Second)
170 }
171
172 <-ctx.Done()
173 }
174
175
176 func (ssc *StatefulSetController) addPod(logger klog.Logger, obj interface{}) {
177 pod := obj.(*v1.Pod)
178
179 if pod.DeletionTimestamp != nil {
180
181
182 ssc.deletePod(logger, pod)
183 return
184 }
185
186
187 if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
188 set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
189 if set == nil {
190 return
191 }
192 logger.V(4).Info("Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels)
193 ssc.enqueueStatefulSet(set)
194 return
195 }
196
197
198
199 sets := ssc.getStatefulSetsForPod(pod)
200 if len(sets) == 0 {
201 return
202 }
203 logger.V(4).Info("Orphan Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels)
204 for _, set := range sets {
205 ssc.enqueueStatefulSet(set)
206 }
207 }
208
209
210 func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interface{}) {
211 curPod := cur.(*v1.Pod)
212 oldPod := old.(*v1.Pod)
213 if curPod.ResourceVersion == oldPod.ResourceVersion {
214
215
216 return
217 }
218
219 labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
220
221 curControllerRef := metav1.GetControllerOf(curPod)
222 oldControllerRef := metav1.GetControllerOf(oldPod)
223 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
224 if controllerRefChanged && oldControllerRef != nil {
225
226 if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
227 ssc.enqueueStatefulSet(set)
228 }
229 }
230
231
232 if curControllerRef != nil {
233 set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
234 if set == nil {
235 return
236 }
237 logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
238 ssc.enqueueStatefulSet(set)
239
240
241
242 if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
243 logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds)
244
245
246 ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
247 }
248 return
249 }
250
251
252
253 if labelChanged || controllerRefChanged {
254 sets := ssc.getStatefulSetsForPod(curPod)
255 if len(sets) == 0 {
256 return
257 }
258 logger.V(4).Info("Orphan Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
259 for _, set := range sets {
260 ssc.enqueueStatefulSet(set)
261 }
262 }
263 }
264
265
266 func (ssc *StatefulSetController) deletePod(logger klog.Logger, obj interface{}) {
267 pod, ok := obj.(*v1.Pod)
268
269
270
271
272 if !ok {
273 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
274 if !ok {
275 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
276 return
277 }
278 pod, ok = tombstone.Obj.(*v1.Pod)
279 if !ok {
280 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
281 return
282 }
283 }
284
285 controllerRef := metav1.GetControllerOf(pod)
286 if controllerRef == nil {
287
288 return
289 }
290 set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
291 if set == nil {
292 return
293 }
294 logger.V(4).Info("Pod deleted.", "pod", klog.KObj(pod), "caller", utilruntime.GetCaller())
295 ssc.enqueueStatefulSet(set)
296 }
297
298
299
300
301
302
303 func (ssc *StatefulSetController) getPodsForStatefulSet(ctx context.Context, set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
304
305
306 pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
307 if err != nil {
308 return nil, err
309 }
310
311 filter := func(pod *v1.Pod) bool {
312
313 return isMemberOf(set, pod)
314 }
315
316 cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(ctx, set))
317 return cm.ClaimPods(ctx, pods, filter)
318 }
319
320
321
322 func (ssc *StatefulSetController) canAdoptFunc(ctx context.Context, set *apps.StatefulSet) func(ctx2 context.Context) error {
323 return controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
324 fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(ctx, set.Name, metav1.GetOptions{})
325 if err != nil {
326 return nil, err
327 }
328 if fresh.UID != set.UID {
329 return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
330 }
331 return fresh, nil
332 })
333 }
334
335
336 func (ssc *StatefulSetController) adoptOrphanRevisions(ctx context.Context, set *apps.StatefulSet) error {
337 revisions, err := ssc.control.ListRevisions(set)
338 if err != nil {
339 return err
340 }
341 orphanRevisions := make([]*apps.ControllerRevision, 0)
342 for i := range revisions {
343 if metav1.GetControllerOf(revisions[i]) == nil {
344 orphanRevisions = append(orphanRevisions, revisions[i])
345 }
346 }
347 if len(orphanRevisions) > 0 {
348 canAdoptErr := ssc.canAdoptFunc(ctx, set)(ctx)
349 if canAdoptErr != nil {
350 return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr)
351 }
352 return ssc.control.AdoptOrphanRevisions(set, orphanRevisions)
353 }
354 return nil
355 }
356
357
358
359 func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
360 sets, err := ssc.setLister.GetPodStatefulSets(pod)
361 if err != nil {
362 return nil
363 }
364
365 if len(sets) > 1 {
366
367
368 setNames := []string{}
369 for _, s := range sets {
370 setNames = append(setNames, s.Name)
371 }
372 utilruntime.HandleError(
373 fmt.Errorf(
374 "user error: more than one StatefulSet is selecting pods with labels: %+v. Sets: %v",
375 pod.Labels, setNames))
376 }
377 return sets
378 }
379
380
381
382
383 func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
384
385
386 if controllerRef.Kind != controllerKind.Kind {
387 return nil
388 }
389 set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
390 if err != nil {
391 return nil
392 }
393 if set.UID != controllerRef.UID {
394
395
396 return nil
397 }
398 return set
399 }
400
401
402 func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
403 key, err := controller.KeyFunc(obj)
404 if err != nil {
405 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
406 return
407 }
408 ssc.queue.Add(key)
409 }
410
411
412 func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) {
413 key, err := controller.KeyFunc(ss)
414 if err != nil {
415 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err))
416 return
417 }
418 ssc.queue.AddAfter(key, duration)
419 }
420
421
422
423 func (ssc *StatefulSetController) processNextWorkItem(ctx context.Context) bool {
424 key, quit := ssc.queue.Get()
425 if quit {
426 return false
427 }
428 defer ssc.queue.Done(key)
429 if err := ssc.sync(ctx, key.(string)); err != nil {
430 utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
431 ssc.queue.AddRateLimited(key)
432 } else {
433 ssc.queue.Forget(key)
434 }
435 return true
436 }
437
438
439 func (ssc *StatefulSetController) worker(ctx context.Context) {
440 for ssc.processNextWorkItem(ctx) {
441 }
442 }
443
444
445 func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
446 startTime := time.Now()
447 logger := klog.FromContext(ctx)
448 defer func() {
449 logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
450 }()
451
452 namespace, name, err := cache.SplitMetaNamespaceKey(key)
453 if err != nil {
454 return err
455 }
456 set, err := ssc.setLister.StatefulSets(namespace).Get(name)
457 if errors.IsNotFound(err) {
458 logger.Info("StatefulSet has been deleted", "key", key)
459 return nil
460 }
461 if err != nil {
462 utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
463 return err
464 }
465
466 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
467 if err != nil {
468 utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
469
470 return nil
471 }
472
473 if err := ssc.adoptOrphanRevisions(ctx, set); err != nil {
474 return err
475 }
476
477 pods, err := ssc.getPodsForStatefulSet(ctx, set, selector)
478 if err != nil {
479 return err
480 }
481
482 return ssc.syncStatefulSet(ctx, set, pods)
483 }
484
485
486 func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) error {
487 logger := klog.FromContext(ctx)
488 logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
489 var status *apps.StatefulSetStatus
490 var err error
491 status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
492 if err != nil {
493 return err
494 }
495 logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
496
497 if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
498 ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
499 }
500
501 return nil
502 }
503
View as plain text