...

Source file src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go

Documentation: k8s.io/kube-aggregator/pkg/controllers/status

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"net/url"
    24  	"reflect"
    25  	"sync"
    26  	"time"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	"k8s.io/apimachinery/pkg/api/equality"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	"k8s.io/apimachinery/pkg/api/meta"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	v1informers "k8s.io/client-go/informers/core/v1"
    38  	v1listers "k8s.io/client-go/listers/core/v1"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/transport"
    41  	"k8s.io/client-go/util/workqueue"
    42  	"k8s.io/component-base/metrics/legacyregistry"
    43  	"k8s.io/klog/v2"
    44  	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    45  	apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
    46  	apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
    47  	informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
    48  	listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
    49  	"k8s.io/kube-aggregator/pkg/controllers"
    50  )
    51  
    52  // making sure we only register metrics once into legacy registry
    53  var registerIntoLegacyRegistryOnce sync.Once
    54  
    55  type certKeyFunc func() ([]byte, []byte)
    56  
    57  // ServiceResolver knows how to convert a service reference into an actual location.
    58  type ServiceResolver interface {
    59  	ResolveEndpoint(namespace, name string, port int32) (*url.URL, error)
    60  }
    61  
    62  // AvailableConditionController handles checking the availability of registered API services.
    63  type AvailableConditionController struct {
    64  	apiServiceClient apiregistrationclient.APIServicesGetter
    65  
    66  	apiServiceLister listers.APIServiceLister
    67  	apiServiceSynced cache.InformerSynced
    68  
    69  	// serviceLister is used to get the IP to create the transport for
    70  	serviceLister  v1listers.ServiceLister
    71  	servicesSynced cache.InformerSynced
    72  
    73  	endpointsLister v1listers.EndpointsLister
    74  	endpointsSynced cache.InformerSynced
    75  
    76  	// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
    77  	proxyTransportDial         *transport.DialHolder
    78  	proxyCurrentCertKeyContent certKeyFunc
    79  	serviceResolver            ServiceResolver
    80  
    81  	// To allow injection for testing.
    82  	syncFn func(key string) error
    83  
    84  	queue workqueue.RateLimitingInterface
    85  	// map from service-namespace -> service-name -> apiservice names
    86  	cache map[string]map[string][]string
    87  	// this lock protects operations on the above cache
    88  	cacheLock sync.RWMutex
    89  
    90  	// metrics registered into legacy registry
    91  	metrics *availabilityMetrics
    92  }
    93  
    94  // NewAvailableConditionController returns a new AvailableConditionController.
    95  func NewAvailableConditionController(
    96  	apiServiceInformer informers.APIServiceInformer,
    97  	serviceInformer v1informers.ServiceInformer,
    98  	endpointsInformer v1informers.EndpointsInformer,
    99  	apiServiceClient apiregistrationclient.APIServicesGetter,
   100  	proxyTransportDial *transport.DialHolder,
   101  	proxyCurrentCertKeyContent certKeyFunc,
   102  	serviceResolver ServiceResolver,
   103  ) (*AvailableConditionController, error) {
   104  	c := &AvailableConditionController{
   105  		apiServiceClient: apiServiceClient,
   106  		apiServiceLister: apiServiceInformer.Lister(),
   107  		serviceLister:    serviceInformer.Lister(),
   108  		endpointsLister:  endpointsInformer.Lister(),
   109  		serviceResolver:  serviceResolver,
   110  		queue: workqueue.NewNamedRateLimitingQueue(
   111  			// We want a fairly tight requeue time.  The controller listens to the API, but because it relies on the routability of the
   112  			// service network, it is possible for an external, non-watchable factor to affect availability.  This keeps
   113  			// the maximum disruption time to a minimum, but it does prevent hot loops.
   114  			workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
   115  			"AvailableConditionController"),
   116  		proxyTransportDial:         proxyTransportDial,
   117  		proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
   118  		metrics:                    newAvailabilityMetrics(),
   119  	}
   120  
   121  	// resync on this one because it is low cardinality and rechecking the actual discovery
   122  	// allows us to detect health in a more timely fashion when network connectivity to
   123  	// nodes is snipped, but the network still attempts to route there.  See
   124  	// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
   125  	apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
   126  		cache.ResourceEventHandlerFuncs{
   127  			AddFunc:    c.addAPIService,
   128  			UpdateFunc: c.updateAPIService,
   129  			DeleteFunc: c.deleteAPIService,
   130  		},
   131  		30*time.Second)
   132  	c.apiServiceSynced = apiServiceHandler.HasSynced
   133  
   134  	serviceHandler, _ := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   135  		AddFunc:    c.addService,
   136  		UpdateFunc: c.updateService,
   137  		DeleteFunc: c.deleteService,
   138  	})
   139  	c.servicesSynced = serviceHandler.HasSynced
   140  
   141  	endpointsHandler, _ := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   142  		AddFunc:    c.addEndpoints,
   143  		UpdateFunc: c.updateEndpoints,
   144  		DeleteFunc: c.deleteEndpoints,
   145  	})
   146  	c.endpointsSynced = endpointsHandler.HasSynced
   147  
   148  	c.syncFn = c.sync
   149  
   150  	// TODO: decouple from legacyregistry
   151  	var err error
   152  	registerIntoLegacyRegistryOnce.Do(func() {
   153  		err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister)
   154  	})
   155  	if err != nil {
   156  		return nil, err
   157  	}
   158  
   159  	return c, nil
   160  }
   161  
   162  func (c *AvailableConditionController) sync(key string) error {
   163  	originalAPIService, err := c.apiServiceLister.Get(key)
   164  	if apierrors.IsNotFound(err) {
   165  		c.metrics.ForgetAPIService(key)
   166  		return nil
   167  	}
   168  	if err != nil {
   169  		return err
   170  	}
   171  
   172  	// if a particular transport was specified, use that otherwise build one
   173  	// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
   174  	// that's not so bad) and sets a very short timeout.  This is a best effort GET that provides no additional information
   175  	transportConfig := &transport.Config{
   176  		TLS: transport.TLSConfig{
   177  			Insecure: true,
   178  		},
   179  		DialHolder: c.proxyTransportDial,
   180  	}
   181  
   182  	if c.proxyCurrentCertKeyContent != nil {
   183  		proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()
   184  
   185  		transportConfig.TLS.CertData = proxyClientCert
   186  		transportConfig.TLS.KeyData = proxyClientKey
   187  	}
   188  	restTransport, err := transport.New(transportConfig)
   189  	if err != nil {
   190  		return err
   191  	}
   192  	discoveryClient := &http.Client{
   193  		Transport: restTransport,
   194  		// the request should happen quickly.
   195  		Timeout: 5 * time.Second,
   196  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
   197  			return http.ErrUseLastResponse
   198  		},
   199  	}
   200  
   201  	apiService := originalAPIService.DeepCopy()
   202  
   203  	availableCondition := apiregistrationv1.APIServiceCondition{
   204  		Type:               apiregistrationv1.Available,
   205  		Status:             apiregistrationv1.ConditionTrue,
   206  		LastTransitionTime: metav1.Now(),
   207  	}
   208  
   209  	// local API services are always considered available
   210  	if apiService.Spec.Service == nil {
   211  		apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
   212  		_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   213  		return err
   214  	}
   215  
   216  	service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
   217  	if apierrors.IsNotFound(err) {
   218  		availableCondition.Status = apiregistrationv1.ConditionFalse
   219  		availableCondition.Reason = "ServiceNotFound"
   220  		availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
   221  		apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   222  		_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   223  		return err
   224  	} else if err != nil {
   225  		availableCondition.Status = apiregistrationv1.ConditionUnknown
   226  		availableCondition.Reason = "ServiceAccessError"
   227  		availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
   228  		apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   229  		_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   230  		return err
   231  	}
   232  
   233  	if service.Spec.Type == v1.ServiceTypeClusterIP {
   234  		// if we have a cluster IP service, it must be listening on configured port and we can check that
   235  		servicePort := apiService.Spec.Service.Port
   236  		portName := ""
   237  		foundPort := false
   238  		for _, port := range service.Spec.Ports {
   239  			if port.Port == *servicePort {
   240  				foundPort = true
   241  				portName = port.Name
   242  				break
   243  			}
   244  		}
   245  		if !foundPort {
   246  			availableCondition.Status = apiregistrationv1.ConditionFalse
   247  			availableCondition.Reason = "ServicePortError"
   248  			availableCondition.Message = fmt.Sprintf("service/%s in %q is not listening on port %d", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, *apiService.Spec.Service.Port)
   249  			apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   250  			_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   251  			return err
   252  		}
   253  
   254  		endpoints, err := c.endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
   255  		if apierrors.IsNotFound(err) {
   256  			availableCondition.Status = apiregistrationv1.ConditionFalse
   257  			availableCondition.Reason = "EndpointsNotFound"
   258  			availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
   259  			apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   260  			_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   261  			return err
   262  		} else if err != nil {
   263  			availableCondition.Status = apiregistrationv1.ConditionUnknown
   264  			availableCondition.Reason = "EndpointsAccessError"
   265  			availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
   266  			apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   267  			_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   268  			return err
   269  		}
   270  		hasActiveEndpoints := false
   271  	outer:
   272  		for _, subset := range endpoints.Subsets {
   273  			if len(subset.Addresses) == 0 {
   274  				continue
   275  			}
   276  			for _, endpointPort := range subset.Ports {
   277  				if endpointPort.Name == portName {
   278  					hasActiveEndpoints = true
   279  					break outer
   280  				}
   281  			}
   282  		}
   283  		if !hasActiveEndpoints {
   284  			availableCondition.Status = apiregistrationv1.ConditionFalse
   285  			availableCondition.Reason = "MissingEndpoints"
   286  			availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
   287  			apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   288  			_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
   289  			return err
   290  		}
   291  	}
   292  	// actually try to hit the discovery endpoint when it isn't local and when we're routing as a service.
   293  	if apiService.Spec.Service != nil && c.serviceResolver != nil {
   294  		attempts := 5
   295  		results := make(chan error, attempts)
   296  		for i := 0; i < attempts; i++ {
   297  			go func() {
   298  				discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, *apiService.Spec.Service.Port)
   299  				if err != nil {
   300  					results <- err
   301  					return
   302  				}
   303  				// render legacyAPIService health check path when it is delegated to a service
   304  				if apiService.Name == "v1." {
   305  					discoveryURL.Path = "/api/" + apiService.Spec.Version
   306  				} else {
   307  					discoveryURL.Path = "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
   308  				}
   309  
   310  				errCh := make(chan error, 1)
   311  				go func() {
   312  					// be sure to check a URL that the aggregated API server is required to serve
   313  					newReq, err := http.NewRequest("GET", discoveryURL.String(), nil)
   314  					if err != nil {
   315  						errCh <- err
   316  						return
   317  					}
   318  
   319  					// setting the system-masters identity ensures that we will always have access rights
   320  					transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", []string{"system:masters"}, nil)
   321  					resp, err := discoveryClient.Do(newReq)
   322  					if resp != nil {
   323  						resp.Body.Close()
   324  						// we should always been in the 200s or 300s
   325  						if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
   326  							errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
   327  							return
   328  						}
   329  					}
   330  
   331  					errCh <- err
   332  				}()
   333  
   334  				select {
   335  				case err = <-errCh:
   336  					if err != nil {
   337  						results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err)
   338  						return
   339  					}
   340  
   341  					// we had trouble with slow dial and DNS responses causing us to wait too long.
   342  					// we added this as insurance
   343  				case <-time.After(6 * time.Second):
   344  					results <- fmt.Errorf("timed out waiting for %v", discoveryURL)
   345  					return
   346  				}
   347  
   348  				results <- nil
   349  			}()
   350  		}
   351  
   352  		var lastError error
   353  		for i := 0; i < attempts; i++ {
   354  			lastError = <-results
   355  			// if we had at least one success, we are successful overall and we can return now
   356  			if lastError == nil {
   357  				break
   358  			}
   359  		}
   360  
   361  		if lastError != nil {
   362  			availableCondition.Status = apiregistrationv1.ConditionFalse
   363  			availableCondition.Reason = "FailedDiscoveryCheck"
   364  			availableCondition.Message = lastError.Error()
   365  			apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   366  			_, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService)
   367  			if updateErr != nil {
   368  				return updateErr
   369  			}
   370  			// force a requeue to make it very obvious that this will be retried at some point in the future
   371  			// along with other requeues done via service change, endpoint change, and resync
   372  			return lastError
   373  		}
   374  	}
   375  
   376  	availableCondition.Reason = "Passed"
   377  	availableCondition.Message = "all checks passed"
   378  	apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
   379  	_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
   380  	return err
   381  }
   382  
   383  // updateAPIServiceStatus only issues an update if a change is detected.  We have a tight resync loop to quickly detect dead
   384  // apiservices. Doing that means we don't want to quickly issue no-op updates.
   385  func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
   386  	// update this metric on every sync operation to reflect the actual state
   387  	c.setUnavailableGauge(newAPIService)
   388  
   389  	if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
   390  		return newAPIService, nil
   391  	}
   392  
   393  	orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
   394  	now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
   395  	unknown := apiregistrationv1.APIServiceCondition{
   396  		Type:   apiregistrationv1.Available,
   397  		Status: apiregistrationv1.ConditionUnknown,
   398  	}
   399  	if orig == nil {
   400  		orig = &unknown
   401  	}
   402  	if now == nil {
   403  		now = &unknown
   404  	}
   405  	if *orig != *now {
   406  		klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
   407  	}
   408  
   409  	newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
   410  	if err != nil {
   411  		return nil, err
   412  	}
   413  
   414  	c.setUnavailableCounter(originalAPIService, newAPIService)
   415  	return newAPIService, nil
   416  }
   417  
   418  // Run starts the AvailableConditionController loop which manages the availability condition of API services.
   419  func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
   420  	defer utilruntime.HandleCrash()
   421  	defer c.queue.ShutDown()
   422  
   423  	klog.Info("Starting AvailableConditionController")
   424  	defer klog.Info("Shutting down AvailableConditionController")
   425  
   426  	// This waits not just for the informers to sync, but for our handlers
   427  	// to be called; since the handlers are three different ways of
   428  	// enqueueing the same thing, waiting for this permits the queue to
   429  	// maximally de-duplicate the entries.
   430  	if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
   431  		return
   432  	}
   433  
   434  	for i := 0; i < workers; i++ {
   435  		go wait.Until(c.runWorker, time.Second, stopCh)
   436  	}
   437  
   438  	<-stopCh
   439  }
   440  
   441  func (c *AvailableConditionController) runWorker() {
   442  	for c.processNextWorkItem() {
   443  	}
   444  }
   445  
   446  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   447  func (c *AvailableConditionController) processNextWorkItem() bool {
   448  	key, quit := c.queue.Get()
   449  	if quit {
   450  		return false
   451  	}
   452  	defer c.queue.Done(key)
   453  
   454  	err := c.syncFn(key.(string))
   455  	if err == nil {
   456  		c.queue.Forget(key)
   457  		return true
   458  	}
   459  
   460  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   461  	c.queue.AddRateLimited(key)
   462  
   463  	return true
   464  }
   465  
   466  func (c *AvailableConditionController) addAPIService(obj interface{}) {
   467  	castObj := obj.(*apiregistrationv1.APIService)
   468  	klog.V(4).Infof("Adding %s", castObj.Name)
   469  	if castObj.Spec.Service != nil {
   470  		c.rebuildAPIServiceCache()
   471  	}
   472  	c.queue.Add(castObj.Name)
   473  }
   474  
   475  func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) {
   476  	castObj := newObj.(*apiregistrationv1.APIService)
   477  	oldCastObj := oldObj.(*apiregistrationv1.APIService)
   478  	klog.V(4).Infof("Updating %s", oldCastObj.Name)
   479  	if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) {
   480  		c.rebuildAPIServiceCache()
   481  	}
   482  	c.queue.Add(oldCastObj.Name)
   483  }
   484  
   485  func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
   486  	castObj, ok := obj.(*apiregistrationv1.APIService)
   487  	if !ok {
   488  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   489  		if !ok {
   490  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   491  			return
   492  		}
   493  		castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
   494  		if !ok {
   495  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   496  			return
   497  		}
   498  	}
   499  	klog.V(4).Infof("Deleting %q", castObj.Name)
   500  	if castObj.Spec.Service != nil {
   501  		c.rebuildAPIServiceCache()
   502  	}
   503  	c.queue.Add(castObj.Name)
   504  }
   505  
   506  func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
   507  	metadata, err := meta.Accessor(obj)
   508  	if err != nil {
   509  		utilruntime.HandleError(err)
   510  		return nil
   511  	}
   512  	c.cacheLock.RLock()
   513  	defer c.cacheLock.RUnlock()
   514  	return c.cache[metadata.GetNamespace()][metadata.GetName()]
   515  }
   516  
   517  // if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
   518  // (which will get processed an extra time - this doesn't matter),
   519  // and miss a newly relevant apiservice (which will get queued by the apiservice handler)
   520  func (c *AvailableConditionController) rebuildAPIServiceCache() {
   521  	apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
   522  	newCache := map[string]map[string][]string{}
   523  	for _, apiService := range apiServiceList {
   524  		if apiService.Spec.Service == nil {
   525  			continue
   526  		}
   527  		if newCache[apiService.Spec.Service.Namespace] == nil {
   528  			newCache[apiService.Spec.Service.Namespace] = map[string][]string{}
   529  		}
   530  		newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name)
   531  	}
   532  
   533  	c.cacheLock.Lock()
   534  	defer c.cacheLock.Unlock()
   535  	c.cache = newCache
   536  }
   537  
   538  // TODO, think of a way to avoid checking on every service manipulation
   539  
   540  func (c *AvailableConditionController) addService(obj interface{}) {
   541  	for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
   542  		c.queue.Add(apiService)
   543  	}
   544  }
   545  
   546  func (c *AvailableConditionController) updateService(obj, _ interface{}) {
   547  	for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
   548  		c.queue.Add(apiService)
   549  	}
   550  }
   551  
   552  func (c *AvailableConditionController) deleteService(obj interface{}) {
   553  	castObj, ok := obj.(*v1.Service)
   554  	if !ok {
   555  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   556  		if !ok {
   557  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   558  			return
   559  		}
   560  		castObj, ok = tombstone.Obj.(*v1.Service)
   561  		if !ok {
   562  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   563  			return
   564  		}
   565  	}
   566  	for _, apiService := range c.getAPIServicesFor(castObj) {
   567  		c.queue.Add(apiService)
   568  	}
   569  }
   570  
   571  func (c *AvailableConditionController) addEndpoints(obj interface{}) {
   572  	for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
   573  		c.queue.Add(apiService)
   574  	}
   575  }
   576  
   577  func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
   578  	for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
   579  		c.queue.Add(apiService)
   580  	}
   581  }
   582  
   583  func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
   584  	castObj, ok := obj.(*v1.Endpoints)
   585  	if !ok {
   586  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   587  		if !ok {
   588  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   589  			return
   590  		}
   591  		castObj, ok = tombstone.Obj.(*v1.Endpoints)
   592  		if !ok {
   593  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   594  			return
   595  		}
   596  	}
   597  	for _, apiService := range c.getAPIServicesFor(castObj) {
   598  		c.queue.Add(apiService)
   599  	}
   600  }
   601  
   602  // setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service
   603  func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
   604  	if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
   605  		c.metrics.SetAPIServiceAvailable(newAPIService.Name)
   606  		return
   607  	}
   608  
   609  	c.metrics.SetAPIServiceUnavailable(newAPIService.Name)
   610  }
   611  
   612  // setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
   613  func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
   614  	wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
   615  	isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
   616  	statusChanged := isAvailable != wasAvailable
   617  
   618  	if statusChanged && !isAvailable {
   619  		reason := "UnknownReason"
   620  		if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
   621  			reason = newCondition.Reason
   622  		}
   623  		c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc()
   624  	}
   625  }
   626  

View as plain text