...

Source file src/k8s.io/kubernetes/pkg/controller/ttl/ttl_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/ttl

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // The TTLController sets ttl annotations on nodes, based on cluster size.
    18  // The annotations are consumed by Kubelets as suggestions for how long
    19  // they can cache objects (e.g. secrets or config maps) before refetching
    20  // them from the apiserver.
    21  //
    22  // TODO: This is a temporary workaround for the Kubelet not being able to
    23  // send a "watch secrets attached to pods from my node" request. Once
    24  // sending such a request is possible, we will modify the Kubelet to
    25  // use it and get rid of this controller completely.
    26  
    27  package ttl
    28  
    29  import (
    30  	"context"
    31  	"fmt"
    32  	"math"
    33  	"strconv"
    34  	"sync"
    35  	"time"
    36  
    37  	v1 "k8s.io/api/core/v1"
    38  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    39  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    40  	"k8s.io/apimachinery/pkg/types"
    41  	"k8s.io/apimachinery/pkg/util/json"
    42  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    43  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    44  	"k8s.io/apimachinery/pkg/util/wait"
    45  	informers "k8s.io/client-go/informers/core/v1"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	listers "k8s.io/client-go/listers/core/v1"
    48  	"k8s.io/client-go/tools/cache"
    49  	"k8s.io/client-go/util/workqueue"
    50  	"k8s.io/kubernetes/pkg/controller"
    51  
    52  	"k8s.io/klog/v2"
    53  )
    54  
    55  // Controller sets ttl annotations on nodes, based on cluster size.
    56  type Controller struct {
    57  	kubeClient clientset.Interface
    58  
    59  	// nodeStore is a local cache of nodes.
    60  	nodeStore listers.NodeLister
    61  
    62  	// Nodes that need to be synced.
    63  	queue workqueue.RateLimitingInterface
    64  
    65  	// Returns true if all underlying informers are synced.
    66  	hasSynced func() bool
    67  
    68  	lock sync.RWMutex
    69  
    70  	// Number of nodes in the cluster.
    71  	nodeCount int
    72  
    73  	// Desired TTL for all nodes in the cluster.
    74  	desiredTTLSeconds int
    75  
    76  	// In which interval of cluster size we currently are.
    77  	boundaryStep int
    78  }
    79  
    80  // NewTTLController creates a new TTLController
    81  func NewTTLController(ctx context.Context, nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *Controller {
    82  	ttlc := &Controller{
    83  		kubeClient: kubeClient,
    84  		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
    85  	}
    86  	logger := klog.FromContext(ctx)
    87  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    88  		AddFunc: func(obj interface{}) {
    89  			ttlc.addNode(logger, obj)
    90  		},
    91  		UpdateFunc: func(old, newObj interface{}) {
    92  			ttlc.updateNode(logger, old, newObj)
    93  		},
    94  		DeleteFunc: ttlc.deleteNode,
    95  	})
    96  
    97  	ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
    98  	ttlc.hasSynced = nodeInformer.Informer().HasSynced
    99  
   100  	return ttlc
   101  }
   102  
   103  type ttlBoundary struct {
   104  	sizeMin    int
   105  	sizeMax    int
   106  	ttlSeconds int
   107  }
   108  
   109  var (
   110  	ttlBoundaries = []ttlBoundary{
   111  		{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
   112  		{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
   113  		{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
   114  		{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
   115  		{sizeMin: 1800, sizeMax: math.MaxInt32, ttlSeconds: 300},
   116  	}
   117  )
   118  
   119  // Run begins watching and syncing.
   120  func (ttlc *Controller) Run(ctx context.Context, workers int) {
   121  	defer utilruntime.HandleCrash()
   122  	defer ttlc.queue.ShutDown()
   123  	logger := klog.FromContext(ctx)
   124  	logger.Info("Starting TTL controller")
   125  	defer logger.Info("Shutting down TTL controller")
   126  
   127  	if !cache.WaitForNamedCacheSync("TTL", ctx.Done(), ttlc.hasSynced) {
   128  		return
   129  	}
   130  
   131  	for i := 0; i < workers; i++ {
   132  		go wait.UntilWithContext(ctx, ttlc.worker, time.Second)
   133  	}
   134  
   135  	<-ctx.Done()
   136  }
   137  
   138  func (ttlc *Controller) addNode(logger klog.Logger, obj interface{}) {
   139  	node, ok := obj.(*v1.Node)
   140  	if !ok {
   141  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   142  		return
   143  	}
   144  
   145  	func() {
   146  		ttlc.lock.Lock()
   147  		defer ttlc.lock.Unlock()
   148  		ttlc.nodeCount++
   149  		if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
   150  			ttlc.boundaryStep++
   151  			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
   152  		}
   153  	}()
   154  	ttlc.enqueueNode(logger, node)
   155  }
   156  
   157  func (ttlc *Controller) updateNode(logger klog.Logger, _, newObj interface{}) {
   158  	node, ok := newObj.(*v1.Node)
   159  	if !ok {
   160  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
   161  		return
   162  	}
   163  	// Processing all updates of nodes guarantees that we will update
   164  	// the ttl annotation, when cluster size changes.
   165  	// We are relying on the fact that Kubelet is updating node status
   166  	// every 10s (or generally every X seconds), which means that whenever
   167  	// required, its ttl annotation should be updated within that period.
   168  	ttlc.enqueueNode(logger, node)
   169  }
   170  
   171  func (ttlc *Controller) deleteNode(obj interface{}) {
   172  	_, ok := obj.(*v1.Node)
   173  	if !ok {
   174  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   175  		if !ok {
   176  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   177  			return
   178  		}
   179  		_, ok = tombstone.Obj.(*v1.Node)
   180  		if !ok {
   181  			utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj))
   182  			return
   183  		}
   184  	}
   185  
   186  	func() {
   187  		ttlc.lock.Lock()
   188  		defer ttlc.lock.Unlock()
   189  		ttlc.nodeCount--
   190  		if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
   191  			ttlc.boundaryStep--
   192  			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
   193  		}
   194  	}()
   195  	// We are not processing the node, as it no longer exists.
   196  }
   197  
   198  func (ttlc *Controller) enqueueNode(logger klog.Logger, node *v1.Node) {
   199  	key, err := controller.KeyFunc(node)
   200  	if err != nil {
   201  		logger.Error(nil, "Couldn't get key for object", "object", klog.KObj(node))
   202  		return
   203  	}
   204  	ttlc.queue.Add(key)
   205  }
   206  
   207  func (ttlc *Controller) worker(ctx context.Context) {
   208  	for ttlc.processItem(ctx) {
   209  	}
   210  }
   211  
   212  func (ttlc *Controller) processItem(ctx context.Context) bool {
   213  	key, quit := ttlc.queue.Get()
   214  	if quit {
   215  		return false
   216  	}
   217  	defer ttlc.queue.Done(key)
   218  
   219  	err := ttlc.updateNodeIfNeeded(ctx, key.(string))
   220  	if err == nil {
   221  		ttlc.queue.Forget(key)
   222  		return true
   223  	}
   224  
   225  	ttlc.queue.AddRateLimited(key)
   226  	utilruntime.HandleError(err)
   227  	return true
   228  }
   229  
   230  func (ttlc *Controller) getDesiredTTLSeconds() int {
   231  	ttlc.lock.RLock()
   232  	defer ttlc.lock.RUnlock()
   233  	return ttlc.desiredTTLSeconds
   234  }
   235  
   236  func getIntFromAnnotation(ctx context.Context, node *v1.Node, annotationKey string) (int, bool) {
   237  	if node.Annotations == nil {
   238  		return 0, false
   239  	}
   240  	annotationValue, ok := node.Annotations[annotationKey]
   241  	if !ok {
   242  		return 0, false
   243  	}
   244  	intValue, err := strconv.Atoi(annotationValue)
   245  	if err != nil {
   246  		logger := klog.FromContext(ctx)
   247  		logger.Info("Could not convert the value with annotation key for the node", "annotationValue",
   248  			annotationValue, "annotationKey", annotationKey, "node", klog.KObj(node))
   249  		return 0, false
   250  	}
   251  	return intValue, true
   252  }
   253  
   254  func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
   255  	if node.Annotations == nil {
   256  		node.Annotations = make(map[string]string)
   257  	}
   258  	node.Annotations[annotationKey] = strconv.Itoa(value)
   259  }
   260  
   261  func (ttlc *Controller) patchNodeWithAnnotation(ctx context.Context, node *v1.Node, annotationKey string, value int) error {
   262  	oldData, err := json.Marshal(node)
   263  	if err != nil {
   264  		return err
   265  	}
   266  	setIntAnnotation(node, annotationKey, value)
   267  	newData, err := json.Marshal(node)
   268  	if err != nil {
   269  		return err
   270  	}
   271  	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
   272  	if err != nil {
   273  		return err
   274  	}
   275  	_, err = ttlc.kubeClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   276  	logger := klog.FromContext(ctx)
   277  	if err != nil {
   278  		logger.V(2).Info("Failed to change ttl annotation for node", "node", klog.KObj(node), "err", err)
   279  		return err
   280  	}
   281  	logger.V(2).Info("Changed ttl annotation", "node", klog.KObj(node), "TTL", time.Duration(value)*time.Second)
   282  	return nil
   283  }
   284  
   285  func (ttlc *Controller) updateNodeIfNeeded(ctx context.Context, key string) error {
   286  	node, err := ttlc.nodeStore.Get(key)
   287  	if err != nil {
   288  		if apierrors.IsNotFound(err) {
   289  			return nil
   290  		}
   291  		return err
   292  	}
   293  
   294  	desiredTTL := ttlc.getDesiredTTLSeconds()
   295  	currentTTL, ok := getIntFromAnnotation(ctx, node, v1.ObjectTTLAnnotationKey)
   296  	if ok && currentTTL == desiredTTL {
   297  		return nil
   298  	}
   299  
   300  	return ttlc.patchNodeWithAnnotation(ctx, node.DeepCopy(), v1.ObjectTTLAnnotationKey, desiredTTL)
   301  }
   302  

View as plain text