...

Source file src/k8s.io/kube-aggregator/pkg/controllers/openapi/controller.go

Documentation: k8s.io/kube-aggregator/pkg/controllers/openapi

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package openapi
    18  
    19  import (
    20  	"fmt"
    21  	"net/http"
    22  	"time"
    23  
    24  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    25  	"k8s.io/apimachinery/pkg/util/wait"
    26  	"k8s.io/client-go/util/workqueue"
    27  	"k8s.io/klog/v2"
    28  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    29  	"k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
    30  )
    31  
    32  const (
    33  	successfulUpdateDelay      = time.Minute
    34  	successfulUpdateDelayLocal = time.Second
    35  	failedUpdateMaxExpDelay    = time.Hour
    36  )
    37  
    38  type syncAction int
    39  
    40  const (
    41  	syncRequeue syncAction = iota
    42  	syncRequeueRateLimited
    43  	syncNothing
    44  )
    45  
    46  // AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove
    47  // them if necessary.
    48  type AggregationController struct {
    49  	openAPIAggregationManager aggregator.SpecAggregator
    50  	queue                     workqueue.RateLimitingInterface
    51  	downloader                *aggregator.Downloader
    52  
    53  	// To allow injection for testing.
    54  	syncHandler func(key string) (syncAction, error)
    55  }
    56  
    57  // NewAggregationController creates new OpenAPI aggregation controller.
    58  func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregationManager aggregator.SpecAggregator) *AggregationController {
    59  	c := &AggregationController{
    60  		openAPIAggregationManager: openAPIAggregationManager,
    61  		queue: workqueue.NewNamedRateLimitingQueue(
    62  			workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay),
    63  			"open_api_aggregation_controller",
    64  		),
    65  		downloader: downloader,
    66  	}
    67  
    68  	c.syncHandler = c.sync
    69  
    70  	return c
    71  }
    72  
    73  // Run starts OpenAPI AggregationController
    74  func (c *AggregationController) Run(stopCh <-chan struct{}) {
    75  	defer utilruntime.HandleCrash()
    76  	defer c.queue.ShutDown()
    77  
    78  	klog.Info("Starting OpenAPI AggregationController")
    79  	defer klog.Info("Shutting down OpenAPI AggregationController")
    80  
    81  	go wait.Until(c.runWorker, time.Second, stopCh)
    82  
    83  	<-stopCh
    84  }
    85  
    86  func (c *AggregationController) runWorker() {
    87  	for c.processNextWorkItem() {
    88  	}
    89  }
    90  
    91  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
    92  func (c *AggregationController) processNextWorkItem() bool {
    93  	key, quit := c.queue.Get()
    94  	defer c.queue.Done(key)
    95  	if quit {
    96  		return false
    97  	}
    98  	klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
    99  
   100  	action, err := c.syncHandler(key.(string))
   101  	if err != nil {
   102  		utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
   103  	}
   104  
   105  	switch action {
   106  	case syncRequeue:
   107  		c.queue.AddAfter(key, successfulUpdateDelay)
   108  	case syncRequeueRateLimited:
   109  		klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
   110  		c.queue.AddRateLimited(key)
   111  	case syncNothing:
   112  		c.queue.Forget(key)
   113  	}
   114  
   115  	return true
   116  }
   117  
   118  func (c *AggregationController) sync(key string) (syncAction, error) {
   119  	if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
   120  		if err == aggregator.ErrAPIServiceNotFound {
   121  			return syncNothing, nil
   122  		} else {
   123  			return syncRequeueRateLimited, err
   124  		}
   125  	}
   126  	return syncRequeue, nil
   127  }
   128  
   129  // AddAPIService adds a new API Service to OpenAPI Aggregation.
   130  func (c *AggregationController) AddAPIService(handler http.Handler, apiService *v1.APIService) {
   131  	if apiService.Spec.Service == nil {
   132  		return
   133  	}
   134  	if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
   135  		utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err))
   136  	}
   137  	c.queue.AddAfter(apiService.Name, time.Second)
   138  }
   139  
   140  // UpdateAPIService updates API Service's info and handler.
   141  func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
   142  	if apiService.Spec.Service == nil {
   143  		return
   144  	}
   145  	if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(apiService.Name); err != nil {
   146  		utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err))
   147  	}
   148  	key := apiService.Name
   149  	if c.queue.NumRequeues(key) > 0 {
   150  		// The item has failed before. Remove it from failure queue and
   151  		// update it in a second
   152  		c.queue.Forget(key)
   153  		c.queue.AddAfter(key, time.Second)
   154  	}
   155  	// Else: The item has been succeeded before and it will be updated soon (after successfulUpdateDelay)
   156  	// we don't add it again as it will cause a duplication of items.
   157  }
   158  
   159  // RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
   160  func (c *AggregationController) RemoveAPIService(apiServiceName string) {
   161  	c.openAPIAggregationManager.RemoveAPIService(apiServiceName)
   162  	// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
   163  	// and will not add it again to the queue.
   164  	c.queue.Forget(apiServiceName)
   165  }
   166  

View as plain text