...

Source file src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/controller.go

Documentation: k8s.io/kube-aggregator/pkg/controllers/openapiv3

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package openapiv3
    18  
import (
	"errors"
	"fmt"
	"net/http"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
	"k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
)
    31  
const (
	// successfulUpdateDelay is the delay before a non-local item is re-added
	// to the queue when its sync action is syncRequeue. It is also passed as
	// the first (base delay) argument of the failure rate limiter.
	successfulUpdateDelay      = time.Minute
	// successfulUpdateDelayLocal is the delay before a local item is re-added
	// to the queue when its sync action is syncRequeue.
	successfulUpdateDelayLocal = time.Second
	// failedUpdateMaxExpDelay is passed as the second (maximum delay) argument
	// of the exponential failure rate limiter.
	failedUpdateMaxExpDelay    = time.Hour
)
    37  
// syncAction is returned by the sync handler and tells processNextWorkItem
// what to do with the key once the handler has run.
type syncAction int

const (
	// syncRequeue re-adds the key after a fixed delay.
	syncRequeue syncAction = iota
	// syncRequeueRateLimited re-adds the key through the queue's rate limiter.
	syncRequeueRateLimited
	// syncNothing does not re-add the key to the queue.
	syncNothing
)
    45  
// AggregationController keeps a work queue of API service names and, for each
// name taken off the queue, asks the SpecProxier to update that service's
// OpenAPI spec. Depending on the outcome the name is re-queued after a fixed
// delay, re-queued with rate limiting, or left out of the queue.
type AggregationController struct {
	// openAPIAggregationManager performs the per-service spec updates and
	// tracks which API services are registered.
	openAPIAggregationManager aggregator.SpecProxier
	// queue holds the API service names waiting to be synced.
	queue                     workqueue.RateLimitingInterface

	// To allow injection for testing.
	syncHandler func(key string) (syncAction, error)
}
    54  
    55  // NewAggregationController creates new OpenAPI aggregation controller.
    56  func NewAggregationController(openAPIAggregationManager aggregator.SpecProxier) *AggregationController {
    57  	c := &AggregationController{
    58  		openAPIAggregationManager: openAPIAggregationManager,
    59  		queue: workqueue.NewNamedRateLimitingQueue(
    60  			workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay),
    61  			"open_api_v3_aggregation_controller",
    62  		),
    63  	}
    64  
    65  	c.syncHandler = c.sync
    66  
    67  	// update each service at least once, also those which are not coming from APIServices, namely local services
    68  	for _, name := range openAPIAggregationManager.GetAPIServiceNames() {
    69  		c.queue.AddAfter(name, time.Second)
    70  	}
    71  
    72  	return c
    73  }
    74  
// Run starts the OpenAPI V3 AggregationController and blocks until stopCh is
// signaled. On return the work queue is shut down, which in turn lets the
// worker loop observe the quit signal from the queue.
func (c *AggregationController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Deferred calls run in reverse order: the shutdown message is logged
	// first, then the queue is shut down, then HandleCrash runs.
	defer c.queue.ShutDown()

	klog.Info("Starting OpenAPI V3 AggregationController")
	defer klog.Info("Shutting down OpenAPI V3 AggregationController")

	// A single worker goroutine; wait.Until re-invokes runWorker with a
	// one-second period for as long as stopCh stays open.
	go wait.Until(c.runWorker, time.Second, stopCh)

	<-stopCh
}
    87  
    88  func (c *AggregationController) runWorker() {
    89  	for c.processNextWorkItem() {
    90  	}
    91  }
    92  
    93  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
    94  func (c *AggregationController) processNextWorkItem() bool {
    95  	key, quit := c.queue.Get()
    96  	defer c.queue.Done(key)
    97  	if quit {
    98  		return false
    99  	}
   100  
   101  	if aggregator.IsLocalAPIService(key.(string)) {
   102  		// for local delegation targets that are aggregated once per second, log at
   103  		// higher level to avoid flooding the log
   104  		klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key)
   105  	} else {
   106  		klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
   107  	}
   108  
   109  	action, err := c.syncHandler(key.(string))
   110  	if err == nil {
   111  		c.queue.Forget(key)
   112  	} else {
   113  		utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
   114  	}
   115  
   116  	switch action {
   117  	case syncRequeue:
   118  		if aggregator.IsLocalAPIService(key.(string)) {
   119  			klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal)
   120  			c.queue.AddAfter(key, successfulUpdateDelayLocal)
   121  		} else {
   122  			klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
   123  			c.queue.AddAfter(key, successfulUpdateDelay)
   124  		}
   125  	case syncRequeueRateLimited:
   126  		klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
   127  		c.queue.AddRateLimited(key)
   128  	case syncNothing:
   129  		klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
   130  	}
   131  
   132  	return true
   133  }
   134  
   135  func (c *AggregationController) sync(key string) (syncAction, error) {
   136  	if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
   137  		if err == aggregator.ErrAPIServiceNotFound {
   138  			return syncNothing, nil
   139  		}
   140  		return syncRequeueRateLimited, err
   141  	}
   142  	return syncRequeue, nil
   143  }
   144  
   145  // AddAPIService adds a new API Service to OpenAPI Aggregation.
   146  func (c *AggregationController) AddAPIService(handler http.Handler, apiService *v1.APIService) {
   147  	if apiService.Spec.Service == nil {
   148  		return
   149  	}
   150  	c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService)
   151  	c.queue.AddAfter(apiService.Name, time.Second)
   152  }
   153  
   154  // UpdateAPIService updates API Service's info and handler.
   155  func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
   156  	if apiService.Spec.Service == nil {
   157  		return
   158  	}
   159  	c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService)
   160  	key := apiService.Name
   161  	if c.queue.NumRequeues(key) > 0 {
   162  		// The item has failed before. Remove it from failure queue and
   163  		// update it in a second
   164  		c.queue.Forget(key)
   165  		c.queue.AddAfter(key, time.Second)
   166  	}
   167  }
   168  
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
// It drops the service's spec from the aggregation manager and clears the
// service's failure history in the queue's rate limiter.
func (c *AggregationController) RemoveAPIService(apiServiceName string) {
	c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName)
	// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
	// and will not add it again to the queue.
	c.queue.Forget(apiServiceName)
}
   176  

View as plain text