...

Source file src/k8s.io/kubernetes/pkg/controller/certificates/certificate_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/certificates

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package certificates implements an abstract controller that is useful for
    18  // building controllers that manage CSRs
    19  package certificates
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"time"
    25  
    26  	"golang.org/x/time/rate"
    27  
    28  	certificates "k8s.io/api/certificates/v1"
    29  	"k8s.io/apimachinery/pkg/api/errors"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	certificatesinformers "k8s.io/client-go/informers/certificates/v1"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	certificateslisters "k8s.io/client-go/listers/certificates/v1"
    35  	"k8s.io/client-go/tools/cache"
    36  	"k8s.io/client-go/util/workqueue"
    37  	"k8s.io/klog/v2"
    38  	"k8s.io/kubernetes/pkg/controller"
    39  )
    40  
    41  type CertificateController struct {
    42  	// name is an identifier for this particular controller instance.
    43  	name string
    44  
    45  	kubeClient clientset.Interface
    46  
    47  	csrLister  certificateslisters.CertificateSigningRequestLister
    48  	csrsSynced cache.InformerSynced
    49  
    50  	handler func(context.Context, *certificates.CertificateSigningRequest) error
    51  
    52  	queue workqueue.RateLimitingInterface
    53  }
    54  
    55  func NewCertificateController(
    56  	ctx context.Context,
    57  	name string,
    58  	kubeClient clientset.Interface,
    59  	csrInformer certificatesinformers.CertificateSigningRequestInformer,
    60  	handler func(context.Context, *certificates.CertificateSigningRequest) error,
    61  ) *CertificateController {
    62  	logger := klog.FromContext(ctx)
    63  	cc := &CertificateController{
    64  		name:       name,
    65  		kubeClient: kubeClient,
    66  		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
    67  			workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
    68  			// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
    69  			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    70  		), "certificate"),
    71  		handler: handler,
    72  	}
    73  
    74  	// Manage the addition/update of certificate requests
    75  	csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    76  		AddFunc: func(obj interface{}) {
    77  			csr := obj.(*certificates.CertificateSigningRequest)
    78  			logger.V(4).Info("Adding certificate request", "csr", csr.Name)
    79  			cc.enqueueCertificateRequest(obj)
    80  		},
    81  		UpdateFunc: func(old, new interface{}) {
    82  			oldCSR := old.(*certificates.CertificateSigningRequest)
    83  			logger.V(4).Info("Updating certificate request", "old", oldCSR.Name)
    84  			cc.enqueueCertificateRequest(new)
    85  		},
    86  		DeleteFunc: func(obj interface{}) {
    87  			csr, ok := obj.(*certificates.CertificateSigningRequest)
    88  			if !ok {
    89  				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    90  				if !ok {
    91  					logger.V(2).Info("Couldn't get object from tombstone", "object", obj)
    92  					return
    93  				}
    94  				csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
    95  				if !ok {
    96  					logger.V(2).Info("Tombstone contained object that is not a CSR", "object", obj)
    97  					return
    98  				}
    99  			}
   100  			logger.V(4).Info("Deleting certificate request", "csr", csr.Name)
   101  			cc.enqueueCertificateRequest(obj)
   102  		},
   103  	})
   104  	cc.csrLister = csrInformer.Lister()
   105  	cc.csrsSynced = csrInformer.Informer().HasSynced
   106  	return cc
   107  }
   108  
   109  // Run the main goroutine responsible for watching and syncing jobs.
   110  func (cc *CertificateController) Run(ctx context.Context, workers int) {
   111  	defer utilruntime.HandleCrash()
   112  	defer cc.queue.ShutDown()
   113  
   114  	logger := klog.FromContext(ctx)
   115  	logger.Info("Starting certificate controller", "name", cc.name)
   116  	defer logger.Info("Shutting down certificate controller", "name", cc.name)
   117  
   118  	if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) {
   119  		return
   120  	}
   121  
   122  	for i := 0; i < workers; i++ {
   123  		go wait.UntilWithContext(ctx, cc.worker, time.Second)
   124  	}
   125  
   126  	<-ctx.Done()
   127  }
   128  
   129  // worker runs a thread that dequeues CSRs, handles them, and marks them done.
   130  func (cc *CertificateController) worker(ctx context.Context) {
   131  	for cc.processNextWorkItem(ctx) {
   132  	}
   133  }
   134  
   135  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   136  func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool {
   137  	cKey, quit := cc.queue.Get()
   138  	if quit {
   139  		return false
   140  	}
   141  	defer cc.queue.Done(cKey)
   142  
   143  	if err := cc.syncFunc(ctx, cKey.(string)); err != nil {
   144  		cc.queue.AddRateLimited(cKey)
   145  		if _, ignorable := err.(ignorableError); !ignorable {
   146  			utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
   147  		} else {
   148  			klog.FromContext(ctx).V(4).Info("Sync certificate request failed", "csr", cKey, "err", err)
   149  		}
   150  		return true
   151  	}
   152  
   153  	cc.queue.Forget(cKey)
   154  	return true
   155  
   156  }
   157  
   158  func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
   159  	key, err := controller.KeyFunc(obj)
   160  	if err != nil {
   161  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   162  		return
   163  	}
   164  	cc.queue.Add(key)
   165  }
   166  
   167  func (cc *CertificateController) syncFunc(ctx context.Context, key string) error {
   168  	logger := klog.FromContext(ctx)
   169  	startTime := time.Now()
   170  	defer func() {
   171  		logger.V(4).Info("Finished syncing certificate request", "csr", key, "elapsedTime", time.Since(startTime))
   172  	}()
   173  	csr, err := cc.csrLister.Get(key)
   174  	if errors.IsNotFound(err) {
   175  		logger.V(3).Info("csr has been deleted", "csr", key)
   176  		return nil
   177  	}
   178  	if err != nil {
   179  		return err
   180  	}
   181  
   182  	if len(csr.Status.Certificate) > 0 {
   183  		// no need to do anything because it already has a cert
   184  		return nil
   185  	}
   186  
   187  	// need to operate on a copy so we don't mutate the csr in the shared cache
   188  	csr = csr.DeepCopy()
   189  	return cc.handler(ctx, csr)
   190  }
   191  
   192  // IgnorableError returns an error that we shouldn't handle (i.e. log) because
   193  // it's spammy and usually user error. Instead we will log these errors at a
   194  // higher log level. We still need to throw these errors to signal that the
   195  // sync should be retried.
   196  func IgnorableError(s string, args ...interface{}) ignorableError {
   197  	return ignorableError(fmt.Sprintf(s, args...))
   198  }
   199  
   200  type ignorableError string
   201  
   202  func (e ignorableError) Error() string {
   203  	return string(e)
   204  }
   205  

View as plain text