1
16
17 package tainteviction
18
19 import (
20 "context"
21 "fmt"
22 "hash/fnv"
23 "io"
24 "math"
25 "sync"
26 "time"
27
28 v1 "k8s.io/api/core/v1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/types"
32 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33 "k8s.io/apiserver/pkg/util/feature"
34 corev1informers "k8s.io/client-go/informers/core/v1"
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/kubernetes/scheme"
37 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
38 corelisters "k8s.io/client-go/listers/core/v1"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/tools/record"
41 "k8s.io/client-go/util/workqueue"
42 "k8s.io/klog/v2"
43 apipod "k8s.io/kubernetes/pkg/api/v1/pod"
44 "k8s.io/kubernetes/pkg/apis/core/helper"
45 v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
46 "k8s.io/kubernetes/pkg/controller/tainteviction/metrics"
47 controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
48 "k8s.io/kubernetes/pkg/features"
49 utilpod "k8s.io/kubernetes/pkg/util/pod"
50 )
51
const (
	// NodeUpdateChannelSize is the buffer size of each per-worker channel
	// that Run creates to hand node updates from nodeUpdateQueue to a worker.
	NodeUpdateChannelSize = 10
	// UpdateWorkerSize is the number of workers Run starts, and therefore the
	// number of per-worker node and pod update channels; updates are sharded
	// across them by hashing the node name.
	UpdateWorkerSize = 8
	// podUpdateChannelSize is the buffer size of each per-worker pod update
	// channel.
	podUpdateChannelSize = 1
	// retries is how many times deletePodHandler attempts to delete a pod
	// before giving up and returning the last error.
	retries = 5
)
63
// nodeUpdateItem is queued by NodeUpdated when a node is added, deleted, or
// its NoExecute taints change; handleNodeUpdate re-reads the node by name.
type nodeUpdateItem struct {
	nodeName string
}

// podUpdateItem is queued by PodUpdated when a pod is added, deleted, or its
// tolerations or node binding change; handlePodUpdate re-reads the pod.
type podUpdateItem struct {
	podName      string
	podNamespace string
	// nodeName is the node the pod was bound to when the item was queued. It
	// selects the worker that handles the item and lets handlePodUpdate skip
	// items whose binding no longer matches the pod.
	nodeName string
}
73
// hash maps val onto one of buckets slots, in [0, buckets), using 32-bit
// FNV-1a. Equal inputs always land in the same slot, which is how updates for
// one node are pinned to a single worker. buckets must be positive.
func hash(val string, buckets int) int {
	digest := fnv.New32a()
	io.WriteString(digest, val) // writes to a hash.Hash never return an error
	sum := digest.Sum32()
	return int(sum % uint32(buckets))
}
79
80
// GetPodsByNodeNameFunc returns the pods bound to the node named nodeName.
// New supplies an implementation backed by the pod informer's
// "spec.nodeName" index.
type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
82
83
84
// Controller watches Pods and Nodes and deletes Pods bound to Nodes whose
// NoExecute taints they do not tolerate: immediately when a taint is not
// tolerated at all, or after the shortest applicable TolerationSeconds.
// Construct it with New and start it with Run.
type Controller struct {
	// name identifies the controller in logs and is the event source component.
	name string

	// client is used to patch and delete pods and as the event sink.
	client      clientset.Interface
	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder
	podLister             corelisters.PodLister
	podListerSynced       cache.InformerSynced
	nodeLister            corelisters.NodeLister
	nodeListerSynced      cache.InformerSynced
	// getPodsAssignedToNode lists the pods bound to a given node.
	getPodsAssignedToNode GetPodsByNodeNameFunc

	// taintEvictionQueue holds scheduled pod deletions keyed by "namespace/name".
	taintEvictionQueue *TimedWorkerQueue

	// taintedNodesLock guards taintedNodes.
	taintedNodesLock sync.Mutex
	// taintedNodes maps node name to its NoExecute taints; nodes without any
	// such taint are absent.
	taintedNodes map[string][]v1.Taint

	// One channel per worker, created by Run; an update goes to the channel
	// chosen by hashing its node name.
	nodeUpdateChannels []chan nodeUpdateItem
	podUpdateChannels  []chan podUpdateItem

	// Queues filled by the informer event handlers and drained by Run.
	nodeUpdateQueue workqueue.Interface
	podUpdateQueue  workqueue.Interface
}
108
109 func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName), controllerName string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
110 return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
111 ns := args.NamespacedName.Namespace
112 name := args.NamespacedName.Name
113 klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.NamespacedName)
114 if emitEventFunc != nil {
115 emitEventFunc(args.NamespacedName)
116 }
117 var err error
118 for i := 0; i < retries; i++ {
119 err = addConditionAndDeletePod(ctx, c, name, ns)
120 if err == nil {
121 metrics.PodDeletionsTotal.Inc()
122 metrics.PodDeletionsLatency.Observe(float64(time.Since(fireAt) * time.Second))
123 break
124 }
125 time.Sleep(10 * time.Millisecond)
126 }
127 return err
128 }
129 }
130
131 func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
132 if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
133 pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
134 if err != nil {
135 return err
136 }
137 newStatus := pod.Status.DeepCopy()
138 updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
139 Type: v1.DisruptionTarget,
140 Status: v1.ConditionTrue,
141 Reason: "DeletionByTaintManager",
142 Message: "Taint manager: deleting due to NoExecute taint",
143 })
144 if updated {
145 if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
146 return err
147 }
148 }
149 }
150 return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
151 }
152
153 func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
154 result := []v1.Taint{}
155 for i := range taints {
156 if taints[i].Effect == v1.TaintEffectNoExecute {
157 result = append(result, taints[i])
158 }
159 }
160 return result
161 }
162
163
164 func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
165 minTolerationTime := int64(math.MaxInt64)
166 if len(tolerations) == 0 {
167 return 0
168 }
169
170 for i := range tolerations {
171 if tolerations[i].TolerationSeconds != nil {
172 tolerationSeconds := *(tolerations[i].TolerationSeconds)
173 if tolerationSeconds <= 0 {
174 return 0
175 } else if tolerationSeconds < minTolerationTime {
176 minTolerationTime = tolerationSeconds
177 }
178 }
179 }
180
181 if minTolerationTime == int64(math.MaxInt64) {
182 return -1
183 }
184 return time.Duration(minTolerationTime) * time.Second
185 }
186
187
188 func New(ctx context.Context, c clientset.Interface, podInformer corev1informers.PodInformer, nodeInformer corev1informers.NodeInformer, controllerName string) (*Controller, error) {
189 logger := klog.FromContext(ctx)
190 metrics.Register()
191 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
192 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
193
194 podIndexer := podInformer.Informer().GetIndexer()
195
196 tm := &Controller{
197 name: controllerName,
198
199 client: c,
200 broadcaster: eventBroadcaster,
201 recorder: recorder,
202 podLister: podInformer.Lister(),
203 podListerSynced: podInformer.Informer().HasSynced,
204 nodeLister: nodeInformer.Lister(),
205 nodeListerSynced: nodeInformer.Informer().HasSynced,
206 getPodsAssignedToNode: func(nodeName string) ([]*v1.Pod, error) {
207 objs, err := podIndexer.ByIndex("spec.nodeName", nodeName)
208 if err != nil {
209 return nil, err
210 }
211 pods := make([]*v1.Pod, 0, len(objs))
212 for _, obj := range objs {
213 pod, ok := obj.(*v1.Pod)
214 if !ok {
215 continue
216 }
217 pods = append(pods, pod)
218 }
219 return pods, nil
220 },
221 taintedNodes: make(map[string][]v1.Taint),
222
223 nodeUpdateQueue: workqueue.NewWithConfig(workqueue.QueueConfig{Name: "noexec_taint_node"}),
224 podUpdateQueue: workqueue.NewWithConfig(workqueue.QueueConfig{Name: "noexec_taint_pod"}),
225 }
226 tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent, tm.name))
227
228 _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
229 AddFunc: func(obj interface{}) {
230 pod := obj.(*v1.Pod)
231 tm.PodUpdated(nil, pod)
232 },
233 UpdateFunc: func(prev, obj interface{}) {
234 prevPod := prev.(*v1.Pod)
235 newPod := obj.(*v1.Pod)
236 tm.PodUpdated(prevPod, newPod)
237 },
238 DeleteFunc: func(obj interface{}) {
239 pod, isPod := obj.(*v1.Pod)
240
241 if !isPod {
242 deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
243 if !ok {
244 logger.Error(nil, "Received unexpected object", "object", obj)
245 return
246 }
247 pod, ok = deletedState.Obj.(*v1.Pod)
248 if !ok {
249 logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj)
250 return
251 }
252 }
253 tm.PodUpdated(pod, nil)
254 },
255 })
256 if err != nil {
257 return nil, fmt.Errorf("unable to add pod event handler: %w", err)
258 }
259
260 _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
261 AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
262 tm.NodeUpdated(nil, node)
263 return nil
264 }),
265 UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
266 tm.NodeUpdated(oldNode, newNode)
267 return nil
268 }),
269 DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
270 tm.NodeUpdated(node, nil)
271 return nil
272 }),
273 })
274 if err != nil {
275 return nil, fmt.Errorf("unable to add node event handler: %w", err)
276 }
277
278 return tm, nil
279 }
280
281
282 func (tc *Controller) Run(ctx context.Context) {
283 defer utilruntime.HandleCrash()
284 logger := klog.FromContext(ctx)
285 logger.Info("Starting", "controller", tc.name)
286 defer logger.Info("Shutting down controller", "controller", tc.name)
287
288
289 tc.broadcaster.StartStructuredLogging(3)
290 if tc.client != nil {
291 logger.Info("Sending events to api server")
292 tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
293 } else {
294 logger.Error(nil, "kubeClient is nil", "controller", tc.name)
295 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
296 }
297 defer tc.broadcaster.Shutdown()
298 defer tc.nodeUpdateQueue.ShutDown()
299 defer tc.podUpdateQueue.ShutDown()
300
301
302 if !cache.WaitForNamedCacheSync(tc.name, ctx.Done(), tc.podListerSynced, tc.nodeListerSynced) {
303 return
304 }
305
306 for i := 0; i < UpdateWorkerSize; i++ {
307 tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
308 tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
309 }
310
311
312
313 go func(stopCh <-chan struct{}) {
314 for {
315 item, shutdown := tc.nodeUpdateQueue.Get()
316 if shutdown {
317 break
318 }
319 nodeUpdate := item.(nodeUpdateItem)
320 hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
321 select {
322 case <-stopCh:
323 tc.nodeUpdateQueue.Done(item)
324 return
325 case tc.nodeUpdateChannels[hash] <- nodeUpdate:
326
327 }
328 }
329 }(ctx.Done())
330
331 go func(stopCh <-chan struct{}) {
332 for {
333 item, shutdown := tc.podUpdateQueue.Get()
334 if shutdown {
335 break
336 }
337
338
339
340
341 podUpdate := item.(podUpdateItem)
342 hash := hash(podUpdate.nodeName, UpdateWorkerSize)
343 select {
344 case <-stopCh:
345 tc.podUpdateQueue.Done(item)
346 return
347 case tc.podUpdateChannels[hash] <- podUpdate:
348
349 }
350 }
351 }(ctx.Done())
352
353 wg := sync.WaitGroup{}
354 wg.Add(UpdateWorkerSize)
355 for i := 0; i < UpdateWorkerSize; i++ {
356 go tc.worker(ctx, i, wg.Done, ctx.Done())
357 }
358 wg.Wait()
359 }
360
361 func (tc *Controller) worker(ctx context.Context, worker int, done func(), stopCh <-chan struct{}) {
362 defer done()
363
364
365
366
367
368 for {
369 select {
370 case <-stopCh:
371 return
372 case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
373 tc.handleNodeUpdate(ctx, nodeUpdate)
374 tc.nodeUpdateQueue.Done(nodeUpdate)
375 case podUpdate := <-tc.podUpdateChannels[worker]:
376
377 priority:
378 for {
379 select {
380 case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
381 tc.handleNodeUpdate(ctx, nodeUpdate)
382 tc.nodeUpdateQueue.Done(nodeUpdate)
383 default:
384 break priority
385 }
386 }
387
388 tc.handlePodUpdate(ctx, podUpdate)
389 tc.podUpdateQueue.Done(podUpdate)
390 }
391 }
392 }
393
394
395 func (tc *Controller) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
396 podName := ""
397 podNamespace := ""
398 nodeName := ""
399 oldTolerations := []v1.Toleration{}
400 if oldPod != nil {
401 podName = oldPod.Name
402 podNamespace = oldPod.Namespace
403 nodeName = oldPod.Spec.NodeName
404 oldTolerations = oldPod.Spec.Tolerations
405 }
406 newTolerations := []v1.Toleration{}
407 if newPod != nil {
408 podName = newPod.Name
409 podNamespace = newPod.Namespace
410 nodeName = newPod.Spec.NodeName
411 newTolerations = newPod.Spec.Tolerations
412 }
413
414 if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
415 return
416 }
417 updateItem := podUpdateItem{
418 podName: podName,
419 podNamespace: podNamespace,
420 nodeName: nodeName,
421 }
422
423 tc.podUpdateQueue.Add(updateItem)
424 }
425
426
427 func (tc *Controller) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
428 nodeName := ""
429 oldTaints := []v1.Taint{}
430 if oldNode != nil {
431 nodeName = oldNode.Name
432 oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
433 }
434
435 newTaints := []v1.Taint{}
436 if newNode != nil {
437 nodeName = newNode.Name
438 newTaints = getNoExecuteTaints(newNode.Spec.Taints)
439 }
440
441 if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
442 return
443 }
444 updateItem := nodeUpdateItem{
445 nodeName: nodeName,
446 }
447
448 tc.nodeUpdateQueue.Add(updateItem)
449 }
450
451 func (tc *Controller) cancelWorkWithEvent(logger klog.Logger, nsName types.NamespacedName) {
452 if tc.taintEvictionQueue.CancelWork(logger, nsName.String()) {
453 tc.emitCancelPodDeletionEvent(nsName)
454 }
455 }
456
457 func (tc *Controller) processPodOnNode(
458 ctx context.Context,
459 podNamespacedName types.NamespacedName,
460 nodeName string,
461 tolerations []v1.Toleration,
462 taints []v1.Taint,
463 now time.Time,
464 ) {
465 logger := klog.FromContext(ctx)
466 if len(taints) == 0 {
467 tc.cancelWorkWithEvent(logger, podNamespacedName)
468 }
469 allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
470 if !allTolerated {
471 logger.V(2).Info("Not all taints are tolerated after update for pod on node", "pod", podNamespacedName.String(), "node", klog.KRef("", nodeName))
472
473 tc.cancelWorkWithEvent(logger, podNamespacedName)
474 tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), now, now)
475 return
476 }
477 minTolerationTime := getMinTolerationTime(usedTolerations)
478
479 if minTolerationTime < 0 {
480 logger.V(4).Info("Current tolerations for pod tolerate forever, cancelling any scheduled deletion", "pod", podNamespacedName.String())
481 tc.cancelWorkWithEvent(logger, podNamespacedName)
482 return
483 }
484
485 startTime := now
486 triggerTime := startTime.Add(minTolerationTime)
487 scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
488 if scheduledEviction != nil {
489 startTime = scheduledEviction.CreatedAt
490 if startTime.Add(minTolerationTime).Before(triggerTime) {
491 return
492 }
493 tc.cancelWorkWithEvent(logger, podNamespacedName)
494 }
495 tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
496 }
497
498 func (tc *Controller) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) {
499 pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName)
500 logger := klog.FromContext(ctx)
501 if err != nil {
502 if apierrors.IsNotFound(err) {
503
504 podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
505 logger.V(4).Info("Noticed pod deletion", "pod", podNamespacedName)
506 tc.cancelWorkWithEvent(logger, podNamespacedName)
507 return
508 }
509 utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
510 return
511 }
512
513
514 if pod.Spec.NodeName != podUpdate.nodeName {
515 return
516 }
517
518
519 podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
520 logger.V(4).Info("Noticed pod update", "pod", podNamespacedName)
521 nodeName := pod.Spec.NodeName
522 if nodeName == "" {
523 return
524 }
525 taints, ok := func() ([]v1.Taint, bool) {
526 tc.taintedNodesLock.Lock()
527 defer tc.taintedNodesLock.Unlock()
528 taints, ok := tc.taintedNodes[nodeName]
529 return taints, ok
530 }()
531
532
533 if !ok {
534 return
535 }
536 tc.processPodOnNode(ctx, podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
537 }
538
539 func (tc *Controller) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) {
540 node, err := tc.nodeLister.Get(nodeUpdate.nodeName)
541 logger := klog.FromContext(ctx)
542 if err != nil {
543 if apierrors.IsNotFound(err) {
544
545 logger.V(4).Info("Noticed node deletion", "node", klog.KRef("", nodeUpdate.nodeName))
546 tc.taintedNodesLock.Lock()
547 defer tc.taintedNodesLock.Unlock()
548 delete(tc.taintedNodes, nodeUpdate.nodeName)
549 return
550 }
551 utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
552 return
553 }
554
555
556 logger.V(4).Info("Noticed node update", "node", klog.KObj(node))
557 taints := getNoExecuteTaints(node.Spec.Taints)
558 func() {
559 tc.taintedNodesLock.Lock()
560 defer tc.taintedNodesLock.Unlock()
561 logger.V(4).Info("Updating known taints on node", "node", klog.KObj(node), "taints", taints)
562 if len(taints) == 0 {
563 delete(tc.taintedNodes, node.Name)
564 } else {
565 tc.taintedNodes[node.Name] = taints
566 }
567 }()
568
569
570
571
572 pods, err := tc.getPodsAssignedToNode(node.Name)
573 if err != nil {
574 logger.Error(err, "Failed to get pods assigned to node", "node", klog.KObj(node))
575 return
576 }
577 if len(pods) == 0 {
578 return
579 }
580
581 if len(taints) == 0 {
582 logger.V(4).Info("All taints were removed from the node. Cancelling all evictions...", "node", klog.KObj(node))
583 for i := range pods {
584 tc.cancelWorkWithEvent(logger, types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
585 }
586 return
587 }
588
589 now := time.Now()
590 for _, pod := range pods {
591 podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
592 tc.processPodOnNode(ctx, podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
593 }
594 }
595
596 func (tc *Controller) emitPodDeletionEvent(nsName types.NamespacedName) {
597 if tc.recorder == nil {
598 return
599 }
600 ref := &v1.ObjectReference{
601 APIVersion: "v1",
602 Kind: "Pod",
603 Name: nsName.Name,
604 Namespace: nsName.Namespace,
605 }
606 tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
607 }
608
609 func (tc *Controller) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
610 if tc.recorder == nil {
611 return
612 }
613 ref := &v1.ObjectReference{
614 APIVersion: "v1",
615 Kind: "Pod",
616 Name: nsName.Name,
617 Namespace: nsName.Namespace,
618 }
619 tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
620 }
621
View as plain text