...

Source file src/k8s.io/kubernetes/pkg/controller/certificates/rootcacertpublisher/publisher.go

Documentation: k8s.io/kubernetes/pkg/controller/certificates/rootcacertpublisher

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package rootcacertpublisher
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	coreinformers "k8s.io/client-go/informers/core/v1"
    31  	clientset "k8s.io/client-go/kubernetes"
    32  	corelisters "k8s.io/client-go/listers/core/v1"
    33  	"k8s.io/client-go/tools/cache"
    34  	"k8s.io/client-go/util/workqueue"
    35  	"k8s.io/klog/v2"
    36  )
    37  
    38  // RootCACertConfigMapName is name of the configmap which stores certificates
    39  // to access api-server
    40  const (
    41  	RootCACertConfigMapName = "kube-root-ca.crt"
    42  	DescriptionAnnotation   = "kubernetes.io/description"
    43  	Description             = "Contains a CA bundle that can be used to verify the kube-apiserver when using internal endpoints such as the internal service IP or kubernetes.default.svc. " +
    44  		"No other usage is guaranteed across distributions of Kubernetes clusters."
    45  )
    46  
    47  func init() {
    48  	registerMetrics()
    49  }
    50  
    51  // NewPublisher construct a new controller which would manage the configmap
    52  // which stores certificates in each namespace. It will make sure certificate
    53  // configmap exists in each namespace.
    54  func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) {
    55  	e := &Publisher{
    56  		client: cl,
    57  		rootCA: rootCA,
    58  		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root_ca_cert_publisher"),
    59  	}
    60  
    61  	cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    62  		DeleteFunc: e.configMapDeleted,
    63  		UpdateFunc: e.configMapUpdated,
    64  	})
    65  	e.cmLister = cmInformer.Lister()
    66  	e.cmListerSynced = cmInformer.Informer().HasSynced
    67  
    68  	nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    69  		AddFunc:    e.namespaceAdded,
    70  		UpdateFunc: e.namespaceUpdated,
    71  	})
    72  	e.nsListerSynced = nsInformer.Informer().HasSynced
    73  
    74  	e.syncHandler = e.syncNamespace
    75  
    76  	return e, nil
    77  
    78  }
    79  
    80  // Publisher manages certificate ConfigMap objects inside Namespaces
    81  type Publisher struct {
    82  	client clientset.Interface
    83  	rootCA []byte
    84  
    85  	// To allow injection for testing.
    86  	syncHandler func(ctx context.Context, key string) error
    87  
    88  	cmLister       corelisters.ConfigMapLister
    89  	cmListerSynced cache.InformerSynced
    90  
    91  	nsListerSynced cache.InformerSynced
    92  
    93  	queue workqueue.RateLimitingInterface
    94  }
    95  
    96  // Run starts process
    97  func (c *Publisher) Run(ctx context.Context, workers int) {
    98  	defer utilruntime.HandleCrash()
    99  	defer c.queue.ShutDown()
   100  
   101  	logger := klog.FromContext(ctx)
   102  	logger.Info("Starting root CA cert publisher controller")
   103  	defer logger.Info("Shutting down root CA cert publisher controller")
   104  
   105  	if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) {
   106  		return
   107  	}
   108  
   109  	for i := 0; i < workers; i++ {
   110  		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
   111  	}
   112  
   113  	<-ctx.Done()
   114  }
   115  
   116  func (c *Publisher) configMapDeleted(obj interface{}) {
   117  	cm, err := convertToCM(obj)
   118  	if err != nil {
   119  		utilruntime.HandleError(err)
   120  		return
   121  	}
   122  	if cm.Name != RootCACertConfigMapName {
   123  		return
   124  	}
   125  	c.queue.Add(cm.Namespace)
   126  }
   127  
   128  func (c *Publisher) configMapUpdated(_, newObj interface{}) {
   129  	cm, err := convertToCM(newObj)
   130  	if err != nil {
   131  		utilruntime.HandleError(err)
   132  		return
   133  	}
   134  	if cm.Name != RootCACertConfigMapName {
   135  		return
   136  	}
   137  	c.queue.Add(cm.Namespace)
   138  }
   139  
   140  func (c *Publisher) namespaceAdded(obj interface{}) {
   141  	namespace := obj.(*v1.Namespace)
   142  	c.queue.Add(namespace.Name)
   143  }
   144  
   145  func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) {
   146  	newNamespace := newObj.(*v1.Namespace)
   147  	if newNamespace.Status.Phase != v1.NamespaceActive {
   148  		return
   149  	}
   150  	c.queue.Add(newNamespace.Name)
   151  }
   152  
   153  func (c *Publisher) runWorker(ctx context.Context) {
   154  	for c.processNextWorkItem(ctx) {
   155  	}
   156  }
   157  
   158  // processNextWorkItem deals with one key off the queue. It returns false when
   159  // it's time to quit.
   160  func (c *Publisher) processNextWorkItem(ctx context.Context) bool {
   161  	key, quit := c.queue.Get()
   162  	if quit {
   163  		return false
   164  	}
   165  	defer c.queue.Done(key)
   166  
   167  	if err := c.syncHandler(ctx, key.(string)); err != nil {
   168  		utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
   169  		c.queue.AddRateLimited(key)
   170  		return true
   171  	}
   172  
   173  	c.queue.Forget(key)
   174  	return true
   175  }
   176  
   177  func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) {
   178  	startTime := time.Now()
   179  	defer func() {
   180  		recordMetrics(startTime, err)
   181  		klog.FromContext(ctx).V(4).Info("Finished syncing namespace", "namespace", ns, "elapsedTime", time.Since(startTime))
   182  	}()
   183  
   184  	cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName)
   185  	switch {
   186  	case apierrors.IsNotFound(err):
   187  		_, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{
   188  			ObjectMeta: metav1.ObjectMeta{
   189  				Name:        RootCACertConfigMapName,
   190  				Annotations: map[string]string{DescriptionAnnotation: Description},
   191  			},
   192  			Data: map[string]string{
   193  				"ca.crt": string(c.rootCA),
   194  			},
   195  		}, metav1.CreateOptions{})
   196  		// don't retry a create if the namespace doesn't exist or is terminating
   197  		if apierrors.IsNotFound(err) || apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
   198  			return nil
   199  		}
   200  		return err
   201  	case err != nil:
   202  		return err
   203  	}
   204  
   205  	data := map[string]string{
   206  		"ca.crt": string(c.rootCA),
   207  	}
   208  
   209  	// ensure the data and the one annotation describing usage of this configmap match.
   210  	if reflect.DeepEqual(cm.Data, data) && len(cm.Annotations[DescriptionAnnotation]) > 0 {
   211  		return nil
   212  	}
   213  
   214  	// copy so we don't modify the cache's instance of the configmap
   215  	cm = cm.DeepCopy()
   216  	cm.Data = data
   217  	if cm.Annotations == nil {
   218  		cm.Annotations = map[string]string{}
   219  	}
   220  	cm.Annotations[DescriptionAnnotation] = Description
   221  
   222  	_, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
   223  	return err
   224  }
   225  
   226  func convertToCM(obj interface{}) (*v1.ConfigMap, error) {
   227  	cm, ok := obj.(*v1.ConfigMap)
   228  	if !ok {
   229  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   230  		if !ok {
   231  			return nil, fmt.Errorf("couldn't get object from tombstone %#v", obj)
   232  		}
   233  		cm, ok = tombstone.Obj.(*v1.ConfigMap)
   234  		if !ok {
   235  			return nil, fmt.Errorf("tombstone contained object that is not a ConfigMap %#v", obj)
   236  		}
   237  	}
   238  	return cm, nil
   239  }
   240  

View as plain text