...

Source file src/k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking/controller.go

Documentation: k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package legacytokentracking
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"golang.org/x/time/rate"
    25  
    26  	corev1 "k8s.io/api/core/v1"
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/fields"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	corev1informers "k8s.io/client-go/informers/core/v1"
    33  	"k8s.io/client-go/kubernetes"
    34  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    35  	"k8s.io/client-go/tools/cache"
    36  	"k8s.io/client-go/util/workqueue"
    37  	"k8s.io/klog/v2"
    38  	"k8s.io/utils/clock"
    39  )
    40  
    41  const (
    42  	ConfigMapName    = "kube-apiserver-legacy-service-account-token-tracking"
    43  	ConfigMapDataKey = "since"
    44  	dateFormat       = "2006-01-02"
    45  )
    46  
    47  var (
    48  	queueKey = metav1.NamespaceSystem + "/" + ConfigMapName
    49  )
    50  
    51  // Controller maintains a timestamp value configmap `kube-apiserver-legacy-service-account-token-tracking`
    52  // in `kube-system` to indicates if the tracking for legacy tokens is enabled in
    53  // the cluster. For HA clusters, the configmap will be eventually created after
    54  // all controller instances have enabled the feature. When disabling this
    55  // feature, existing configmap will be deleted.
    56  type Controller struct {
    57  	configMapClient   corev1client.ConfigMapsGetter
    58  	configMapInformer cache.SharedIndexInformer
    59  	configMapCache    cache.Indexer
    60  	configMapSynced   cache.InformerSynced
    61  	queue             workqueue.RateLimitingInterface
    62  
    63  	// rate limiter controls the rate limit of the creation of the configmap.
    64  	// this is useful in multi-apiserver cluster to prevent config existing in a
    65  	// cluster with mixed enabled/disabled controllers. otherwise, those
    66  	// apiservers will fight to create/delete until all apiservers are enabled
    67  	// or disabled.
    68  	creationRatelimiter *rate.Limiter
    69  	clock               clock.Clock
    70  }
    71  
    72  // NewController returns a Controller struct.
    73  func NewController(cs kubernetes.Interface) *Controller {
    74  	return newController(cs, clock.RealClock{}, rate.NewLimiter(rate.Every(30*time.Minute), 1))
    75  }
    76  
    77  func newController(cs kubernetes.Interface, cl clock.Clock, limiter *rate.Limiter) *Controller {
    78  	informer := corev1informers.NewFilteredConfigMapInformer(cs, metav1.NamespaceSystem, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(options *metav1.ListOptions) {
    79  		options.FieldSelector = fields.OneTermEqualSelector("metadata.name", ConfigMapName).String()
    80  	})
    81  
    82  	c := &Controller{
    83  		configMapClient:     cs.CoreV1(),
    84  		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "legacy_token_tracking_controller"),
    85  		configMapInformer:   informer,
    86  		configMapCache:      informer.GetIndexer(),
    87  		configMapSynced:     informer.HasSynced,
    88  		creationRatelimiter: limiter,
    89  		clock:               cl,
    90  	}
    91  
    92  	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    93  		AddFunc: func(obj interface{}) {
    94  			c.enqueue()
    95  		},
    96  		UpdateFunc: func(oldObj, newObj interface{}) {
    97  			c.enqueue()
    98  		},
    99  		DeleteFunc: func(obj interface{}) {
   100  			c.enqueue()
   101  		},
   102  	})
   103  
   104  	return c
   105  }
   106  
   107  func (c *Controller) enqueue() {
   108  	c.queue.Add(queueKey)
   109  }
   110  
   111  // Run starts the controller sync loop.
   112  func (c *Controller) Run(stopCh <-chan struct{}) {
   113  	defer utilruntime.HandleCrash()
   114  	defer c.queue.ShutDown()
   115  
   116  	klog.Info("Starting legacy_token_tracking_controller")
   117  	defer klog.Infof("Shutting down legacy_token_tracking_controller")
   118  
   119  	go c.configMapInformer.Run(stopCh)
   120  	if !cache.WaitForNamedCacheSync("configmaps", stopCh, c.configMapSynced) {
   121  		return
   122  	}
   123  
   124  	go wait.Until(c.runWorker, time.Second, stopCh)
   125  
   126  	c.queue.Add(queueKey)
   127  
   128  	<-stopCh
   129  	klog.Info("Ending legacy_token_tracking_controller")
   130  }
   131  
   132  func (c *Controller) runWorker() {
   133  	for c.processNext() {
   134  	}
   135  }
   136  
   137  func (c *Controller) processNext() bool {
   138  	key, quit := c.queue.Get()
   139  	if quit {
   140  		return false
   141  	}
   142  	defer c.queue.Done(key)
   143  
   144  	if err := c.syncConfigMap(); err != nil {
   145  		utilruntime.HandleError(fmt.Errorf("while syncing ConfigMap %q, err: %w", key, err))
   146  		c.queue.AddRateLimited(key)
   147  		return true
   148  	}
   149  	c.queue.Forget(key)
   150  	return true
   151  }
   152  
   153  func (c *Controller) syncConfigMap() error {
   154  	obj, exists, err := c.configMapCache.GetByKey(queueKey)
   155  	if err != nil {
   156  		return err
   157  	}
   158  
   159  	now := c.clock.Now()
   160  	if !exists {
   161  		r := c.creationRatelimiter.ReserveN(now, 1)
   162  		if delay := r.DelayFrom(now); delay > 0 {
   163  			c.queue.AddAfter(queueKey, delay)
   164  			r.CancelAt(now)
   165  			return nil
   166  		}
   167  
   168  		if _, err = c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &corev1.ConfigMap{
   169  			ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: ConfigMapName},
   170  			Data:       map[string]string{ConfigMapDataKey: now.UTC().Format(dateFormat)},
   171  		}, metav1.CreateOptions{}); err != nil {
   172  			if apierrors.IsAlreadyExists(err) {
   173  				return nil
   174  			}
   175  			// don't consume the creationRatelimiter for an unsuccessful attempt
   176  			r.CancelAt(now)
   177  			return err
   178  		}
   179  	} else {
   180  		configMap := obj.(*corev1.ConfigMap)
   181  		if _, err = time.Parse(dateFormat, configMap.Data[ConfigMapDataKey]); err != nil {
   182  			configMap := configMap.DeepCopy()
   183  			if configMap.Data == nil {
   184  				configMap.Data = map[string]string{}
   185  			}
   186  			configMap.Data[ConfigMapDataKey] = now.UTC().Format(dateFormat)
   187  			if _, err = c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
   188  				if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   189  					return nil
   190  				}
   191  				return err
   192  			}
   193  		}
   194  	}
   195  
   196  	return nil
   197  }
   198  

View as plain text