// The TTLController sets ttl annotations on nodes, based on cluster size.
// The annotations are consumed by Kubelets as suggestions for how long
// they can cache objects (e.g. secrets or config maps) before refetching
// them from the apiserver.
//
// TODO: This is a temporary workaround for the Kubelet not being able to
// send a "watch secrets attached to pods from my node" request. Once
// sending such a request is possible, we will modify the Kubelet to
// use it and get rid of this controller completely.
    27  package ttl
    29  import (
    30  	"context"
    31  	"fmt"
    32  	"math"
    33  	"strconv"
    34  	"sync"
    35  	"time"
    37  	v1 "k8s.io/api/core/v1"
    38  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/apimachinery/pkg/types"
    41  	"k8s.io/apimachinery/pkg/util/json"
    42  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    43  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    44  	"k8s.io/apimachinery/pkg/util/wait"
    45  	informers "k8s.io/client-go/informers/core/v1"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	listers "k8s.io/client-go/listers/core/v1"
    48  	"k8s.io/client-go/tools/cache"
    49  	"k8s.io/client-go/util/workqueue"
    50  	"k8s.io/kubernetes/pkg/controller"
    52  	"k8s.io/klog/v2"
    53  )
// Controller sets ttl annotations on nodes, based on cluster size.
//
// It counts nodes through informer add/delete events, derives a desired TTL
// from that count using the ttlBoundaries table, and patches each node whose
// ttl annotation differs from the desired value.
type Controller struct {
	// kubeClient is used to patch node objects.
	kubeClient clientset.Interface
	// nodeStore is a local cache of nodes.
	nodeStore listers.NodeLister
	// Nodes that need to be synced. Items are node keys produced by
	// controller.KeyFunc.
	queue workqueue.RateLimitingInterface
	// Returns true if all underlying informers are synced.
	hasSynced func() bool
	// lock guards nodeCount, desiredTTLSeconds and boundaryStep, which are
	// written by the informer event handlers and read by the workers.
	lock sync.RWMutex
	// Number of nodes in the cluster.
	nodeCount int
	// Desired TTL for all nodes in the cluster.
	desiredTTLSeconds int
	// In which interval of cluster size we currently are; this is an index
	// into ttlBoundaries.
	boundaryStep int
}
    80  // NewTTLController creates a new TTLController
    81  func NewTTLController(ctx context.Context, nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *Controller {
    82  	ttlc := &Controller{
    83  		kubeClient: kubeClient,
    84  		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
    85  	}
    86  	logger := klog.FromContext(ctx)
    87  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    88  		AddFunc: func(obj interface{}) {
    89  			ttlc.addNode(logger, obj)
    90  		},
    91  		UpdateFunc: func(old, newObj interface{}) {
    92  			ttlc.updateNode(logger, old, newObj)
    93  		},
    94  		DeleteFunc: ttlc.deleteNode,
    95  	})
    97  	ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
    98  	ttlc.hasSynced = nodeInformer.Informer().HasSynced
   100  	return ttlc
   101  }
   103  type ttlBoundary struct {
   104  	sizeMin    int
   105  	sizeMax    int
   106  	ttlSeconds int
   107  }
   109  var (
   110  	ttlBoundaries = []ttlBoundary{
   111  		{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
   112  		{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
   113  		{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
   114  		{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
   115  		{sizeMin: 1800, sizeMax: math.MaxInt32, ttlSeconds: 300},
   116  	}
   117  )
   119  // Run begins watching and syncing.
   120  func (ttlc *Controller) Run(ctx context.Context, workers int) {
   121  	defer utilruntime.HandleCrash()
   122  	defer ttlc.queue.ShutDown()
   123  	logger := klog.FromContext(ctx)
   124  	logger.Info("Starting TTL controller")
   125  	defer logger.Info("Shutting down TTL controller")
   127  	if !cache.WaitForNamedCacheSync("TTL", ctx.Done(), ttlc.hasSynced) {
   128  		return
   129  	}
   131  	for i := 0; i < workers; i++ {
   132  		go wait.UntilWithContext(ctx, ttlc.worker, time.Second)
   133  	}
   135  	<-ctx.Done()
   136  }
   138  func (ttlc *Controller) addNode(logger klog.Logger, obj interface{}) {
   139  	node, ok := obj.(*v1.Node)
   140  	if !ok {
   141  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   142  		return
   143  	}
   145  	func() {
   146  		ttlc.lock.Lock()
   147  		defer ttlc.lock.Unlock()
   148  		ttlc.nodeCount++
   149  		if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
   150  			ttlc.boundaryStep++
   151  			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
   152  		}
   153  	}()
   154  	ttlc.enqueueNode(logger, node)
   155  }
   157  func (ttlc *Controller) updateNode(logger klog.Logger, _, newObj interface{}) {
   158  	node, ok := newObj.(*v1.Node)
   159  	if !ok {
   160  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
   161  		return
   162  	}
   163  	// Processing all updates of nodes guarantees that we will update
   164  	// the ttl annotation, when cluster size changes.
   165  	// We are relying on the fact that Kubelet is updating node status
   166  	// every 10s (or generally every X seconds), which means that whenever
   167  	// required, its ttl annotation should be updated within that period.
   168  	ttlc.enqueueNode(logger, node)
   169  }
   171  func (ttlc *Controller) deleteNode(obj interface{}) {
   172  	_, ok := obj.(*v1.Node)
   173  	if !ok {
   174  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   175  		if !ok {
   176  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   177  			return
   178  		}
   179  		_, ok = tombstone.Obj.(*v1.Node)
   180  		if !ok {
   181  			utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj))
   182  			return
   183  		}
   184  	}
   186  	func() {
   187  		ttlc.lock.Lock()
   188  		defer ttlc.lock.Unlock()
   189  		ttlc.nodeCount--
   190  		if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
   191  			ttlc.boundaryStep--
   192  			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
   193  		}
   194  	}()
   195  	// We are not processing the node, as it no longer exists.
   196  }
   198  func (ttlc *Controller) enqueueNode(logger klog.Logger, node *v1.Node) {
   199  	key, err := controller.KeyFunc(node)
   200  	if err != nil {
   201  		logger.Error(nil, "Couldn't get key for object", "object", klog.KObj(node))
   202  		return
   203  	}
   204  	ttlc.queue.Add(key)
   205  }
   207  func (ttlc *Controller) worker(ctx context.Context) {
   208  	for ttlc.processItem(ctx) {
   209  	}
   210  }
   212  func (ttlc *Controller) processItem(ctx context.Context) bool {
   213  	key, quit := ttlc.queue.Get()
   214  	if quit {
   215  		return false
   216  	}
   217  	defer ttlc.queue.Done(key)
   219  	err := ttlc.updateNodeIfNeeded(ctx, key.(string))
   220  	if err == nil {
   221  		ttlc.queue.Forget(key)
   222  		return true
   223  	}
   225  	ttlc.queue.AddRateLimited(key)
   226  	utilruntime.HandleError(err)
   227  	return true
   228  }
   230  func (ttlc *Controller) getDesiredTTLSeconds() int {
   231  	ttlc.lock.RLock()
   232  	defer ttlc.lock.RUnlock()
   233  	return ttlc.desiredTTLSeconds
   234  }
   236  func getIntFromAnnotation(ctx context.Context, node *v1.Node, annotationKey string) (int, bool) {
   237  	if node.Annotations == nil {
   238  		return 0, false
   239  	}
   240  	annotationValue, ok := node.Annotations[annotationKey]
   241  	if !ok {
   242  		return 0, false
   243  	}
   244  	intValue, err := strconv.Atoi(annotationValue)
   245  	if err != nil {
   246  		logger := klog.FromContext(ctx)
   247  		logger.Info("Could not convert the value with annotation key for the node", "annotationValue",
   248  			annotationValue, "annotationKey", annotationKey, "node", klog.KObj(node))
   249  		return 0, false
   250  	}
   251  	return intValue, true
   252  }
   254  func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
   255  	if node.Annotations == nil {
   256  		node.Annotations = make(map[string]string)
   257  	}
   258  	node.Annotations[annotationKey] = strconv.Itoa(value)
   259  }
   261  func (ttlc *Controller) patchNodeWithAnnotation(ctx context.Context, node *v1.Node, annotationKey string, value int) error {
   262  	oldData, err := json.Marshal(node)
   263  	if err != nil {
   264  		return err
   265  	}
   266  	setIntAnnotation(node, annotationKey, value)
   267  	newData, err := json.Marshal(node)
   268  	if err != nil {
   269  		return err
   270  	}
   271  	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
   272  	if err != nil {
   273  		return err
   274  	}
   275  	_, err = ttlc.kubeClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   276  	logger := klog.FromContext(ctx)
   277  	if err != nil {
   278  		logger.V(2).Info("Failed to change ttl annotation for node", "node", klog.KObj(node), "err", err)
   279  		return err
   280  	}
   281  	logger.V(2).Info("Changed ttl annotation", "node", klog.KObj(node), "TTL", time.Duration(value)*time.Second)
   282  	return nil
   283  }
   285  func (ttlc *Controller) updateNodeIfNeeded(ctx context.Context, key string) error {
   286  	node, err := ttlc.nodeStore.Get(key)
   287  	if err != nil {
   288  		if apierrors.IsNotFound(err) {
   289  			return nil
   290  		}
   291  		return err
   292  	}
   294  	desiredTTL := ttlc.getDesiredTTLSeconds()
   295  	currentTTL, ok := getIntFromAnnotation(ctx, node, v1.ObjectTTLAnnotationKey)
   296  	if ok && currentTTL == desiredTTL {
   297  		return nil
   298  	}
   300  	return ttlc.patchNodeWithAnnotation(ctx, node.DeepCopy(), v1.ObjectTTLAnnotationKey, desiredTTL)
   301  }

