1
16
17 package daemon
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "sort"
24 "sync"
25 "time"
26
27 apps "k8s.io/api/apps/v1"
28 v1 "k8s.io/api/core/v1"
29 apiequality "k8s.io/apimachinery/pkg/api/equality"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/labels"
33 utilerrors "k8s.io/apimachinery/pkg/util/errors"
34 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
35 "k8s.io/apimachinery/pkg/util/wait"
36 appsinformers "k8s.io/client-go/informers/apps/v1"
37 coreinformers "k8s.io/client-go/informers/core/v1"
38 clientset "k8s.io/client-go/kubernetes"
39 "k8s.io/client-go/kubernetes/scheme"
40 unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
41 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
42 appslisters "k8s.io/client-go/listers/apps/v1"
43 corelisters "k8s.io/client-go/listers/core/v1"
44 "k8s.io/client-go/tools/cache"
45 "k8s.io/client-go/tools/record"
46 "k8s.io/client-go/util/flowcontrol"
47 "k8s.io/client-go/util/workqueue"
48 v1helper "k8s.io/component-helpers/scheduling/corev1"
49 "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
50 "k8s.io/klog/v2"
51 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
52 "k8s.io/kubernetes/pkg/controller"
53 "k8s.io/kubernetes/pkg/controller/daemon/util"
54 )
55
const (
	// BurstReplicas is the value NewDaemonSetsController stores in the
	// controller's burstReplicas field; syncNodes uses that field to cap both
	// the number of pod creations and the number of pod deletions per sync.
	BurstReplicas = 250

	// StatusUpdateRetries bounds how many times storeDaemonSetStatus re-fetches
	// the DaemonSet and retries after a failed status update.
	StatusUpdateRetries = 1

	// BackoffGCInterval is the period at which Run invokes the GC method of
	// the failed-pods backoff.
	BackoffGCInterval = 1 * time.Minute
)
67
68
// Reason strings attached to events emitted by the DaemonSet controller.
const (
	// SelectingAllReason is not referenced in the visible part of this file.
	// NOTE(review): presumably the event reason for a DaemonSet whose selector
	// matches every pod — confirm against the caller.
	SelectingAllReason = "SelectingAll"
	// FailedPlacementReason is not referenced in the visible part of this file.
	// NOTE(review): presumably the event reason for a daemon pod that cannot be
	// placed on a node — confirm against the caller.
	FailedPlacementReason = "FailedPlacement"
	// FailedDaemonPodReason is the reason of the warning event recorded by
	// podsShouldBeOnNode when it finds a daemon pod in phase Failed.
	FailedDaemonPodReason = "FailedDaemonPod"
	// SucceededDaemonPodReason is the reason of the normal event recorded by
	// podsShouldBeOnNode when it finds a daemon pod in phase Succeeded.
	SucceededDaemonPodReason = "SucceededDaemonPod"
)
79
80
// controllerKind is the apps group/version combined with the kind "DaemonSet".
// It is used when building controller owner references for created pods, when
// claiming pods, and when checking the Kind of an owner reference.
var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
82
83
84
// DaemonSetsController holds the clients, listers, work queue and bookkeeping
// state used to reconcile DaemonSet objects with the pods running on nodes.
type DaemonSetsController struct {
	// kubeClient is used for the event sink and for live DaemonSet reads.
	kubeClient clientset.Interface

	// eventBroadcaster is started in Run and shut down when Run returns.
	eventBroadcaster record.EventBroadcaster
	// eventRecorder records events about DaemonSets (e.g. failed daemon pods).
	eventRecorder record.EventRecorder

	// podControl creates and deletes daemon pods and is handed to the pod
	// controller-ref manager when claiming pods.
	podControl controller.PodControlInterface
	// crControl is set by the constructor; its use is not visible in this part
	// of the file. NOTE(review): presumably manipulates ControllerRevisions.
	crControl controller.ControllerRevisionControlInterface

	// burstReplicas caps how many pods syncNodes creates and how many it
	// deletes in a single invocation.
	burstReplicas int

	// syncHandler is invoked by processNextWorkItem for every key taken off
	// the queue. The constructor sets it to syncDaemonSet.
	syncHandler func(ctx context.Context, dsKey string) error
	// enqueueDaemonSet adds a DaemonSet to the work queue. The constructor
	// sets it to enqueue.
	enqueueDaemonSet func(ds *apps.DaemonSet)
	// expectations tracks, per DaemonSet key, pod creations and deletions that
	// have been issued but not yet observed through the pod informer.
	expectations controller.ControllerExpectationsInterface
	// dsLister lists and gets DaemonSets from the informer.
	dsLister appslisters.DaemonSetLister
	// dsStoreSynced reports whether the DaemonSet informer has synced; Run
	// waits on it before starting workers.
	dsStoreSynced cache.InformerSynced
	// historyLister lists ControllerRevisions from the informer.
	// NOTE(review): not read in the visible part of this file.
	historyLister appslisters.ControllerRevisionLister
	// historyStoreSynced reports whether the ControllerRevision informer has
	// synced; Run waits on it before starting workers.
	historyStoreSynced cache.InformerSynced
	// podLister lists pods from the informer.
	podLister corelisters.PodLister
	// podStoreSynced reports whether the pod informer has synced; Run waits on
	// it before starting workers.
	podStoreSynced cache.InformerSynced
	// nodeLister lists nodes from the informer.
	nodeLister corelisters.NodeLister
	// nodeStoreSynced reports whether the node informer has synced; Run waits
	// on it before starting workers.
	nodeStoreSynced cache.InformerSynced

	// queue holds the keys of DaemonSets that need to be synced.
	queue workqueue.RateLimitingInterface

	// failedPodsBackoff rate-limits deletion of failed daemon pods per
	// DaemonSet/node and supplies the clock used for availability checks.
	failedPodsBackoff *flowcontrol.Backoff
}
130
131
132 func NewDaemonSetsController(
133 ctx context.Context,
134 daemonSetInformer appsinformers.DaemonSetInformer,
135 historyInformer appsinformers.ControllerRevisionInformer,
136 podInformer coreinformers.PodInformer,
137 nodeInformer coreinformers.NodeInformer,
138 kubeClient clientset.Interface,
139 failedPodsBackoff *flowcontrol.Backoff,
140 ) (*DaemonSetsController, error) {
141 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
142 logger := klog.FromContext(ctx)
143 dsc := &DaemonSetsController{
144 kubeClient: kubeClient,
145 eventBroadcaster: eventBroadcaster,
146 eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
147 podControl: controller.RealPodControl{
148 KubeClient: kubeClient,
149 Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
150 },
151 crControl: controller.RealControllerRevisionControl{
152 KubeClient: kubeClient,
153 },
154 burstReplicas: BurstReplicas,
155 expectations: controller.NewControllerExpectations(),
156 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
157 }
158
159 daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
160 AddFunc: func(obj interface{}) {
161 dsc.addDaemonset(logger, obj)
162 },
163 UpdateFunc: func(oldObj, newObj interface{}) {
164 dsc.updateDaemonset(logger, oldObj, newObj)
165 },
166 DeleteFunc: func(obj interface{}) {
167 dsc.deleteDaemonset(logger, obj)
168 },
169 })
170 dsc.dsLister = daemonSetInformer.Lister()
171 dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
172
173 historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
174 AddFunc: func(obj interface{}) {
175 dsc.addHistory(logger, obj)
176 },
177 UpdateFunc: func(oldObj, newObj interface{}) {
178 dsc.updateHistory(logger, oldObj, newObj)
179 },
180 DeleteFunc: func(obj interface{}) {
181 dsc.deleteHistory(logger, obj)
182 },
183 })
184 dsc.historyLister = historyInformer.Lister()
185 dsc.historyStoreSynced = historyInformer.Informer().HasSynced
186
187
188
189 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
190 AddFunc: func(obj interface{}) {
191 dsc.addPod(logger, obj)
192 },
193 UpdateFunc: func(oldObj, newObj interface{}) {
194 dsc.updatePod(logger, oldObj, newObj)
195 },
196 DeleteFunc: func(obj interface{}) {
197 dsc.deletePod(logger, obj)
198 },
199 })
200 dsc.podLister = podInformer.Lister()
201 dsc.podStoreSynced = podInformer.Informer().HasSynced
202
203 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
204 AddFunc: func(obj interface{}) {
205 dsc.addNode(logger, obj)
206 },
207 UpdateFunc: func(oldObj, newObj interface{}) {
208 dsc.updateNode(logger, oldObj, newObj)
209 },
210 },
211 )
212 dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
213 dsc.nodeLister = nodeInformer.Lister()
214
215 dsc.syncHandler = dsc.syncDaemonSet
216 dsc.enqueueDaemonSet = dsc.enqueue
217
218 dsc.failedPodsBackoff = failedPodsBackoff
219
220 return dsc, nil
221 }
222
223 func (dsc *DaemonSetsController) addDaemonset(logger klog.Logger, obj interface{}) {
224 ds := obj.(*apps.DaemonSet)
225 logger.V(4).Info("Adding daemon set", "daemonset", klog.KObj(ds))
226 dsc.enqueueDaemonSet(ds)
227 }
228
229 func (dsc *DaemonSetsController) updateDaemonset(logger klog.Logger, cur, old interface{}) {
230 oldDS := old.(*apps.DaemonSet)
231 curDS := cur.(*apps.DaemonSet)
232
233
234 if curDS.UID != oldDS.UID {
235 key, err := controller.KeyFunc(oldDS)
236 if err != nil {
237 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldDS, err))
238 return
239 }
240 dsc.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{
241 Key: key,
242 Obj: oldDS,
243 })
244 }
245
246 logger.V(4).Info("Updating daemon set", "daemonset", klog.KObj(oldDS))
247 dsc.enqueueDaemonSet(curDS)
248 }
249
250 func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interface{}) {
251 ds, ok := obj.(*apps.DaemonSet)
252 if !ok {
253 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
254 if !ok {
255 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
256 return
257 }
258 ds, ok = tombstone.Obj.(*apps.DaemonSet)
259 if !ok {
260 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a DaemonSet %#v", obj))
261 return
262 }
263 }
264 logger.V(4).Info("Deleting daemon set", "daemonset", klog.KObj(ds))
265
266 key, err := controller.KeyFunc(ds)
267 if err != nil {
268 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
269 return
270 }
271
272
273 dsc.expectations.DeleteExpectations(logger, key)
274
275 dsc.queue.Add(key)
276 }
277
278
279 func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
280 defer utilruntime.HandleCrash()
281
282 dsc.eventBroadcaster.StartStructuredLogging(3)
283 dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
284 defer dsc.eventBroadcaster.Shutdown()
285
286 defer dsc.queue.ShutDown()
287
288 logger := klog.FromContext(ctx)
289 logger.Info("Starting daemon sets controller")
290 defer logger.Info("Shutting down daemon sets controller")
291
292 if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
293 return
294 }
295
296 for i := 0; i < workers; i++ {
297 go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
298 }
299
300 go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
301
302 <-ctx.Done()
303 }
304
305 func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
306 for dsc.processNextWorkItem(ctx) {
307 }
308 }
309
310
311 func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
312 dsKey, quit := dsc.queue.Get()
313 if quit {
314 return false
315 }
316 defer dsc.queue.Done(dsKey)
317
318 err := dsc.syncHandler(ctx, dsKey.(string))
319 if err == nil {
320 dsc.queue.Forget(dsKey)
321 return true
322 }
323
324 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
325 dsc.queue.AddRateLimited(dsKey)
326
327 return true
328 }
329
330 func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
331 key, err := controller.KeyFunc(ds)
332 if err != nil {
333 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
334 return
335 }
336
337
338 dsc.queue.Add(key)
339 }
340
341 func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
342 key, err := controller.KeyFunc(obj)
343 if err != nil {
344 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
345 return
346 }
347
348
349 dsc.queue.AddAfter(key, after)
350 }
351
352
353 func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
354 sets, err := dsc.dsLister.GetPodDaemonSets(pod)
355 if err != nil {
356 return nil
357 }
358 if len(sets) > 1 {
359
360
361 utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
362 }
363 return sets
364 }
365
366
367
368 func (dsc *DaemonSetsController) getDaemonSetsForHistory(logger klog.Logger, history *apps.ControllerRevision) []*apps.DaemonSet {
369 daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
370 if err != nil || len(daemonSets) == 0 {
371 return nil
372 }
373 if len(daemonSets) > 1 {
374
375
376 logger.V(4).Info("Found more than one DaemonSet selecting the ControllerRevision. This is potentially a user error",
377 "controllerRevision", klog.KObj(history), "labels", history.Labels)
378 }
379 return daemonSets
380 }
381
382
383
384 func (dsc *DaemonSetsController) addHistory(logger klog.Logger, obj interface{}) {
385 history := obj.(*apps.ControllerRevision)
386 if history.DeletionTimestamp != nil {
387
388
389 dsc.deleteHistory(logger, history)
390 return
391 }
392
393
394 if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
395 ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
396 if ds == nil {
397 return
398 }
399 logger.V(4).Info("Observed a ControllerRevision", "controllerRevision", klog.KObj(history))
400 return
401 }
402
403
404
405 daemonSets := dsc.getDaemonSetsForHistory(logger, history)
406 if len(daemonSets) == 0 {
407 return
408 }
409 logger.V(4).Info("Orphan ControllerRevision added", "controllerRevision", klog.KObj(history))
410 for _, ds := range daemonSets {
411 dsc.enqueueDaemonSet(ds)
412 }
413 }
414
415
416
417
418 func (dsc *DaemonSetsController) updateHistory(logger klog.Logger, old, cur interface{}) {
419 curHistory := cur.(*apps.ControllerRevision)
420 oldHistory := old.(*apps.ControllerRevision)
421 if curHistory.ResourceVersion == oldHistory.ResourceVersion {
422
423 return
424 }
425
426 curControllerRef := metav1.GetControllerOf(curHistory)
427 oldControllerRef := metav1.GetControllerOf(oldHistory)
428 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
429 if controllerRefChanged && oldControllerRef != nil {
430
431 if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
432 dsc.enqueueDaemonSet(ds)
433 }
434 }
435
436
437 if curControllerRef != nil {
438 ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
439 if ds == nil {
440 return
441 }
442 logger.V(4).Info("Observed an update to a ControllerRevision", "controllerRevision", klog.KObj(curHistory))
443 dsc.enqueueDaemonSet(ds)
444 return
445 }
446
447
448
449 labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
450 if labelChanged || controllerRefChanged {
451 daemonSets := dsc.getDaemonSetsForHistory(logger, curHistory)
452 if len(daemonSets) == 0 {
453 return
454 }
455 logger.V(4).Info("Orphan ControllerRevision updated", "controllerRevision", klog.KObj(curHistory))
456 for _, ds := range daemonSets {
457 dsc.enqueueDaemonSet(ds)
458 }
459 }
460 }
461
462
463
464
465 func (dsc *DaemonSetsController) deleteHistory(logger klog.Logger, obj interface{}) {
466 history, ok := obj.(*apps.ControllerRevision)
467
468
469
470
471
472 if !ok {
473 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
474 if !ok {
475 utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
476 return
477 }
478 history, ok = tombstone.Obj.(*apps.ControllerRevision)
479 if !ok {
480 utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
481 return
482 }
483 }
484
485 controllerRef := metav1.GetControllerOf(history)
486 if controllerRef == nil {
487
488 return
489 }
490 ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
491 if ds == nil {
492 return
493 }
494 logger.V(4).Info("ControllerRevision deleted", "controllerRevision", klog.KObj(history))
495 dsc.enqueueDaemonSet(ds)
496 }
497
498 func (dsc *DaemonSetsController) addPod(logger klog.Logger, obj interface{}) {
499 pod := obj.(*v1.Pod)
500
501 if pod.DeletionTimestamp != nil {
502
503
504 dsc.deletePod(logger, pod)
505 return
506 }
507
508
509 if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
510 ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
511 if ds == nil {
512 return
513 }
514 dsKey, err := controller.KeyFunc(ds)
515 if err != nil {
516 return
517 }
518 logger.V(4).Info("Pod added", "pod", klog.KObj(pod))
519 dsc.expectations.CreationObserved(logger, dsKey)
520 dsc.enqueueDaemonSet(ds)
521 return
522 }
523
524
525
526
527
528 dss := dsc.getDaemonSetsForPod(pod)
529 if len(dss) == 0 {
530 return
531 }
532 logger.V(4).Info("Orphan Pod added", "pod", klog.KObj(pod))
533 for _, ds := range dss {
534 dsc.enqueueDaemonSet(ds)
535 }
536 }
537
538
539
540
541 func (dsc *DaemonSetsController) updatePod(logger klog.Logger, old, cur interface{}) {
542 curPod := cur.(*v1.Pod)
543 oldPod := old.(*v1.Pod)
544 if curPod.ResourceVersion == oldPod.ResourceVersion {
545
546
547 return
548 }
549
550 if curPod.DeletionTimestamp != nil {
551
552
553
554
555 dsc.deletePod(logger, curPod)
556 return
557 }
558
559 curControllerRef := metav1.GetControllerOf(curPod)
560 oldControllerRef := metav1.GetControllerOf(oldPod)
561 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
562 if controllerRefChanged && oldControllerRef != nil {
563
564 if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
565 dsc.enqueueDaemonSet(ds)
566 }
567 }
568
569
570 if curControllerRef != nil {
571 ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
572 if ds == nil {
573 return
574 }
575 logger.V(4).Info("Pod updated", "pod", klog.KObj(curPod))
576 dsc.enqueueDaemonSet(ds)
577 changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
578
579 if changedToReady && ds.Spec.MinReadySeconds > 0 {
580
581
582 dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
583 }
584 return
585 }
586
587
588
589 dss := dsc.getDaemonSetsForPod(curPod)
590 if len(dss) == 0 {
591 return
592 }
593 logger.V(4).Info("Orphan Pod updated", "pod", klog.KObj(curPod))
594 labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
595 if labelChanged || controllerRefChanged {
596 for _, ds := range dss {
597 dsc.enqueueDaemonSet(ds)
598 }
599 }
600 }
601
602 func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{}) {
603 pod, ok := obj.(*v1.Pod)
604
605
606
607
608
609 if !ok {
610 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
611 if !ok {
612 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
613 return
614 }
615 pod, ok = tombstone.Obj.(*v1.Pod)
616 if !ok {
617 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
618 return
619 }
620 }
621
622 controllerRef := metav1.GetControllerOf(pod)
623 if controllerRef == nil {
624
625 return
626 }
627 ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
628 if ds == nil {
629 return
630 }
631 dsKey, err := controller.KeyFunc(ds)
632 if err != nil {
633 return
634 }
635 logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
636 dsc.expectations.DeletionObserved(logger, dsKey)
637 dsc.enqueueDaemonSet(ds)
638 }
639
640 func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
641
642 dsList, err := dsc.dsLister.List(labels.Everything())
643 if err != nil {
644 logger.V(4).Info("Error enqueueing daemon sets", "err", err)
645 return
646 }
647 node := obj.(*v1.Node)
648 for _, ds := range dsList {
649 if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); shouldRun {
650 dsc.enqueueDaemonSet(ds)
651 }
652 }
653 }
654
655
656
657 func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
658 return apiequality.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) &&
659 apiequality.Semantic.DeepEqual(oldNode.Spec.Taints, curNode.Spec.Taints)
660 }
661
662 func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
663 oldNode := old.(*v1.Node)
664 curNode := cur.(*v1.Node)
665 if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
666 return
667 }
668
669 dsList, err := dsc.dsLister.List(labels.Everything())
670 if err != nil {
671 logger.V(4).Info("Error listing daemon sets", "err", err)
672 return
673 }
674
675 for _, ds := range dsList {
676
677 oldShouldRun, oldShouldContinueRunning := NodeShouldRunDaemonPod(oldNode, ds)
678 currentShouldRun, currentShouldContinueRunning := NodeShouldRunDaemonPod(curNode, ds)
679 if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
680 dsc.enqueueDaemonSet(ds)
681 }
682 }
683 }
684
685
686
687
688
689 func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.DaemonSet) ([]*v1.Pod, error) {
690 selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
691 if err != nil {
692 return nil, err
693 }
694
695
696
697 pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
698 if err != nil {
699 return nil, err
700 }
701
702
703 dsNotDeleted := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
704 fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
705 if err != nil {
706 return nil, err
707 }
708 if fresh.UID != ds.UID {
709 return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
710 }
711 return fresh, nil
712 })
713
714
715 cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
716 return cm.ClaimPods(ctx, pods)
717 }
718
719
720
721
722
723 func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
724 claimedPods, err := dsc.getDaemonPods(ctx, ds)
725 if err != nil {
726 return nil, err
727 }
728
729 nodeToDaemonPods := make(map[string][]*v1.Pod)
730 logger := klog.FromContext(ctx)
731 for _, pod := range claimedPods {
732 if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
733
734
735
736 continue
737 }
738 nodeName, err := util.GetTargetNodeName(pod)
739 if err != nil {
740 logger.V(4).Info("Failed to get target node name of Pod in DaemonSet",
741 "pod", klog.KObj(pod), "daemonset", klog.KObj(ds))
742 continue
743 }
744
745 nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
746 }
747
748 return nodeToDaemonPods, nil
749 }
750
751
752
753
754 func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
755
756
757 if controllerRef.Kind != controllerKind.Kind {
758 return nil
759 }
760 ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
761 if err != nil {
762 return nil
763 }
764 if ds.UID != controllerRef.UID {
765
766
767 return nil
768 }
769 return ds
770 }
771
772
773
774
775
776 func (dsc *DaemonSetsController) podsShouldBeOnNode(
777 logger klog.Logger,
778 node *v1.Node,
779 nodeToDaemonPods map[string][]*v1.Pod,
780 ds *apps.DaemonSet,
781 hash string,
782 ) (nodesNeedingDaemonPods, podsToDelete []string) {
783
784 shouldRun, shouldContinueRunning := NodeShouldRunDaemonPod(node, ds)
785 daemonPods, exists := nodeToDaemonPods[node.Name]
786
787 switch {
788 case shouldRun && !exists:
789
790 nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
791 case shouldContinueRunning:
792
793
794 var daemonPodsRunning []*v1.Pod
795 for _, pod := range daemonPods {
796 if pod.DeletionTimestamp != nil {
797 continue
798 }
799 if pod.Status.Phase == v1.PodFailed {
800
801
802 backoffKey := failedPodsBackoffKey(ds, node.Name)
803
804 now := dsc.failedPodsBackoff.Clock.Now()
805 inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
806 if inBackoff {
807 delay := dsc.failedPodsBackoff.Get(backoffKey)
808 logger.V(4).Info("Deleting failed pod on node has been limited by backoff",
809 "pod", klog.KObj(pod), "node", klog.KObj(node), "currentDelay", delay)
810 dsc.enqueueDaemonSetAfter(ds, delay)
811 continue
812 }
813
814 dsc.failedPodsBackoff.Next(backoffKey, now)
815
816 msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
817 logger.V(2).Info("Found failed daemon pod on node, will try to kill it", "pod", klog.KObj(pod), "node", klog.KObj(node))
818
819 dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
820 podsToDelete = append(podsToDelete, pod.Name)
821 } else if pod.Status.Phase == v1.PodSucceeded {
822 msg := fmt.Sprintf("Found succeeded daemon pod %s/%s on node %s, will try to delete it", pod.Namespace, pod.Name, node.Name)
823 logger.V(2).Info("Found succeeded daemon pod on node, will try to delete it", "pod", klog.KObj(pod), "node", klog.KObj(node))
824
825 dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, SucceededDaemonPodReason, msg)
826 podsToDelete = append(podsToDelete, pod.Name)
827 } else {
828 daemonPodsRunning = append(daemonPodsRunning, pod)
829 }
830 }
831
832
833 if !util.AllowsSurge(ds) {
834 if len(daemonPodsRunning) <= 1 {
835
836 break
837 }
838
839 sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
840 for i := 1; i < len(daemonPodsRunning); i++ {
841 podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
842 }
843 break
844 }
845
846 if len(daemonPodsRunning) <= 1 {
847
848 if len(daemonPodsRunning) == 0 && shouldRun {
849
850 nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
851 }
852 break
853 }
854
855
856
857
858 var oldestNewPod, oldestOldPod *v1.Pod
859 sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
860 for _, pod := range daemonPodsRunning {
861 if pod.Labels[apps.ControllerRevisionHashLabelKey] == hash {
862 if oldestNewPod == nil {
863 oldestNewPod = pod
864 continue
865 }
866 } else {
867 if oldestOldPod == nil {
868 oldestOldPod = pod
869 continue
870 }
871 }
872 podsToDelete = append(podsToDelete, pod.Name)
873 }
874 if oldestNewPod != nil && oldestOldPod != nil {
875 switch {
876 case !podutil.IsPodReady(oldestOldPod):
877 logger.V(5).Info("Pod from daemonset is no longer ready and will be replaced with newer pod", "oldPod", klog.KObj(oldestOldPod), "daemonset", klog.KObj(ds), "newPod", klog.KObj(oldestNewPod))
878 podsToDelete = append(podsToDelete, oldestOldPod.Name)
879 case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
880 logger.V(5).Info("Pod from daemonset is now ready and will replace older pod", "newPod", klog.KObj(oldestNewPod), "daemonset", klog.KObj(ds), "oldPod", klog.KObj(oldestOldPod))
881 podsToDelete = append(podsToDelete, oldestOldPod.Name)
882 }
883 }
884
885 case !shouldContinueRunning && exists:
886
887 for _, pod := range daemonPods {
888 if pod.DeletionTimestamp != nil {
889 continue
890 }
891 podsToDelete = append(podsToDelete, pod.Name)
892 }
893 }
894
895 return nodesNeedingDaemonPods, podsToDelete
896 }
897
898 func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
899 err := dsc.manage(ctx, ds, nodeList, hash)
900 if err != nil {
901 return err
902 }
903
904
905 if dsc.expectations.SatisfiedExpectations(klog.FromContext(ctx), key) {
906 switch ds.Spec.UpdateStrategy.Type {
907 case apps.OnDeleteDaemonSetStrategyType:
908 case apps.RollingUpdateDaemonSetStrategyType:
909 err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
910 }
911 if err != nil {
912 return err
913 }
914 }
915
916 err = dsc.cleanupHistory(ctx, ds, old)
917 if err != nil {
918 return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
919 }
920
921 return nil
922 }
923
924
925
926
927
928 func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
929
930 nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
931 if err != nil {
932 return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
933 }
934
935
936
937 logger := klog.FromContext(ctx)
938 var nodesNeedingDaemonPods, podsToDelete []string
939 for _, node := range nodeList {
940 nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
941 logger, node, nodeToDaemonPods, ds, hash)
942
943 nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
944 podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
945 }
946
947
948
949 podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
950
951
952 if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
953 return err
954 }
955
956 return nil
957 }
958
959
960
961 func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
962
963 logger := klog.FromContext(ctx)
964 dsKey, err := controller.KeyFunc(ds)
965 if err != nil {
966 return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
967 }
968
969 createDiff := len(nodesNeedingDaemonPods)
970 deleteDiff := len(podsToDelete)
971
972 if createDiff > dsc.burstReplicas {
973 createDiff = dsc.burstReplicas
974 }
975 if deleteDiff > dsc.burstReplicas {
976 deleteDiff = dsc.burstReplicas
977 }
978
979 dsc.expectations.SetExpectations(logger, dsKey, createDiff, deleteDiff)
980
981
982 errCh := make(chan error, createDiff+deleteDiff)
983
984 logger.V(4).Info("Nodes needing daemon pods for daemon set, creating", "daemonset", klog.KObj(ds), "needCount", nodesNeedingDaemonPods, "createCount", createDiff)
985 createWait := sync.WaitGroup{}
986
987
988 generation, err := util.GetTemplateGeneration(ds)
989 if err != nil {
990 generation = nil
991 }
992 template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
993
994
995
996
997
998
999
1000
1001 batchSize := min(createDiff, controller.SlowStartInitialBatchSize)
1002 for pos := 0; createDiff > pos; batchSize, pos = min(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
1003 errorCount := len(errCh)
1004 createWait.Add(batchSize)
1005 for i := pos; i < pos+batchSize; i++ {
1006 go func(ix int) {
1007 defer createWait.Done()
1008
1009 podTemplate := template.DeepCopy()
1010
1011
1012
1013 podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
1014 podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
1015
1016 err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
1017 ds, metav1.NewControllerRef(ds, controllerKind))
1018
1019 if err != nil {
1020 if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
1021
1022
1023 return
1024 }
1025 }
1026 if err != nil {
1027 logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds))
1028 dsc.expectations.CreationObserved(logger, dsKey)
1029 errCh <- err
1030 utilruntime.HandleError(err)
1031 }
1032 }(i)
1033 }
1034 createWait.Wait()
1035
1036 skippedPods := createDiff - (batchSize + pos)
1037 if errorCount < len(errCh) && skippedPods > 0 {
1038 logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds))
1039 dsc.expectations.LowerExpectations(logger, dsKey, skippedPods, 0)
1040
1041
1042 break
1043 }
1044 }
1045
1046 logger.V(4).Info("Pods to delete for daemon set, deleting", "daemonset", klog.KObj(ds), "toDeleteCount", podsToDelete, "deleteCount", deleteDiff)
1047 deleteWait := sync.WaitGroup{}
1048 deleteWait.Add(deleteDiff)
1049 for i := 0; i < deleteDiff; i++ {
1050 go func(ix int) {
1051 defer deleteWait.Done()
1052 if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
1053 dsc.expectations.DeletionObserved(logger, dsKey)
1054 if !apierrors.IsNotFound(err) {
1055 logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds))
1056 errCh <- err
1057 utilruntime.HandleError(err)
1058 }
1059 }
1060 }(i)
1061 }
1062 deleteWait.Wait()
1063
1064
1065 errors := []error{}
1066 close(errCh)
1067 for err := range errCh {
1068 errors = append(errors, err)
1069 }
1070 return utilerrors.NewAggregate(errors)
1071 }
1072
1073 func storeDaemonSetStatus(
1074 ctx context.Context,
1075 dsClient unversionedapps.DaemonSetInterface,
1076 ds *apps.DaemonSet, desiredNumberScheduled,
1077 currentNumberScheduled,
1078 numberMisscheduled,
1079 numberReady,
1080 updatedNumberScheduled,
1081 numberAvailable,
1082 numberUnavailable int,
1083 updateObservedGen bool) error {
1084 if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
1085 int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
1086 int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
1087 int(ds.Status.NumberReady) == numberReady &&
1088 int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
1089 int(ds.Status.NumberAvailable) == numberAvailable &&
1090 int(ds.Status.NumberUnavailable) == numberUnavailable &&
1091 ds.Status.ObservedGeneration >= ds.Generation {
1092 return nil
1093 }
1094
1095 toUpdate := ds.DeepCopy()
1096
1097 var updateErr, getErr error
1098 for i := 0; ; i++ {
1099 if updateObservedGen {
1100 toUpdate.Status.ObservedGeneration = ds.Generation
1101 }
1102 toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
1103 toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
1104 toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
1105 toUpdate.Status.NumberReady = int32(numberReady)
1106 toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
1107 toUpdate.Status.NumberAvailable = int32(numberAvailable)
1108 toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
1109
1110 if _, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil {
1111 return nil
1112 }
1113
1114
1115 if i >= StatusUpdateRetries {
1116 break
1117 }
1118
1119 if toUpdate, getErr = dsClient.Get(ctx, ds.Name, metav1.GetOptions{}); getErr != nil {
1120
1121
1122 return getErr
1123 }
1124 }
1125 return updateErr
1126 }
1127
1128 func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
1129 logger := klog.FromContext(ctx)
1130 logger.V(4).Info("Updating daemon set status")
1131 nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
1132 if err != nil {
1133 return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
1134 }
1135
1136 var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
1137 now := dsc.failedPodsBackoff.Clock.Now()
1138 for _, node := range nodeList {
1139 shouldRun, _ := NodeShouldRunDaemonPod(node, ds)
1140 scheduled := len(nodeToDaemonPods[node.Name]) > 0
1141
1142 if shouldRun {
1143 desiredNumberScheduled++
1144 if !scheduled {
1145 continue
1146 }
1147
1148 currentNumberScheduled++
1149
1150 daemonPods, _ := nodeToDaemonPods[node.Name]
1151 sort.Sort(podByCreationTimestampAndPhase(daemonPods))
1152 pod := daemonPods[0]
1153 if podutil.IsPodReady(pod) {
1154 numberReady++
1155 if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
1156 numberAvailable++
1157 }
1158 }
1159
1160
1161 generation, err := util.GetTemplateGeneration(ds)
1162 if err != nil {
1163 generation = nil
1164 }
1165 if util.IsPodUpdated(pod, hash, generation) {
1166 updatedNumberScheduled++
1167 }
1168 } else {
1169 if scheduled {
1170 numberMisscheduled++
1171 }
1172 }
1173 }
1174 numberUnavailable := desiredNumberScheduled - numberAvailable
1175
1176 err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
1177 if err != nil {
1178 return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
1179 }
1180
1181
1182 if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
1183 dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
1184 }
1185 return nil
1186 }
1187
1188 func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
1189 logger := klog.FromContext(ctx)
1190 startTime := dsc.failedPodsBackoff.Clock.Now()
1191
1192 defer func() {
1193 logger.V(4).Info("Finished syncing daemon set", "daemonset", key, "time", dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
1194 }()
1195
1196 namespace, name, err := cache.SplitMetaNamespaceKey(key)
1197 if err != nil {
1198 return err
1199 }
1200 ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
1201 if apierrors.IsNotFound(err) {
1202 logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
1203 dsc.expectations.DeleteExpectations(logger, key)
1204 return nil
1205 }
1206 if err != nil {
1207 return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
1208 }
1209
1210 nodeList, err := dsc.nodeLister.List(labels.Everything())
1211 if err != nil {
1212 return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
1213 }
1214
1215 everything := metav1.LabelSelector{}
1216 if reflect.DeepEqual(ds.Spec.Selector, &everything) {
1217 dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
1218 return nil
1219 }
1220
1221
1222
1223
1224 dsKey, err := controller.KeyFunc(ds)
1225 if err != nil {
1226 return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
1227 }
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237 if ds.DeletionTimestamp != nil {
1238 return nil
1239 }
1240
1241
1242 cur, old, err := dsc.constructHistory(ctx, ds)
1243 if err != nil {
1244 return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
1245 }
1246 hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
1247
1248 if !dsc.expectations.SatisfiedExpectations(logger, dsKey) {
1249
1250 return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
1251 }
1252
1253 err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
1254 statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
1255 switch {
1256 case err != nil && statusErr != nil:
1257
1258
1259 logger.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
1260 return err
1261 case err != nil:
1262 return err
1263 case statusErr != nil:
1264 return statusErr
1265 }
1266
1267 return nil
1268 }
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278 func NodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
1279 pod := NewPod(ds, node.Name)
1280
1281
1282 if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
1283 return false, false
1284 }
1285
1286 taints := node.Spec.Taints
1287 fitsNodeName, fitsNodeAffinity, fitsTaints := predicates(pod, node, taints)
1288 if !fitsNodeName || !fitsNodeAffinity {
1289 return false, false
1290 }
1291
1292 if !fitsTaints {
1293
1294 _, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
1295 return t.Effect == v1.TaintEffectNoExecute
1296 })
1297 return false, !hasUntoleratedTaint
1298 }
1299
1300 return true, true
1301 }
1302
1303
1304 func predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
1305 fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
1306
1307 fitsNodeAffinity, _ = nodeaffinity.GetRequiredNodeAffinity(pod).Match(node)
1308 _, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
1309 return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
1310 })
1311 fitsTaints = !hasUntoleratedTaint
1312 return
1313 }
1314
1315
1316 func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
1317 newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
1318 newPod.Namespace = ds.Namespace
1319 newPod.Spec.NodeName = nodeName
1320
1321
1322 util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)
1323
1324 return newPod
1325 }
1326
1327 type podByCreationTimestampAndPhase []*v1.Pod
1328
1329 func (o podByCreationTimestampAndPhase) Len() int { return len(o) }
1330 func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
1331
1332 func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
1333
1334 if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
1335 return true
1336 }
1337
1338 if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
1339 return false
1340 }
1341
1342 if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
1343 return o[i].Name < o[j].Name
1344 }
1345 return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
1346 }
1347
1348 func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
1349 return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
1350 }
1351
1352
1353
1354 func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) []string {
1355 var results []string
1356 isNodeRunning := make(map[string]bool, len(runningNodesList))
1357 for _, node := range runningNodesList {
1358 isNodeRunning[node.Name] = true
1359 }
1360
1361 for n, pods := range nodeToDaemonPods {
1362 if isNodeRunning[n] {
1363 continue
1364 }
1365 for _, pod := range pods {
1366 if len(pod.Spec.NodeName) == 0 {
1367 results = append(results, pod.Name)
1368 }
1369 }
1370 }
1371
1372 return results
1373 }
1374
View as plain text