...

Source file src/k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go

Documentation: k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package clusterauthenticationtrust
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"crypto/x509"
    23  	"encoding/json"
    24  	"encoding/pem"
    25  	"fmt"
    26  	"reflect"
    27  	"strings"
    28  	"time"
    29  
    30  	corev1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/equality"
    32  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apiserver/pkg/authentication/request/headerrequest"
    38  	"k8s.io/apiserver/pkg/server/dynamiccertificates"
    39  	corev1informers "k8s.io/client-go/informers/core/v1"
    40  	"k8s.io/client-go/kubernetes"
    41  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    42  	corev1listers "k8s.io/client-go/listers/core/v1"
    43  	"k8s.io/client-go/tools/cache"
    44  	"k8s.io/client-go/util/cert"
    45  	"k8s.io/client-go/util/workqueue"
    46  	"k8s.io/klog/v2"
    47  )
    48  
    49  const (
    50  	configMapNamespace = "kube-system"
    51  	configMapName      = "extension-apiserver-authentication"
    52  )
    53  
    54  // Controller holds the running state for the controller
    55  type Controller struct {
    56  	requiredAuthenticationData ClusterAuthenticationInfo
    57  
    58  	configMapLister corev1listers.ConfigMapLister
    59  	configMapClient corev1client.ConfigMapsGetter
    60  	namespaceClient corev1client.NamespacesGetter
    61  
    62  	// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors.
    63  	// we only ever place one entry in here, but it is keyed as usual: namespace/name
    64  	queue workqueue.RateLimitingInterface
    65  
    66  	// kubeSystemConfigMapInformer is tracked so that we can start these on Run
    67  	kubeSystemConfigMapInformer cache.SharedIndexInformer
    68  
    69  	// preRunCaches are the caches to sync before starting the work of this control loop
    70  	preRunCaches []cache.InformerSynced
    71  }
    72  
    73  // ClusterAuthenticationInfo holds the information that will included in public configmap.
    74  type ClusterAuthenticationInfo struct {
    75  	// ClientCA is the CA that can be used to verify the identity of normal clients
    76  	ClientCA dynamiccertificates.CAContentProvider
    77  
    78  	// RequestHeaderUsernameHeaders are the headers used by this kube-apiserver to determine username
    79  	RequestHeaderUsernameHeaders headerrequest.StringSliceProvider
    80  	// RequestHeaderGroupHeaders are the headers used by this kube-apiserver to determine groups
    81  	RequestHeaderGroupHeaders headerrequest.StringSliceProvider
    82  	// RequestHeaderExtraHeaderPrefixes are the headers used by this kube-apiserver to determine user.extra
    83  	RequestHeaderExtraHeaderPrefixes headerrequest.StringSliceProvider
    84  	// RequestHeaderAllowedNames are the sujbects allowed to act as a front proxy
    85  	RequestHeaderAllowedNames headerrequest.StringSliceProvider
    86  	// RequestHeaderCA is the CA that can be used to verify the front proxy
    87  	RequestHeaderCA dynamiccertificates.CAContentProvider
    88  }
    89  
    90  // NewClusterAuthenticationTrustController returns a controller that will maintain the kube-system configmap/extension-apiserver-authentication
    91  // that holds information about how to aggregated apiservers are recommended (but not required) to configure themselves.
    92  func NewClusterAuthenticationTrustController(requiredAuthenticationData ClusterAuthenticationInfo, kubeClient kubernetes.Interface) *Controller {
    93  	// we construct our own informer because we need such a small subset of the information available.  Just one namespace.
    94  	kubeSystemConfigMapInformer := corev1informers.NewConfigMapInformer(kubeClient, configMapNamespace, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    95  
    96  	c := &Controller{
    97  		requiredAuthenticationData:  requiredAuthenticationData,
    98  		configMapLister:             corev1listers.NewConfigMapLister(kubeSystemConfigMapInformer.GetIndexer()),
    99  		configMapClient:             kubeClient.CoreV1(),
   100  		namespaceClient:             kubeClient.CoreV1(),
   101  		queue:                       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster_authentication_trust_controller"),
   102  		preRunCaches:                []cache.InformerSynced{kubeSystemConfigMapInformer.HasSynced},
   103  		kubeSystemConfigMapInformer: kubeSystemConfigMapInformer,
   104  	}
   105  
   106  	kubeSystemConfigMapInformer.AddEventHandler(cache.FilteringResourceEventHandler{
   107  		FilterFunc: func(obj interface{}) bool {
   108  			if cast, ok := obj.(*corev1.ConfigMap); ok {
   109  				return cast.Namespace == configMapNamespace && cast.Name == configMapName
   110  			}
   111  			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
   112  				if cast, ok := tombstone.Obj.(*corev1.ConfigMap); ok {
   113  					return cast.Namespace == configMapNamespace && cast.Name == configMapName
   114  				}
   115  			}
   116  			return true // always return true just in case.  The checks are fairly cheap
   117  		},
   118  		Handler: cache.ResourceEventHandlerFuncs{
   119  			// we have a filter, so any time we're called, we may as well queue. We only ever check one configmap
   120  			// so we don't have to be choosy about our key.
   121  			AddFunc: func(obj interface{}) {
   122  				c.queue.Add(keyFn())
   123  			},
   124  			UpdateFunc: func(oldObj, newObj interface{}) {
   125  				c.queue.Add(keyFn())
   126  			},
   127  			DeleteFunc: func(obj interface{}) {
   128  				c.queue.Add(keyFn())
   129  			},
   130  		},
   131  	})
   132  
   133  	return c
   134  }
   135  
   136  func (c *Controller) syncConfigMap() error {
   137  	originalAuthConfigMap, err := c.configMapLister.ConfigMaps(configMapNamespace).Get(configMapName)
   138  	if apierrors.IsNotFound(err) {
   139  		originalAuthConfigMap = &corev1.ConfigMap{
   140  			ObjectMeta: metav1.ObjectMeta{Namespace: configMapNamespace, Name: configMapName},
   141  		}
   142  	} else if err != nil {
   143  		return err
   144  	}
   145  	// keep the original to diff against later before updating
   146  	authConfigMap := originalAuthConfigMap.DeepCopy()
   147  
   148  	existingAuthenticationInfo, err := getClusterAuthenticationInfoFor(originalAuthConfigMap.Data)
   149  	if err != nil {
   150  		return err
   151  	}
   152  	combinedInfo, err := combinedClusterAuthenticationInfo(existingAuthenticationInfo, c.requiredAuthenticationData)
   153  	if err != nil {
   154  		return err
   155  	}
   156  	authConfigMap.Data, err = getConfigMapDataFor(combinedInfo)
   157  	if err != nil {
   158  		return err
   159  	}
   160  
   161  	if equality.Semantic.DeepEqual(authConfigMap, originalAuthConfigMap) {
   162  		klog.V(5).Info("no changes to configmap")
   163  		return nil
   164  	}
   165  	klog.V(2).Infof("writing updated authentication info to  %s configmaps/%s", configMapNamespace, configMapName)
   166  
   167  	if err := createNamespaceIfNeeded(c.namespaceClient, authConfigMap.Namespace); err != nil {
   168  		return err
   169  	}
   170  	if err := writeConfigMap(c.configMapClient, authConfigMap); err != nil {
   171  		return err
   172  	}
   173  
   174  	return nil
   175  }
   176  
   177  func createNamespaceIfNeeded(nsClient corev1client.NamespacesGetter, ns string) error {
   178  	if _, err := nsClient.Namespaces().Get(context.TODO(), ns, metav1.GetOptions{}); err == nil {
   179  		// the namespace already exists
   180  		return nil
   181  	}
   182  	newNs := &corev1.Namespace{
   183  		ObjectMeta: metav1.ObjectMeta{
   184  			Name:      ns,
   185  			Namespace: "",
   186  		},
   187  	}
   188  	_, err := nsClient.Namespaces().Create(context.TODO(), newNs, metav1.CreateOptions{})
   189  	if err != nil && apierrors.IsAlreadyExists(err) {
   190  		err = nil
   191  	}
   192  	return err
   193  }
   194  
   195  func writeConfigMap(configMapClient corev1client.ConfigMapsGetter, required *corev1.ConfigMap) error {
   196  	_, err := configMapClient.ConfigMaps(required.Namespace).Update(context.TODO(), required, metav1.UpdateOptions{})
   197  	if apierrors.IsNotFound(err) {
   198  		_, err := configMapClient.ConfigMaps(required.Namespace).Create(context.TODO(), required, metav1.CreateOptions{})
   199  		return err
   200  	}
   201  
   202  	// If the configmap is too big, clear the entire thing and count on this controller (or another one) to add the correct data back.
   203  	// We return the original error which causes the controller to re-queue.
   204  	// Too big means
   205  	//   1. request is so big the generic request catcher finds it
   206  	//   2. the content is so large that that the server sends a validation error "Too long: must have at most 1048576 characters"
   207  	if apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) {
   208  		if deleteErr := configMapClient.ConfigMaps(required.Namespace).Delete(context.TODO(), required.Name, metav1.DeleteOptions{}); deleteErr != nil {
   209  			return deleteErr
   210  		}
   211  		return err
   212  	}
   213  
   214  	return err
   215  }
   216  
   217  // combinedClusterAuthenticationInfo combines two sets of authentication information into a new one
   218  func combinedClusterAuthenticationInfo(lhs, rhs ClusterAuthenticationInfo) (ClusterAuthenticationInfo, error) {
   219  	ret := ClusterAuthenticationInfo{
   220  		RequestHeaderAllowedNames:        combineUniqueStringSlices(lhs.RequestHeaderAllowedNames, rhs.RequestHeaderAllowedNames),
   221  		RequestHeaderExtraHeaderPrefixes: combineUniqueStringSlices(lhs.RequestHeaderExtraHeaderPrefixes, rhs.RequestHeaderExtraHeaderPrefixes),
   222  		RequestHeaderGroupHeaders:        combineUniqueStringSlices(lhs.RequestHeaderGroupHeaders, rhs.RequestHeaderGroupHeaders),
   223  		RequestHeaderUsernameHeaders:     combineUniqueStringSlices(lhs.RequestHeaderUsernameHeaders, rhs.RequestHeaderUsernameHeaders),
   224  	}
   225  
   226  	var err error
   227  	ret.ClientCA, err = combineCertLists(lhs.ClientCA, rhs.ClientCA)
   228  	if err != nil {
   229  		return ClusterAuthenticationInfo{}, err
   230  	}
   231  	ret.RequestHeaderCA, err = combineCertLists(lhs.RequestHeaderCA, rhs.RequestHeaderCA)
   232  	if err != nil {
   233  		return ClusterAuthenticationInfo{}, err
   234  	}
   235  
   236  	return ret, nil
   237  }
   238  
   239  func getConfigMapDataFor(authenticationInfo ClusterAuthenticationInfo) (map[string]string, error) {
   240  	data := map[string]string{}
   241  	if authenticationInfo.ClientCA != nil {
   242  		if caBytes := authenticationInfo.ClientCA.CurrentCABundleContent(); len(caBytes) > 0 {
   243  			data["client-ca-file"] = string(caBytes)
   244  		}
   245  	}
   246  
   247  	if authenticationInfo.RequestHeaderCA == nil {
   248  		return data, nil
   249  	}
   250  
   251  	if caBytes := authenticationInfo.RequestHeaderCA.CurrentCABundleContent(); len(caBytes) > 0 {
   252  		var err error
   253  
   254  		// encoding errors aren't going to get better, so just fail on them.
   255  		data["requestheader-username-headers"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderUsernameHeaders.Value())
   256  		if err != nil {
   257  			return nil, err
   258  		}
   259  		data["requestheader-group-headers"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderGroupHeaders.Value())
   260  		if err != nil {
   261  			return nil, err
   262  		}
   263  		data["requestheader-extra-headers-prefix"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderExtraHeaderPrefixes.Value())
   264  		if err != nil {
   265  			return nil, err
   266  		}
   267  
   268  		data["requestheader-client-ca-file"] = string(caBytes)
   269  		data["requestheader-allowed-names"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderAllowedNames.Value())
   270  		if err != nil {
   271  			return nil, err
   272  		}
   273  	}
   274  
   275  	return data, nil
   276  }
   277  
   278  func getClusterAuthenticationInfoFor(data map[string]string) (ClusterAuthenticationInfo, error) {
   279  	ret := ClusterAuthenticationInfo{}
   280  
   281  	var err error
   282  	ret.RequestHeaderGroupHeaders, err = jsonDeserializeStringSlice(data["requestheader-group-headers"])
   283  	if err != nil {
   284  		return ClusterAuthenticationInfo{}, err
   285  	}
   286  	ret.RequestHeaderExtraHeaderPrefixes, err = jsonDeserializeStringSlice(data["requestheader-extra-headers-prefix"])
   287  	if err != nil {
   288  		return ClusterAuthenticationInfo{}, err
   289  	}
   290  	ret.RequestHeaderAllowedNames, err = jsonDeserializeStringSlice(data["requestheader-allowed-names"])
   291  	if err != nil {
   292  		return ClusterAuthenticationInfo{}, err
   293  	}
   294  	ret.RequestHeaderUsernameHeaders, err = jsonDeserializeStringSlice(data["requestheader-username-headers"])
   295  	if err != nil {
   296  		return ClusterAuthenticationInfo{}, err
   297  	}
   298  
   299  	if caBundle := data["requestheader-client-ca-file"]; len(caBundle) > 0 {
   300  		ret.RequestHeaderCA, err = dynamiccertificates.NewStaticCAContent("existing", []byte(caBundle))
   301  		if err != nil {
   302  			return ClusterAuthenticationInfo{}, err
   303  		}
   304  	}
   305  
   306  	if caBundle := data["client-ca-file"]; len(caBundle) > 0 {
   307  		ret.ClientCA, err = dynamiccertificates.NewStaticCAContent("existing", []byte(caBundle))
   308  		if err != nil {
   309  			return ClusterAuthenticationInfo{}, err
   310  		}
   311  	}
   312  
   313  	return ret, nil
   314  }
   315  
   316  func jsonSerializeStringSlice(in []string) (string, error) {
   317  	out, err := json.Marshal(in)
   318  	if err != nil {
   319  		return "", err
   320  	}
   321  	return string(out), err
   322  }
   323  
   324  func jsonDeserializeStringSlice(in string) (headerrequest.StringSliceProvider, error) {
   325  	if len(in) == 0 {
   326  		return nil, nil
   327  	}
   328  
   329  	out := []string{}
   330  	if err := json.Unmarshal([]byte(in), &out); err != nil {
   331  		return nil, err
   332  	}
   333  	return headerrequest.StaticStringSlice(out), nil
   334  }
   335  
   336  func combineUniqueStringSlices(lhs, rhs headerrequest.StringSliceProvider) headerrequest.StringSliceProvider {
   337  	ret := []string{}
   338  	present := sets.String{}
   339  
   340  	if lhs != nil {
   341  		for _, curr := range lhs.Value() {
   342  			if present.Has(curr) {
   343  				continue
   344  			}
   345  			ret = append(ret, curr)
   346  			present.Insert(curr)
   347  		}
   348  	}
   349  
   350  	if rhs != nil {
   351  		for _, curr := range rhs.Value() {
   352  			if present.Has(curr) {
   353  				continue
   354  			}
   355  			ret = append(ret, curr)
   356  			present.Insert(curr)
   357  		}
   358  	}
   359  
   360  	return headerrequest.StaticStringSlice(ret)
   361  }
   362  
   363  func combineCertLists(lhs, rhs dynamiccertificates.CAContentProvider) (dynamiccertificates.CAContentProvider, error) {
   364  	certificates := []*x509.Certificate{}
   365  
   366  	if lhs != nil {
   367  		lhsCABytes := lhs.CurrentCABundleContent()
   368  		lhsCAs, err := cert.ParseCertsPEM(lhsCABytes)
   369  		if err != nil {
   370  			return nil, err
   371  		}
   372  		certificates = append(certificates, lhsCAs...)
   373  	}
   374  	if rhs != nil {
   375  		rhsCABytes := rhs.CurrentCABundleContent()
   376  		rhsCAs, err := cert.ParseCertsPEM(rhsCABytes)
   377  		if err != nil {
   378  			return nil, err
   379  		}
   380  		certificates = append(certificates, rhsCAs...)
   381  	}
   382  
   383  	certificates = filterExpiredCerts(certificates...)
   384  
   385  	finalCertificates := []*x509.Certificate{}
   386  	// now check for duplicates. n^2, but super simple
   387  	for i := range certificates {
   388  		found := false
   389  		for j := range finalCertificates {
   390  			if reflect.DeepEqual(certificates[i].Raw, finalCertificates[j].Raw) {
   391  				found = true
   392  				break
   393  			}
   394  		}
   395  		if !found {
   396  			finalCertificates = append(finalCertificates, certificates[i])
   397  		}
   398  	}
   399  
   400  	finalCABytes, err := encodeCertificates(finalCertificates...)
   401  	if err != nil {
   402  		return nil, err
   403  	}
   404  
   405  	if len(finalCABytes) == 0 {
   406  		return nil, nil
   407  	}
   408  	// it makes sense for this list to be static because the combination of sources is only used just before writing and
   409  	// is recalculated
   410  	return dynamiccertificates.NewStaticCAContent("combined", finalCABytes)
   411  }
   412  
   413  // filterExpiredCerts checks are all certificates in the bundle valid, i.e. they have not expired.
   414  // The function returns new bundle with only valid certificates or error if no valid certificate is found.
   415  // We allow five minutes of slack for NotAfter comparisons
   416  func filterExpiredCerts(certs ...*x509.Certificate) []*x509.Certificate {
   417  	fiveMinutesAgo := time.Now().Add(-5 * time.Minute)
   418  
   419  	var validCerts []*x509.Certificate
   420  	for _, c := range certs {
   421  		if c.NotAfter.After(fiveMinutesAgo) {
   422  			validCerts = append(validCerts, c)
   423  		}
   424  	}
   425  
   426  	return validCerts
   427  }
   428  
   429  // Enqueue a method to allow separate control loops to cause the controller to trigger and reconcile content.
   430  func (c *Controller) Enqueue() {
   431  	c.queue.Add(keyFn())
   432  }
   433  
   434  // Run the controller until stopped.
   435  func (c *Controller) Run(ctx context.Context, workers int) {
   436  	defer utilruntime.HandleCrash()
   437  	// make sure the work queue is shutdown which will trigger workers to end
   438  	defer c.queue.ShutDown()
   439  
   440  	klog.Infof("Starting cluster_authentication_trust_controller controller")
   441  	defer klog.Infof("Shutting down cluster_authentication_trust_controller controller")
   442  
   443  	// we have a personal informer that is narrowly scoped, start it.
   444  	go c.kubeSystemConfigMapInformer.Run(ctx.Done())
   445  
   446  	// wait for your secondary caches to fill before starting your work
   447  	if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) {
   448  		return
   449  	}
   450  
   451  	// only run one worker
   452  	go wait.Until(c.runWorker, time.Second, ctx.Done())
   453  
   454  	// checks are cheap.  run once a minute just to be sure we stay in sync in case fsnotify fails again
   455  	// start timer that rechecks every minute, just in case.  this also serves to prime the controller quickly.
   456  	_ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) {
   457  		c.queue.Add(keyFn())
   458  		return false, nil
   459  	}, ctx.Done())
   460  
   461  	// wait until we're told to stop
   462  	<-ctx.Done()
   463  }
   464  
   465  func (c *Controller) runWorker() {
   466  	// hot loop until we're told to stop.  processNextWorkItem will automatically wait until there's work
   467  	// available, so we don't worry about secondary waits
   468  	for c.processNextWorkItem() {
   469  	}
   470  }
   471  
   472  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   473  func (c *Controller) processNextWorkItem() bool {
   474  	// pull the next work item from queue.  It should be a key we use to lookup something in a cache
   475  	key, quit := c.queue.Get()
   476  	if quit {
   477  		return false
   478  	}
   479  	// you always have to indicate to the queue that you've completed a piece of work
   480  	defer c.queue.Done(key)
   481  
   482  	// do your work on the key.  This method will contains your "do stuff" logic
   483  	err := c.syncConfigMap()
   484  	if err == nil {
   485  		// if you had no error, tell the queue to stop tracking history for your key.  This will
   486  		// reset things like failure counts for per-item rate limiting
   487  		c.queue.Forget(key)
   488  		return true
   489  	}
   490  
   491  	// there was a failure so be sure to report it.  This method allows for pluggable error handling
   492  	// which can be used for things like cluster-monitoring
   493  	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
   494  	// since we failed, we should requeue the item to work on later.  This method will add a backoff
   495  	// to avoid hotlooping on particular items (they're probably still not going to work right away)
   496  	// and overall controller protection (everything I've done is broken, this controller needs to
   497  	// calm down or it can starve other useful work) cases.
   498  	c.queue.AddRateLimited(key)
   499  
   500  	return true
   501  }
   502  
   503  func keyFn() string {
   504  	// this format matches DeletionHandlingMetaNamespaceKeyFunc for our single key
   505  	return configMapNamespace + "/" + configMapName
   506  }
   507  
   508  func encodeCertificates(certs ...*x509.Certificate) ([]byte, error) {
   509  	b := bytes.Buffer{}
   510  	for _, cert := range certs {
   511  		if err := pem.Encode(&b, &pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw}); err != nil {
   512  			return []byte{}, err
   513  		}
   514  	}
   515  	return b.Bytes(), nil
   516  }
   517  

View as plain text