...

Source file src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go

Documentation: k8s.io/kube-aggregator/pkg/apiserver

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"fmt"
    21  	"time"
    22  
    23  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    24  	"k8s.io/apimachinery/pkg/labels"
    25  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	"k8s.io/apiserver/pkg/server/dynamiccertificates"
    28  	"k8s.io/client-go/tools/cache"
    29  	"k8s.io/client-go/util/workqueue"
    30  	"k8s.io/klog/v2"
    31  
    32  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    33  	informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
    34  	listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
    35  	"k8s.io/kube-aggregator/pkg/controllers"
    36  )
    37  
    38  // APIHandlerManager defines the behaviour that an API handler should have.
    39  type APIHandlerManager interface {
    40  	AddAPIService(apiService *v1.APIService) error
    41  	RemoveAPIService(apiServiceName string)
    42  }
    43  
    44  // APIServiceRegistrationController is responsible for registering and removing API services.
    45  type APIServiceRegistrationController struct {
    46  	apiHandlerManager APIHandlerManager
    47  
    48  	apiServiceLister listers.APIServiceLister
    49  	apiServiceSynced cache.InformerSynced
    50  
    51  	// To allow injection for testing.
    52  	syncFn func(key string) error
    53  
    54  	queue workqueue.RateLimitingInterface
    55  }
    56  
    57  var _ dynamiccertificates.Listener = &APIServiceRegistrationController{}
    58  
    59  // NewAPIServiceRegistrationController returns a new APIServiceRegistrationController.
    60  func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
    61  	c := &APIServiceRegistrationController{
    62  		apiHandlerManager: apiHandlerManager,
    63  		apiServiceLister:  apiServiceInformer.Lister(),
    64  		apiServiceSynced:  apiServiceInformer.Informer().HasSynced,
    65  		queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"),
    66  	}
    67  
    68  	apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    69  		AddFunc:    c.addAPIService,
    70  		UpdateFunc: c.updateAPIService,
    71  		DeleteFunc: c.deleteAPIService,
    72  	})
    73  
    74  	c.syncFn = c.sync
    75  
    76  	return c
    77  }
    78  
    79  func (c *APIServiceRegistrationController) sync(key string) error {
    80  	apiService, err := c.apiServiceLister.Get(key)
    81  	if apierrors.IsNotFound(err) {
    82  		c.apiHandlerManager.RemoveAPIService(key)
    83  		return nil
    84  	}
    85  	if err != nil {
    86  		return err
    87  	}
    88  
    89  	return c.apiHandlerManager.AddAPIService(apiService)
    90  }
    91  
    92  // Run starts APIServiceRegistrationController which will process all registration requests until stopCh is closed.
    93  func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}, handlerSyncedCh chan<- struct{}) {
    94  	defer utilruntime.HandleCrash()
    95  	defer c.queue.ShutDown()
    96  
    97  	klog.Info("Starting APIServiceRegistrationController")
    98  	defer klog.Info("Shutting down APIServiceRegistrationController")
    99  
   100  	if !controllers.WaitForCacheSync("APIServiceRegistrationController", stopCh, c.apiServiceSynced) {
   101  		return
   102  	}
   103  
   104  	/// initially sync all APIServices to make sure the proxy handler is complete
   105  	if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
   106  		services, err := c.apiServiceLister.List(labels.Everything())
   107  		if err != nil {
   108  			utilruntime.HandleError(fmt.Errorf("failed to initially list APIServices: %v", err))
   109  			return false, nil
   110  		}
   111  		for _, s := range services {
   112  			if err := c.apiHandlerManager.AddAPIService(s); err != nil {
   113  				utilruntime.HandleError(fmt.Errorf("failed to initially sync APIService %s: %v", s.Name, err))
   114  				return false, nil
   115  			}
   116  		}
   117  		return true, nil
   118  	}, stopCh); err == wait.ErrWaitTimeout {
   119  		utilruntime.HandleError(fmt.Errorf("timed out waiting for proxy handler to initialize"))
   120  		return
   121  	} else if err != nil {
   122  		panic(fmt.Errorf("unexpected error: %v", err))
   123  	}
   124  	close(handlerSyncedCh)
   125  
   126  	// only start one worker thread since its a slow moving API and the aggregation server adding bits
   127  	// aren't threadsafe
   128  	go wait.Until(c.runWorker, time.Second, stopCh)
   129  
   130  	<-stopCh
   131  }
   132  
   133  func (c *APIServiceRegistrationController) runWorker() {
   134  	for c.processNextWorkItem() {
   135  	}
   136  }
   137  
   138  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   139  func (c *APIServiceRegistrationController) processNextWorkItem() bool {
   140  	key, quit := c.queue.Get()
   141  	if quit {
   142  		return false
   143  	}
   144  	defer c.queue.Done(key)
   145  
   146  	err := c.syncFn(key.(string))
   147  	if err == nil {
   148  		c.queue.Forget(key)
   149  		return true
   150  	}
   151  
   152  	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
   153  	c.queue.AddRateLimited(key)
   154  
   155  	return true
   156  }
   157  
   158  func (c *APIServiceRegistrationController) enqueueInternal(obj *v1.APIService) {
   159  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   160  	if err != nil {
   161  		klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
   162  		return
   163  	}
   164  
   165  	c.queue.Add(key)
   166  }
   167  
   168  func (c *APIServiceRegistrationController) addAPIService(obj interface{}) {
   169  	castObj := obj.(*v1.APIService)
   170  	klog.V(4).Infof("Adding %s", castObj.Name)
   171  	c.enqueueInternal(castObj)
   172  }
   173  
   174  func (c *APIServiceRegistrationController) updateAPIService(obj, _ interface{}) {
   175  	castObj := obj.(*v1.APIService)
   176  	klog.V(4).Infof("Updating %s", castObj.Name)
   177  	c.enqueueInternal(castObj)
   178  }
   179  
   180  func (c *APIServiceRegistrationController) deleteAPIService(obj interface{}) {
   181  	castObj, ok := obj.(*v1.APIService)
   182  	if !ok {
   183  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   184  		if !ok {
   185  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   186  			return
   187  		}
   188  		castObj, ok = tombstone.Obj.(*v1.APIService)
   189  		if !ok {
   190  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   191  			return
   192  		}
   193  	}
   194  	klog.V(4).Infof("Deleting %q", castObj.Name)
   195  	c.enqueueInternal(castObj)
   196  }
   197  
   198  // Enqueue queues all apiservices to be rehandled.
   199  // This method is used by the controller to notify when the proxy cert content changes.
   200  func (c *APIServiceRegistrationController) Enqueue() {
   201  	apiServices, err := c.apiServiceLister.List(labels.Everything())
   202  	if err != nil {
   203  		utilruntime.HandleError(err)
   204  		return
   205  	}
   206  	for _, apiService := range apiServices {
   207  		c.addAPIService(apiService)
   208  	}
   209  }
   210  

View as plain text