// Package nodelifecycle implements the node lifecycle controller, which
// watches node health signals (status updates and leases) and applies
// NoSchedule/NoExecute taints to nodes based on their observed conditions.
package nodelifecycle

import (
    "context"
    "fmt"
    "sync"
    "time"

    "k8s.io/klog/v2"

    coordv1 "k8s.io/api/coordination/v1"
    v1 "k8s.io/api/core/v1"
    apiequality "k8s.io/apimachinery/pkg/api/equality"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    appsv1informers "k8s.io/client-go/informers/apps/v1"
    coordinformers "k8s.io/client-go/informers/coordination/v1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    appsv1listers "k8s.io/client-go/listers/apps/v1"
    coordlisters "k8s.io/client-go/listers/coordination/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/flowcontrol"
    "k8s.io/client-go/util/workqueue"
    nodetopology "k8s.io/component-helpers/node/topology"
    kubeletapis "k8s.io/kubelet/pkg/apis"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
    "k8s.io/kubernetes/pkg/controller/tainteviction"
    controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
    "k8s.io/kubernetes/pkg/features"
    taintutils "k8s.io/kubernetes/pkg/util/taints"
)

func init() {
    // Register the node lifecycle controller's prometheus metrics.
    Register()
}

var (
    // UnreachableTaintTemplate is the NoExecute taint applied to a node whose
    // Ready condition is Unknown (the node is unreachable).
    UnreachableTaintTemplate = &v1.Taint{
        Key:    v1.TaintNodeUnreachable,
        Effect: v1.TaintEffectNoExecute,
    }

    // NotReadyTaintTemplate is the NoExecute taint applied to a node whose
    // Ready condition is False.
    NotReadyTaintTemplate = &v1.Taint{
        Key:    v1.TaintNodeNotReady,
        Effect: v1.TaintEffectNoExecute,
    }

    // nodeConditionToTaintKeyStatusMap maps node conditions (and the condition
    // status that triggers tainting) to the corresponding taint keys. Only
    // condition types that have a taint key mapping are listed here.
    nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
        v1.NodeReady: {
            v1.ConditionFalse:   v1.TaintNodeNotReady,
            v1.ConditionUnknown: v1.TaintNodeUnreachable,
        },
        v1.NodeMemoryPressure: {
            v1.ConditionTrue: v1.TaintNodeMemoryPressure,
        },
        v1.NodeDiskPressure: {
            v1.ConditionTrue: v1.TaintNodeDiskPressure,
        },
        v1.NodeNetworkUnavailable: {
            v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
        },
        v1.NodePIDPressure: {
            v1.ConditionTrue: v1.TaintNodePIDPressure,
        },
    }

    // taintKeyToNodeConditionMap is the reverse mapping from taint keys back to
    // the node condition types that produce them.
    taintKeyToNodeConditionMap = map[string]v1.NodeConditionType{
        v1.TaintNodeNotReady:           v1.NodeReady,
        v1.TaintNodeUnreachable:        v1.NodeReady,
        v1.TaintNodeNetworkUnavailable: v1.NodeNetworkUnavailable,
        v1.TaintNodeMemoryPressure:     v1.NodeMemoryPressure,
        v1.TaintNodeDiskPressure:       v1.NodeDiskPressure,
        v1.TaintNodePIDPressure:        v1.NodePIDPressure,
    }
)
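
// As an illustration (not part of the original source): when a node's Ready
// condition becomes Unknown, the controller ends up applying the
// UnreachableTaintTemplate above, which shows up on the Node object roughly
// like this:
//
//	spec:
//	  taints:
//	  - key: node.kubernetes.io/unreachable
//	    effect: NoExecute
//
// doNoScheduleTaintingPass below derives the NoSchedule taints using the keys
// from nodeConditionToTaintKeyStatusMap in the same way.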

// ZoneState is the state of a given zone.
type ZoneState string

const (
    stateInitial           = ZoneState("Initial")
    stateNormal            = ZoneState("Normal")
    stateFullDisruption    = ZoneState("FullDisruption")
    statePartialDisruption = ZoneState("PartialDisruption")
)

const (
    // retrySleepTime is how long the controller sleeps between retries of node
    // health updates.
    retrySleepTime = 20 * time.Millisecond
    // nodeNameKeyIndex is the name of the pod informer index keyed by spec.nodeName.
    nodeNameKeyIndex = "spec.nodeName"
    // podUpdateWorkerSize is the number of workers draining the pod update queue.
    podUpdateWorkerSize = 4
    // nodeUpdateWorkerSize is the number of workers processing node updates, and
    // the concurrency used when updating node health.
    nodeUpdateWorkerSize = 8

    // taintEvictionController is the controller name used when the taint-based
    // eviction logic runs embedded in this controller.
    taintEvictionController = "taint-eviction-controller"
)

// labelReconcileInfo lists the node label pairs to reconcile and how to
// reconcile them. The primaryKey label is treated as the source of truth; if
// ensureSecondaryExists is true, the secondaryKey label is created when it is
// missing and is overwritten whenever it disagrees with the primary.
var labelReconcileInfo = []struct {
    primaryKey            string
    secondaryKey          string
    ensureSecondaryExists bool
}{
    {
        // Reconcile the beta and the stable OS label using the stable label as
        // the source of truth.
        primaryKey:            v1.LabelOSStable,
        secondaryKey:          kubeletapis.LabelOS,
        ensureSecondaryExists: true,
    },
    {
        // Reconcile the beta and the stable arch label using the stable label
        // as the source of truth.
        primaryKey:            v1.LabelArchStable,
        secondaryKey:          kubeletapis.LabelArch,
        ensureSecondaryExists: true,
    },
}
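
// For example (an illustrative sketch, not in the original file): given a node
// labeled kubernetes.io/os=linux but missing beta.kubernetes.io/os,
// reconcileNodeLabels below adds beta.kubernetes.io/os=linux; if the beta
// label exists with a different value, it is rewritten to match the stable one.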

type nodeHealthData struct {
    probeTimestamp           metav1.Time
    readyTransitionTimestamp metav1.Time
    status                   *v1.NodeStatus
    lease                    *coordv1.Lease
}

func (n *nodeHealthData) deepCopy() *nodeHealthData {
    if n == nil {
        return nil
    }
    return &nodeHealthData{
        probeTimestamp:           n.probeTimestamp,
        readyTransitionTimestamp: n.readyTransitionTimestamp,
        status:                   n.status.DeepCopy(),
        lease:                    n.lease.DeepCopy(),
    }
}

type nodeHealthMap struct {
    lock        sync.RWMutex
    nodeHealths map[string]*nodeHealthData
}

func newNodeHealthMap() *nodeHealthMap {
    return &nodeHealthMap{
        nodeHealths: make(map[string]*nodeHealthData),
    }
}

// getDeepCopy returns a copy of the health data for the named node, so the
// caller cannot mutate the entry stored in the map.
func (n *nodeHealthMap) getDeepCopy(name string) *nodeHealthData {
    n.lock.RLock()
    defer n.lock.RUnlock()
    return n.nodeHealths[name].deepCopy()
}

func (n *nodeHealthMap) set(name string, data *nodeHealthData) {
    n.lock.Lock()
    defer n.lock.Unlock()
    n.nodeHealths[name] = data
}

type podUpdateItem struct {
    namespace string
    name      string
}

// Controller is the controller that manages the lifecycle of cluster nodes.
type Controller struct {
    taintManager *tainteviction.Controller

    podLister         corelisters.PodLister
    podInformerSynced cache.InformerSynced
    kubeClient        clientset.Interface

    // now is used instead of metav1.Now() so that tests can inject a fixed
    // clock.
    now func() metav1.Time

    enterPartialDisruptionFunc func(nodeNum int) float32
    enterFullDisruptionFunc    func(nodeNum int) float32
    computeZoneStateFunc       func(nodeConditions []*v1.NodeCondition) (int, ZoneState)

    knownNodeSet map[string]*v1.Node

    // nodeHealthMap stores the most recently observed health data per node.
    nodeHealthMap *nodeHealthMap

    // evictorLock protects zoneNoExecuteTainter.
    evictorLock sync.Mutex

    // zoneNoExecuteTainter keeps, per zone, a rate-limited queue of nodes that
    // should receive NoExecute taints.
    zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue

    nodesToRetry sync.Map

    zoneStates map[string]ZoneState

    daemonSetStore          appsv1listers.DaemonSetLister
    daemonSetInformerSynced cache.InformerSynced

    leaseLister         coordlisters.LeaseLister
    leaseInformerSynced cache.InformerSynced
    nodeLister          corelisters.NodeLister
    nodeInformerSynced  cache.InformerSynced

    getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)

    broadcaster record.EventBroadcaster
    recorder    record.EventRecorder

    // nodeMonitorPeriod controls how often this controller checks the node
    // health signal posted by the kubelet. This value should be lower than
    // nodeMonitorGracePeriod.
    nodeMonitorPeriod time.Duration

    // nodeStartupGracePeriod is the amount of time a newly registered node is
    // allowed to be unresponsive before it is marked unhealthy.
    nodeStartupGracePeriod time.Duration

    // nodeMonitorGracePeriod is the amount of time a running node is allowed
    // to be unresponsive before it is marked unhealthy. It must be larger than
    // the kubelet's status update frequency, with enough headroom for the
    // number of retries this controller allows when posting a node status.
    nodeMonitorGracePeriod time.Duration

    // nodeUpdateWorkerSize is the number of workers used to process node
    // updates and to update node health concurrently.
    nodeUpdateWorkerSize int

    evictionLimiterQPS          float32
    secondaryEvictionLimiterQPS float32
    largeClusterThreshold       int32
    unhealthyZoneThreshold      float32

    nodeUpdateQueue workqueue.Interface
    podUpdateQueue  workqueue.RateLimitingInterface
}

// NewNodeLifecycleController returns a new node lifecycle controller built on
// top of the given informers.
func NewNodeLifecycleController(
    ctx context.Context,
    leaseInformer coordinformers.LeaseInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    daemonSetInformer appsv1informers.DaemonSetInformer,
    kubeClient clientset.Interface,
    nodeMonitorPeriod time.Duration,
    nodeStartupGracePeriod time.Duration,
    nodeMonitorGracePeriod time.Duration,
    evictionLimiterQPS float32,
    secondaryEvictionLimiterQPS float32,
    largeClusterThreshold int32,
    unhealthyZoneThreshold float32,
) (*Controller, error) {
    logger := klog.FromContext(ctx)
    if kubeClient == nil {
        logger.Error(nil, "kubeClient is nil when starting nodelifecycle Controller")
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }

    eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})

    nc := &Controller{
        kubeClient:                  kubeClient,
        now:                         metav1.Now,
        knownNodeSet:                make(map[string]*v1.Node),
        nodeHealthMap:               newNodeHealthMap(),
        broadcaster:                 eventBroadcaster,
        recorder:                    recorder,
        nodeMonitorPeriod:           nodeMonitorPeriod,
        nodeStartupGracePeriod:      nodeStartupGracePeriod,
        nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
        nodeUpdateWorkerSize:        nodeUpdateWorkerSize,
        zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
        nodesToRetry:                sync.Map{},
        zoneStates:                  make(map[string]ZoneState),
        evictionLimiterQPS:          evictionLimiterQPS,
        secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
        largeClusterThreshold:       largeClusterThreshold,
        unhealthyZoneThreshold:      unhealthyZoneThreshold,
        nodeUpdateQueue:             workqueue.NewNamed("node_lifecycle_controller"),
        podUpdateQueue:              workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
    }

    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    nc.computeZoneStateFunc = nc.ComputeZoneState

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            nc.podUpdated(nil, pod)
        },
        UpdateFunc: func(prev, obj interface{}) {
            prevPod := prev.(*v1.Pod)
            newPod := obj.(*v1.Pod)
            nc.podUpdated(prevPod, newPod)
        },
        DeleteFunc: func(obj interface{}) {
            pod, isPod := obj.(*v1.Pod)
            // We can get DeletedFinalStateUnknown instead of *v1.Pod here, and
            // we need to handle that tombstone correctly.
            if !isPod {
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    logger.Error(nil, "Received unexpected object", "object", obj)
                    return
                }
                pod, ok = deletedState.Obj.(*v1.Pod)
                if !ok {
                    logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj)
                    return
                }
            }
            nc.podUpdated(pod, nil)
        },
    })
    nc.podInformerSynced = podInformer.Informer().HasSynced
    podInformer.Informer().AddIndexers(cache.Indexers{
        nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                return []string{}, nil
            }
            if len(pod.Spec.NodeName) == 0 {
                return []string{}, nil
            }
            return []string{pod.Spec.NodeName}, nil
        },
    })

    podIndexer := podInformer.Informer().GetIndexer()
    nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
        objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
        if err != nil {
            return nil, err
        }
        pods := make([]*v1.Pod, 0, len(objs))
        for _, obj := range objs {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                continue
            }
            pods = append(pods, pod)
        }
        return pods, nil
    }
    nc.podLister = podInformer.Lister()
    nc.nodeLister = nodeInformer.Lister()

    if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
        logger.Info("Running TaintEvictionController as part of NodeLifecycleController")
        tm, err := tainteviction.New(ctx, kubeClient, podInformer, nodeInformer, taintEvictionController)
        if err != nil {
            return nil, err
        }
        nc.taintManager = tm
    }

    logger.Info("Controller will reconcile labels")
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
            nc.nodeUpdateQueue.Add(node.Name)
            return nil
        }),
        UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
            nc.nodeUpdateQueue.Add(newNode.Name)
            return nil
        }),
        DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
            nc.nodesToRetry.Delete(node.Name)
            return nil
        }),
    })

    nc.leaseLister = leaseInformer.Lister()
    nc.leaseInformerSynced = leaseInformer.Informer().HasSynced

    nc.nodeInformerSynced = nodeInformer.Informer().HasSynced

    nc.daemonSetStore = daemonSetInformer.Lister()
    nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced

    return nc, nil
}
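
// A hedged usage sketch (not part of the original file): the controller is
// typically constructed from a shared informer factory and then run. The
// variable names and the concrete values below are illustrative assumptions,
// not prescribed defaults.
//
//	factory := informers.NewSharedInformerFactory(client, 0)
//	nc, err := NewNodeLifecycleController(ctx,
//	    factory.Coordination().V1().Leases(),
//	    factory.Core().V1().Pods(),
//	    factory.Core().V1().Nodes(),
//	    factory.Apps().V1().DaemonSets(),
//	    client,
//	    5*time.Second,  // nodeMonitorPeriod
//	    time.Minute,    // nodeStartupGracePeriod
//	    40*time.Second, // nodeMonitorGracePeriod
//	    0.1,            // evictionLimiterQPS
//	    0.01,           // secondaryEvictionLimiterQPS
//	    50,             // largeClusterThreshold
//	    0.55,           // unhealthyZoneThreshold
//	)
//	if err != nil {
//	    // handle error
//	}
//	factory.Start(ctx.Done())
//	go nc.Run(ctx)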

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(ctx context.Context) {
    defer utilruntime.HandleCrash()

    // Start the events processing pipeline.
    nc.broadcaster.StartStructuredLogging(3)
    logger := klog.FromContext(ctx)
    logger.Info("Sending events to api server")
    nc.broadcaster.StartRecordingToSink(
        &v1core.EventSinkImpl{
            Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""),
        })
    defer nc.broadcaster.Shutdown()

    // Shut down the queues so the worker goroutines exit.
    defer nc.nodeUpdateQueue.ShutDown()
    defer nc.podUpdateQueue.ShutDown()

    logger.Info("Starting node controller")
    defer logger.Info("Shutting down node controller")

    if !cache.WaitForNamedCacheSync("taint", ctx.Done(), nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
        return
    }

    if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
        logger.Info("Starting", "controller", taintEvictionController)
        go nc.taintManager.Run(ctx)
    }

    // Start workers to reconcile labels and/or update the NoSchedule taints for nodes.
    for i := 0; i < nodeUpdateWorkerSize; i++ {
        // Thanks to "workqueue", each worker just needs to get an item from the
        // queue; the queue guarantees that the same item is never processed by
        // two workers concurrently and that no event is missed.
        go wait.UntilWithContext(ctx, nc.doNodeProcessingPassWorker, time.Second)
    }

    for i := 0; i < podUpdateWorkerSize; i++ {
        go wait.UntilWithContext(ctx, nc.doPodProcessingWorker, time.Second)
    }

    // Handle taint-based evictions in a separate pass, since the eviction logic
    // does not belong in the taint manager or in the timed workers above.
    go wait.UntilWithContext(ctx, nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod)

    // Incorporate the results of the node health signal pushed from kubelet.
    go wait.UntilWithContext(ctx, func(ctx context.Context) {
        if err := nc.monitorNodeHealth(ctx); err != nil {
            logger.Error(err, "Error monitoring node health")
        }
    }, nc.nodeMonitorPeriod)

    <-ctx.Done()
}

func (nc *Controller) doNodeProcessingPassWorker(ctx context.Context) {
    logger := klog.FromContext(ctx)
    for {
        obj, shutdown := nc.nodeUpdateQueue.Get()
        // "nodeUpdateQueue" is shut down when Run returns; exit the worker when
        // the queue reports shutdown.
        if shutdown {
            return
        }
        nodeName := obj.(string)
        if err := nc.doNoScheduleTaintingPass(ctx, nodeName); err != nil {
            logger.Error(err, "Failed to taint NoSchedule on node, requeue it", "node", klog.KRef("", nodeName))
            // Note: the node is not re-added to the queue here; the error is only logged.
        }
        // TODO: re-evaluate which labels need to be reconciled and when.
        if err := nc.reconcileNodeLabels(ctx, nodeName); err != nil {
            logger.Error(err, "Failed to reconcile labels for node, requeue it", "node", klog.KRef("", nodeName))
            // Note: the node is not re-added to the queue here; the error is only logged.
        }
        nc.nodeUpdateQueue.Done(nodeName)
    }
}

func (nc *Controller) doNoScheduleTaintingPass(ctx context.Context, nodeName string) error {
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        // If the node is not found, there is nothing to taint.
        if apierrors.IsNotFound(err) {
            return nil
        }
        return err
    }

    // Map the node's conditions to taints.
    var taints []v1.Taint
    for _, condition := range node.Status.Conditions {
        if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
            if taintKey, found := taintMap[condition.Status]; found {
                taints = append(taints, v1.Taint{
                    Key:    taintKey,
                    Effect: v1.TaintEffectNoSchedule,
                })
            }
        }
    }
    if node.Spec.Unschedulable {
        // If the node is unschedulable, append the related taint.
        taints = append(taints, v1.Taint{
            Key:    v1.TaintNodeUnschedulable,
            Effect: v1.TaintEffectNoSchedule,
        })
    }

    // Get the node's existing taints that this controller manages.
    nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
        // Only NoSchedule taints are candidates to be compared with "taints" later.
        if t.Effect != v1.TaintEffectNoSchedule {
            return false
        }
        // The unschedulable taint is managed by this pass.
        if t.Key == v1.TaintNodeUnschedulable {
            return true
        }
        // So are the node condition taints.
        _, found := taintKeyToNodeConditionMap[t.Key]
        return found
    })
    taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
    // If there is nothing to add or delete, we are done.
    if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
        return nil
    }
    if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, taintsToAdd, taintsToDel, node) {
        return fmt.Errorf("failed to swap taints of node %+v", node)
    }
    return nil
}

func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context) {
    // Extract the keys of the map so that evictorLock is not held for the
    // entire function, only while the map is accessed.
    var zoneNoExecuteTainterKeys []string
    func() {
        nc.evictorLock.Lock()
        defer nc.evictorLock.Unlock()

        zoneNoExecuteTainterKeys = make([]string, 0, len(nc.zoneNoExecuteTainter))
        for k := range nc.zoneNoExecuteTainter {
            zoneNoExecuteTainterKeys = append(zoneNoExecuteTainterKeys, k)
        }
    }()
    logger := klog.FromContext(ctx)
    for _, k := range zoneNoExecuteTainterKeys {
        var zoneNoExecuteTainterWorker *scheduler.RateLimitedTimedQueue
        func() {
            nc.evictorLock.Lock()
            defer nc.evictorLock.Unlock()
            // Extracting the value without checking whether the key exists is
            // safe here: zones are only ever added to zoneNoExecuteTainter and
            // never removed, so the key is guaranteed to exist.
            zoneNoExecuteTainterWorker = nc.zoneNoExecuteTainter[k]
        }()
        // The callback returns 'false' and a retry delay when the node should be
        // retried, or 'true' when it has been handled.
        zoneNoExecuteTainterWorker.Try(logger, func(value scheduler.TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            if apierrors.IsNotFound(err) {
                logger.Info("Node no longer present in nodeLister", "node", klog.KRef("", value.Value))
                return true, 0
            } else if err != nil {
                logger.Info("Failed to get Node from the nodeLister", "node", klog.KRef("", value.Value), "err", err)
                // Retry in 50 milliseconds.
                return false, 50 * time.Millisecond
            }
            _, condition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
            if condition == nil {
                logger.Info("Failed to get NodeCondition from the node status", "node", klog.KRef("", value.Value))
                // Retry in 50 milliseconds.
                return false, 50 * time.Millisecond
            }
            // Mirror the Ready condition: the "unreachable" and "not ready" taints are mutually exclusive.
            taintToAdd := v1.Taint{}
            oppositeTaint := v1.Taint{}
            switch condition.Status {
            case v1.ConditionFalse:
                taintToAdd = *NotReadyTaintTemplate
                oppositeTaint = *UnreachableTaintTemplate
            case v1.ConditionUnknown:
                taintToAdd = *UnreachableTaintTemplate
                oppositeTaint = *NotReadyTaintTemplate
            default:
                // The node is ready again, so there is no need to taint it.
                logger.V(4).Info("Node was in a taint queue, but it's ready now. Ignoring taint request", "node", klog.KRef("", value.Value))
                return true, 0
            }
            result := controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
            if result {
                // Count the number of evictions.
                zone := nodetopology.GetZoneKey(node)
                evictionsTotal.WithLabelValues(zone).Inc()
            }

            return result, 0
        })
    }
}

// monitorNodeHealth verifies that node health is being updated regularly by
// the kubelet (via status updates and leases). If it is not, the controller
// posts "NodeReady==ConditionUnknown", queues the node for taint-based
// eviction, and updates the PodReady condition of the node's pods.
func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
    start := nc.now()
    defer func() {
        updateAllNodesHealthDuration.Observe(time.Since(start.Time).Seconds())
    }()

    // We list nodes from the local cache, since we can tolerate small delays
    // compared to state in etcd and there is eventual consistency anyway.
    nodes, err := nc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }
    added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
    logger := klog.FromContext(ctx)
    for i := range newZoneRepresentatives {
        nc.addPodEvictorForNewZone(logger, newZoneRepresentatives[i])
    }
    for i := range added {
        logger.V(1).Info("Controller observed a new Node", "node", klog.KRef("", added[i].Name))
        controllerutil.RecordNodeEvent(ctx, nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
        nc.knownNodeSet[added[i].Name] = added[i]
        nc.addPodEvictorForNewZone(logger, added[i])
        nc.markNodeAsReachable(ctx, added[i])
    }

    for i := range deleted {
        logger.V(1).Info("Controller observed a Node deletion", "node", klog.KRef("", deleted[i].Name))
        controllerutil.RecordNodeEvent(ctx, nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
        delete(nc.knownNodeSet, deleted[i].Name)
    }

    var zoneToNodeConditionsLock sync.Mutex
    zoneToNodeConditions := map[string][]*v1.NodeCondition{}
    updateNodeFunc := func(piece int) {
        start := nc.now()
        defer func() {
            updateNodeHealthDuration.Observe(time.Since(start.Time).Seconds())
        }()

        var observedReadyCondition v1.NodeCondition
        var currentReadyCondition *v1.NodeCondition
        node := nodes[piece].DeepCopy()

        if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
            var err error
            _, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(ctx, node)
            if err == nil {
                return true, nil
            }
            name := node.Name
            node, err = nc.kubeClient.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
            if err != nil {
                logger.Error(nil, "Failed while getting a Node to retry updating node health. Probably Node was deleted", "node", klog.KRef("", name))
                return false, err
            }
            return false, nil
        }); err != nil {
            logger.Error(err, "Update health of Node from Controller error, Skipping - no pods will be evicted", "node", klog.KObj(node))
            return
        }

        // Some nodes may be excluded from disruption checking.
        if !isNodeExcludedFromDisruptionChecks(node) {
            zoneToNodeConditionsLock.Lock()
            zoneToNodeConditions[nodetopology.GetZoneKey(node)] = append(zoneToNodeConditions[nodetopology.GetZoneKey(node)], currentReadyCondition)
            zoneToNodeConditionsLock.Unlock()
        }

        if currentReadyCondition != nil {
            pods, err := nc.getPodsAssignedToNode(node.Name)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err))
                if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
                    // If the error happened during a node status transition
                    // (Ready -> NotReady), mark the node for retry so that
                    // MarkPodsNotReady is executed in the next iteration.
                    nc.nodesToRetry.Store(node.Name, struct{}{})
                }
                return
            }
            nc.processTaintBaseEviction(ctx, node, &observedReadyCondition)

            _, needsRetry := nc.nodesToRetry.Load(node.Name)
            switch {
            case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
                // Report the node event only once, when the status changed.
                controllerutil.RecordNodeStatusChange(logger, nc.recorder, node, "NodeNotReady")
                fallthrough
            case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
                if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
                    utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
                    nc.nodesToRetry.Store(node.Name, struct{}{})
                    return
                }
            }
        }
        nc.nodesToRetry.Delete(node.Name)
    }

    // Marking the pods not ready on a node requires looping over them and
    // updating each pod's status one at a time. This is performed serially, and
    // can take a while if we're processing each node serially as well. So we
    // process them with bounded concurrency instead, since most of the time is
    // spent waiting on io.
    workqueue.ParallelizeUntil(ctx, nc.nodeUpdateWorkerSize, len(nodes), updateNodeFunc)

    nc.handleDisruption(ctx, zoneToNodeConditions, nodes)

    return nil
}

func (nc *Controller) processTaintBaseEviction(ctx context.Context, node *v1.Node, observedReadyCondition *v1.NodeCondition) {
    decisionTimestamp := nc.now()

    logger := klog.FromContext(ctx)
    switch observedReadyCondition.Status {
    case v1.ConditionFalse:
        // Update the taint straight away if the node already carries the unreachable taint.
        if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
            taintToAdd := *NotReadyTaintTemplate
            if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
                logger.Error(nil, "Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle")
            }
        } else if nc.markNodeForTainting(node, v1.ConditionFalse) {
            logger.V(2).Info("Node is NotReady. Adding it to the Taint queue", "node", klog.KObj(node), "timeStamp", decisionTimestamp)
        }
    case v1.ConditionUnknown:
        // Update the taint straight away if the node already carries the not-ready taint.
        if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
            taintToAdd := *UnreachableTaintTemplate
            if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
                logger.Error(nil, "Failed to instantly swap NotReadyTaint to UnreachableTaint. Will try again in the next cycle")
            }
        } else if nc.markNodeForTainting(node, v1.ConditionUnknown) {
            logger.V(2).Info("Node is unresponsive. Adding it to the Taint queue", "node", klog.KObj(node), "timeStamp", decisionTimestamp)
        }
    case v1.ConditionTrue:
        removed, err := nc.markNodeAsReachable(ctx, node)
        if err != nil {
            logger.Error(nil, "Failed to remove taints from node. Will retry in next iteration", "node", klog.KObj(node))
        }
        if removed {
            logger.V(2).Info("Node is healthy again, removing all taints", "node", klog.KObj(node))
        }
    }
}

// labelNodeDisruptionExclusion is a label on nodes that controls whether they
// are excluded from being considered for disruption checks by the node
// controller.
const labelNodeDisruptionExclusion = "node.kubernetes.io/exclude-disruption"

func isNodeExcludedFromDisruptionChecks(node *v1.Node) bool {
    if _, ok := node.Labels[labelNodeDisruptionExclusion]; ok {
        return true
    }
    return false
}
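
// For illustration (not part of the original source): a node can opt out of
// the zone disruption calculation with a label such as
//
//	kubectl label node <node-name> node.kubernetes.io/exclude-disruption=""
//
// Only the presence of the key matters; the value is ignored.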

// tryUpdateNodeHealth checks the given node's conditions and tries to update
// its health status. It returns the grace period to use for the node, the
// ready condition last observed by the controller, the node's current ready
// condition, and an error if the status update failed.
func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
    nodeHealth := nc.nodeHealthMap.getDeepCopy(node.Name)
    defer func() {
        nc.nodeHealthMap.set(node.Name, nodeHealth)
    }()

    var gracePeriod time.Duration
    var observedReadyCondition v1.NodeCondition
    _, currentReadyCondition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
    if currentReadyCondition == nil {
        // If the ready condition is nil, the kubelet (or node controller) never
        // posted node status. A fake ready condition is created, with
        // LastHeartbeatTime and LastTransitionTime set to the node's creation
        // time, which gives the node nodeStartupGracePeriod to post a real
        // status before it is considered unhealthy.
        observedReadyCondition = v1.NodeCondition{
            Type:               v1.NodeReady,
            Status:             v1.ConditionUnknown,
            LastHeartbeatTime:  node.CreationTimestamp,
            LastTransitionTime: node.CreationTimestamp,
        }
        gracePeriod = nc.nodeStartupGracePeriod
        if nodeHealth != nil {
            nodeHealth.status = &node.Status
        } else {
            nodeHealth = &nodeHealthData{
                status:                   &node.Status,
                probeTimestamp:           node.CreationTimestamp,
                readyTransitionTimestamp: node.CreationTimestamp,
            }
        }
    } else {
        // If the ready condition is not nil, make a copy of it, since we may
        // modify it in place later.
        observedReadyCondition = *currentReadyCondition
        gracePeriod = nc.nodeMonitorGracePeriod
    }

    // The probe timestamp only advances when the controller sees evidence of
    // kubelet activity: a changed Ready condition heartbeat or a renewed node
    // lease. If neither has changed since the last pass, the saved probe
    // timestamp is kept so the grace period keeps counting down.
    var savedCondition *v1.NodeCondition
    var savedLease *coordv1.Lease
    if nodeHealth != nil {
        _, savedCondition = controllerutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
        savedLease = nodeHealth.lease
    }
    logger := klog.FromContext(ctx)
    if nodeHealth == nil {
        logger.Info("Missing timestamp for Node. Assuming now as a timestamp", "node", klog.KObj(node))
        nodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition == nil && currentReadyCondition != nil {
        logger.V(1).Info("Creating timestamp entry for newly observed Node", "node", klog.KObj(node))
        nodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition != nil && currentReadyCondition == nil {
        logger.Error(nil, "ReadyCondition was removed from Status of Node", "node", klog.KObj(node))
        // The saved status contained a Ready condition but the current one does
        // not; treat this like a newly observed node and reset the timestamps.
        nodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: nc.now(),
        }
    } else if savedCondition != nil && currentReadyCondition != nil && savedCondition.LastHeartbeatTime != currentReadyCondition.LastHeartbeatTime {
        var transitionTime metav1.Time
        // If the ReadyCondition transitioned since the last time we checked,
        // update the transition timestamp to "now"; otherwise keep it as it is.
        if savedCondition.LastTransitionTime != currentReadyCondition.LastTransitionTime {
            logger.V(3).Info("ReadyCondition for Node transitioned from savedCondition to currentReadyCondition", "node", klog.KObj(node), "savedCondition", savedCondition, "currentReadyCondition", currentReadyCondition)
            transitionTime = nc.now()
        } else {
            transitionTime = nodeHealth.readyTransitionTimestamp
        }
        if loggerV := logger.V(5); loggerV.Enabled() {
            loggerV.Info("Node ReadyCondition updated. Updating timestamp", "node", klog.KObj(node), "nodeHealthStatus", nodeHealth.status, "nodeStatus", node.Status)
        } else {
            logger.V(3).Info("Node ReadyCondition updated. Updating timestamp", "node", klog.KObj(node))
        }
        nodeHealth = &nodeHealthData{
            status:                   &node.Status,
            probeTimestamp:           nc.now(),
            readyTransitionTimestamp: transitionTime,
        }
    }
    // Always update the probe time if the node lease is renewed.
    // Note: if the kubelet never posted the node status but keeps renewing its
    // heartbeat lease, the node controller assumes the node is healthy and
    // takes no action.
    observedLease, _ := nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
    if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
        nodeHealth.lease = observedLease
        nodeHealth.probeTimestamp = nc.now()
    }

    if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
        // The NodeReady condition or lease was last updated longer ago than
        // gracePeriod, so set the managed conditions to Unknown (regardless of
        // their current value) in the API server.
        nodeConditionTypes := []v1.NodeConditionType{
            v1.NodeReady,
            v1.NodeMemoryPressure,
            v1.NodeDiskPressure,
            v1.NodePIDPressure,
            // The 'NodeNetworkUnavailable' condition is not changed here, as it
            // is managed at the control plane level.
        }

        nowTimestamp := nc.now()
        for _, nodeConditionType := range nodeConditionTypes {
            _, currentCondition := controllerutil.GetNodeCondition(&node.Status, nodeConditionType)
            if currentCondition == nil {
                logger.V(2).Info("Condition of node was never updated by kubelet", "nodeConditionType", nodeConditionType, "node", klog.KObj(node))
                node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
                    Type:               nodeConditionType,
                    Status:             v1.ConditionUnknown,
                    Reason:             "NodeStatusNeverUpdated",
                    Message:            "Kubelet never posted node status.",
                    LastHeartbeatTime:  node.CreationTimestamp,
                    LastTransitionTime: nowTimestamp,
                })
            } else {
                logger.V(2).Info("Node hasn't been updated",
                    "node", klog.KObj(node), "duration", nc.now().Time.Sub(nodeHealth.probeTimestamp.Time), "nodeConditionType", nodeConditionType, "currentCondition", currentCondition)
                if currentCondition.Status != v1.ConditionUnknown {
                    currentCondition.Status = v1.ConditionUnknown
                    currentCondition.Reason = "NodeStatusUnknown"
                    currentCondition.Message = "Kubelet stopped posting node status."
                    currentCondition.LastTransitionTime = nowTimestamp
                }
            }
        }
        // Re-read currentReadyCondition, since its value may have changed above.
        _, currentReadyCondition = controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)

        if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
            if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}); err != nil {
                logger.Error(err, "Error updating node", "node", klog.KObj(node))
                return gracePeriod, observedReadyCondition, currentReadyCondition, err
            }
            nodeHealth = &nodeHealthData{
                status:                   &node.Status,
                probeTimestamp:           nodeHealth.probeTimestamp,
                readyTransitionTimestamp: nc.now(),
                lease:                    observedLease,
            }
            return gracePeriod, observedReadyCondition, currentReadyCondition, nil
        }
    }

    return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}

func (nc *Controller) handleDisruption(ctx context.Context, zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
    newZoneStates := map[string]ZoneState{}
    allAreFullyDisrupted := true
    logger := klog.FromContext(ctx)
    for k, v := range zoneToNodeConditions {
        zoneSize.WithLabelValues(k).Set(float64(len(v)))
        unhealthy, newState := nc.computeZoneStateFunc(v)
        zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
        unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
        if newState != stateFullDisruption {
            allAreFullyDisrupted = false
        }
        newZoneStates[k] = newState
        if _, had := nc.zoneStates[k]; !had {
            logger.Error(nil, "Setting initial state for unseen zone", "zone", k)
            nc.zoneStates[k] = stateInitial
        }
    }

    allWasFullyDisrupted := true
    for k, v := range nc.zoneStates {
        if _, have := zoneToNodeConditions[k]; !have {
            zoneSize.WithLabelValues(k).Set(0)
            zoneHealth.WithLabelValues(k).Set(100)
            unhealthyNodes.WithLabelValues(k).Set(0)
            delete(nc.zoneStates, k)
            continue
        }
        if v != stateFullDisruption {
            allWasFullyDisrupted = false
            break
        }
    }

    // At least one node was responding in the previous pass or in the current
    // pass. The semantics are:
    // - if the new state is "partialDisruption", call a user-defined function
    //   that returns a new limiter to use,
    // - if the new state is "normal", resume the default limiter settings,
    // - if the new state is "fullDisruption", restore the normal eviction rate,
    //   unless all zones in the cluster are fully disrupted, in which case all
    //   evictions are stopped.
    if !allAreFullyDisrupted || !allWasFullyDisrupted {
        // We are switching to full disruption mode.
        if allAreFullyDisrupted {
            logger.Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode")
            for i := range nodes {
                _, err := nc.markNodeAsReachable(ctx, nodes[i])
                if err != nil {
                    logger.Error(nil, "Failed to remove taints from Node", "node", klog.KObj(nodes[i]))
                }
            }
            // Stop all evictions.
            for k := range nc.zoneStates {
                nc.zoneNoExecuteTainter[k].SwapLimiter(0)
            }
            for k := range nc.zoneStates {
                nc.zoneStates[k] = stateFullDisruption
            }
            // All rate limiters are updated, so we can return early here.
            return
        }
        // We are exiting full disruption mode.
        if allWasFullyDisrupted {
            logger.Info("Controller detected that some Nodes are Ready. Exiting master disruption mode")
            // When exiting disruption mode, update the probe timestamps on all nodes.
            now := nc.now()
            for i := range nodes {
                v := nc.nodeHealthMap.getDeepCopy(nodes[i].Name)
                v.probeTimestamp = now
                v.readyTransitionTimestamp = now
                nc.nodeHealthMap.set(nodes[i].Name, v)
            }
            // Reset all rate limiters to settings appropriate for the new state.
            for k := range nc.zoneStates {
                nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
                nc.zoneStates[k] = newZoneStates[k]
            }
            return
        }
        // There is at least one zone that is not fully disrupted, so re-evaluate
        // the per-zone rate limits.
        for k, v := range nc.zoneStates {
            newState := newZoneStates[k]
            if v == newState {
                continue
            }
            logger.Info("Controller detected that zone is now in new state", "zone", k, "newState", newState)
            nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
            nc.zoneStates[k] = newState
        }
    }
}

func (nc *Controller) podUpdated(oldPod, newPod *v1.Pod) {
    if newPod == nil {
        return
    }
    if len(newPod.Spec.NodeName) != 0 && (oldPod == nil || newPod.Spec.NodeName != oldPod.Spec.NodeName) {
        podItem := podUpdateItem{newPod.Namespace, newPod.Name}
        nc.podUpdateQueue.Add(podItem)
    }
}

func (nc *Controller) doPodProcessingWorker(ctx context.Context) {
    for {
        obj, shutdown := nc.podUpdateQueue.Get()
        // "podUpdateQueue" is shut down when Run returns; exit the worker when
        // the queue reports shutdown.
        if shutdown {
            return
        }

        podItem := obj.(podUpdateItem)
        nc.processPod(ctx, podItem)
    }
}

// processPod handles events of pods being assigned to nodes. In particular,
// it marks a pod NotReady if the node it is bound to is not Ready according to
// the cached node health data.
func (nc *Controller) processPod(ctx context.Context, podItem podUpdateItem) {
    defer nc.podUpdateQueue.Done(podItem)
    pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
    logger := klog.FromContext(ctx)
    if err != nil {
        if apierrors.IsNotFound(err) {
            // If the pod was deleted, there is nothing to do.
            return
        }
        logger.Info("Failed to read pod", "pod", klog.KRef(podItem.namespace, podItem.name), "err", err)
        nc.podUpdateQueue.AddRateLimited(podItem)
        return
    }

    nodeName := pod.Spec.NodeName

    nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
    if nodeHealth == nil {
        // Node data has not been gathered yet, or the node has been removed in the meantime.
        return
    }

    _, err = nc.nodeLister.Get(nodeName)
    if err != nil {
        logger.Info("Failed to read node", "node", klog.KRef("", nodeName), "err", err)
        nc.podUpdateQueue.AddRateLimited(podItem)
        return
    }

    _, currentReadyCondition := controllerutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
    if currentReadyCondition == nil {
        // A missing NodeReady condition can only happen right after node
        // addition (or if the condition was maliciously removed). In both cases
        // the pod will be handled correctly during the next monitorNodeHealth
        // pass, so there is nothing to do here.
        return
    }

    pods := []*v1.Pod{pod}
    if currentReadyCondition.Status != v1.ConditionTrue {
        if err := controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, nodeName); err != nil {
            logger.Info("Unable to mark pod NotReady on node", "pod", klog.KRef(podItem.namespace, podItem.name), "node", klog.KRef("", nodeName), "err", err)
            nc.podUpdateQueue.AddRateLimited(podItem)
        }
    }
}

func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
    switch state {
    case stateNormal:
        nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
    case statePartialDisruption:
        nc.zoneNoExecuteTainter[zone].SwapLimiter(
            nc.enterPartialDisruptionFunc(zoneSize))
    case stateFullDisruption:
        nc.zoneNoExecuteTainter[zone].SwapLimiter(
            nc.enterFullDisruptionFunc(zoneSize))
    }
}

// classifyNodes classifies allNodes into three categories:
//  1. added: nodes the controller has not seen before,
//  2. deleted: nodes that were in knownNodeSet but are no longer listed,
//  3. newZoneRepresentatives: the first observed node of a zone the controller
//     has no state for yet.
func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
    for i := range allNodes {
        if _, has := nc.knownNodeSet[allNodes[i].Name]; !has {
            added = append(added, allNodes[i])
        } else {
            // Currently, only a new zone counts as an update for a known node.
            zone := nodetopology.GetZoneKey(allNodes[i])
            if _, found := nc.zoneStates[zone]; !found {
                newZoneRepresentatives = append(newZoneRepresentatives, allNodes[i])
            }
        }
    }

    // If the number of known nodes plus newly added nodes differs from the
    // number of observed nodes, some node must have been removed.
    if len(nc.knownNodeSet)+len(added) != len(allNodes) {
        knowSetCopy := map[string]*v1.Node{}
        for k, v := range nc.knownNodeSet {
            knowSetCopy[k] = v
        }
        for i := range allNodes {
            delete(knowSetCopy, allNodes[i].Name)
        }
        for i := range knowSetCopy {
            deleted = append(deleted, knowSetCopy[i])
        }
    }
    return
}

// HealthyQPSFunc returns the default cluster eviction rate; nodeNum is accepted
// for consistency with ReducedQPSFunc.
func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
    return nc.evictionLimiterQPS
}

// ReducedQPSFunc returns the eviction QPS to use when the zone is large enough
// for evictions to still be useful; at or below largeClusterThreshold nodes,
// evictions are stopped entirely.
func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
    if int32(nodeNum) > nc.largeClusterThreshold {
        return nc.secondaryEvictionLimiterQPS
    }
    return 0
}
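
// A worked example (illustrative, not from the original source): with the
// assumed values evictionLimiterQPS=0.1, secondaryEvictionLimiterQPS=0.01 and
// largeClusterThreshold=50, a healthy zone taints at most one node every 10s.
// If a zone with 100 nodes becomes partially disrupted, the rate drops to one
// node every 100s; in a 40-node zone it drops to 0, i.e. the controller stops
// tainting nodes in that zone altogether.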

// addPodEvictorForNewZone checks whether a new zone appeared and, if so, adds a
// NoExecute tainter queue for it.
func (nc *Controller) addPodEvictorForNewZone(logger klog.Logger, node *v1.Node) {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    zone := nodetopology.GetZoneKey(node)
    if _, found := nc.zoneStates[zone]; !found {
        nc.zoneStates[zone] = stateInitial
        nc.zoneNoExecuteTainter[zone] =
            scheduler.NewRateLimitedTimedQueue(
                flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
        // Initialize the eviction metric for the new zone.
        logger.Info("Initializing eviction metric for zone", "zone", zone)
        evictionsTotal.WithLabelValues(zone).Add(0)
    }
}

func (nc *Controller) markNodeForTainting(node *v1.Node, status v1.ConditionStatus) bool {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    if status == v1.ConditionFalse {
        if !taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
            nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name)
        }
    }

    if status == v1.ConditionUnknown {
        if !taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
            nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name)
        }
    }

    return nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Add(node.Name, string(node.UID))
}

func (nc *Controller) markNodeAsReachable(ctx context.Context, node *v1.Node) (bool, error) {
    err := controller.RemoveTaintOffNode(ctx, nc.kubeClient, node.Name, node, UnreachableTaintTemplate)
    logger := klog.FromContext(ctx)
    if err != nil {
        logger.Error(err, "Failed to remove taint from node", "node", klog.KObj(node))
        return false, err
    }
    err = controller.RemoveTaintOffNode(ctx, nc.kubeClient, node.Name, node, NotReadyTaintTemplate)
    if err != nil {
        logger.Error(err, "Failed to remove taint from node", "node", klog.KObj(node))
        return false, err
    }
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()

    return nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name), nil
}

// ComputeZoneState returns the number of not-Ready nodes in the zone together
// with the zone's disruption state:
//   - fullyDisrupted if there are no Ready nodes,
//   - partiallyDisrupted if more than two nodes are not Ready and the not-Ready
//     fraction is at least unhealthyZoneThreshold,
//   - normal otherwise.
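//
// For example (an illustrative calculation, not in the original source): with
// unhealthyZoneThreshold=0.55, a zone with 5 nodes of which 3 are not Ready
// yields a not-Ready fraction of 0.6 >= 0.55 with notReadyNodes > 2, so the
// zone is reported as PartialDisruption; with only 2 of 5 not Ready the zone
// stays Normal, and with 0 Ready nodes it is FullDisruption.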
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
    readyNodes := 0
    notReadyNodes := 0
    for i := range nodeReadyConditions {
        if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
            readyNodes++
        } else {
            notReadyNodes++
        }
    }
    switch {
    case readyNodes == 0 && notReadyNodes > 0:
        return notReadyNodes, stateFullDisruption
    case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
        return notReadyNodes, statePartialDisruption
    default:
        return notReadyNodes, stateNormal
    }
}

// reconcileNodeLabels makes sure the deprecated beta labels listed in
// labelReconcileInfo stay consistent with their stable counterparts.
func (nc *Controller) reconcileNodeLabels(ctx context.Context, nodeName string) error {
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        // If the node is not found, there is nothing to reconcile.
        if apierrors.IsNotFound(err) {
            return nil
        }
        return err
    }

    if node.Labels == nil {
        // Nothing to reconcile.
        return nil
    }

    labelsToUpdate := map[string]string{}
    for _, r := range labelReconcileInfo {
        primaryValue, primaryExists := node.Labels[r.primaryKey]
        secondaryValue, secondaryExists := node.Labels[r.secondaryKey]

        if !primaryExists {
            // The primary label key does not exist, so there is nothing to
            // reconcile for this entry.
            continue
        }
        if secondaryExists && primaryValue != secondaryValue {
            // The secondary label exists but its value does not match; reconcile it.
            labelsToUpdate[r.secondaryKey] = primaryValue
        } else if !secondaryExists && r.ensureSecondaryExists {
            // Apply the secondary label based on the primary label.
            labelsToUpdate[r.secondaryKey] = primaryValue
        }
    }

    if len(labelsToUpdate) == 0 {
        return nil
    }
    if !controllerutil.AddOrUpdateLabelsOnNode(ctx, nc.kubeClient, labelsToUpdate, node) {
        return fmt.Errorf("failed to update labels for node %+v", node)
    }
    return nil
}