...

Source file src/k8s.io/kube-aggregator/pkg/apiserver/handler_discovery.go

Documentation: k8s.io/kube-aggregator/pkg/apiserver

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"net/http"
    23  	"sync"
    24  	"time"
    25  
    26  	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
    27  	apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
    28  	apidiscoveryv2conversion "k8s.io/apiserver/pkg/apis/apidiscovery/v2"
    29  
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/runtime/serializer"
    34  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apiserver/pkg/authentication/user"
    38  	"k8s.io/apiserver/pkg/endpoints"
    39  	discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
    40  	"k8s.io/apiserver/pkg/endpoints/request"
    41  	"k8s.io/client-go/discovery"
    42  	"k8s.io/client-go/util/workqueue"
    43  	"k8s.io/klog/v2"
    44  	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    45  	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
    46  	"k8s.io/kube-aggregator/pkg/apiserver/scheme"
    47  )
    48  
    49  var APIRegistrationGroupVersion metav1.GroupVersion = metav1.GroupVersion{Group: "apiregistration.k8s.io", Version: "v1"}
    50  
    51  // Maximum is 20000. Set to higher than that so apiregistration always is listed
    52  // first (mirrors v1 discovery behavior)
    53  var APIRegistrationGroupPriority int = 20001
    54  
    55  // Aggregated discovery content-type GVK.
    56  var v2Beta1GVK = schema.GroupVersionKind{
    57  	Group:   "apidiscovery.k8s.io",
    58  	Version: "v2beta1",
    59  	Kind:    "APIGroupDiscoveryList",
    60  }
    61  
    62  var v2GVK = schema.GroupVersionKind{
    63  	Group:   "apidiscovery.k8s.io",
    64  	Version: "v2",
    65  	Kind:    "APIGroupDiscoveryList",
    66  }
    67  
// DiscoveryAggregationController is given a set of APIServices and the
// handlers for contacting them, and caches the discovery documents fetched
// from each server.
type DiscoveryAggregationController interface {
	// AddAPIService adds or updates an APIService in the Aggregated Discovery
	// Controller's knowledge base. handler is the http.Handler used to contact
	// the server behind the APIService.
	// Thread-safe
	AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler)

	// RemoveAPIService removes the APIService with the given name from the
	// Aggregated Discovery Controller's knowledge base.
	// Thread-safe
	RemoveAPIService(apiServiceName string)

	// Run spawns workers which wait for added/updated apiservices and update
	// the unified discovery document by contacting the aggregated api services.
	Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{})
}
    86  
// discoveryManager is the DiscoveryAggregationController implementation in
// this file. It keeps one record per tracked APIService, one cached discovery
// result per backing service, and a work queue of APIService names that need
// to be re-synced into the merged discovery handler.
type discoveryManager struct {
	// Locks `apiServices`
	servicesLock sync.RWMutex

	// Map from APIService's name (or a unique string for local servers)
	// to information about contacting that API Service
	apiServices map[string]groupVersionInfo

	// Locks cachedResults
	resultsLock sync.RWMutex

	// Map from APIService.Spec.Service to the previously fetched value
	// (Note that many APIServices might use the same APIService.Spec.Service)
	cachedResults map[serviceKey]cachedResult

	// Queue of dirty APIService names which need to be refreshed.
	// It is important that the reconciler for this queue does not excessively
	// contact the apiserver if a key was enqueued before the server was last
	// contacted.
	dirtyAPIServiceQueue workqueue.RateLimitingInterface

	// Merged handler which stores all known groupversions
	mergedDiscoveryHandler discoveryendpoint.ResourceManager

	// Codecs is the serializer used for decoding aggregated apiserver responses
	codecs serializer.CodecFactory
}
   114  
   115  // Version of Service/Spec with relevant fields for use as a cache key
   116  type serviceKey struct {
   117  	Namespace string
   118  	Name      string
   119  	Port      int32
   120  }
   121  
   122  // Human-readable String representation used for logs
   123  func (s serviceKey) String() string {
   124  	return fmt.Sprintf("%v/%v:%v", s.Namespace, s.Name, s.Port)
   125  }
   126  
   127  func newServiceKey(service apiregistrationv1.ServiceReference) serviceKey {
   128  	// Docs say. Defaults to 443 for compatibility reasons.
   129  	// BETA: Should this be a shared constant to avoid drifting with the
   130  	// implementation?
   131  	port := int32(443)
   132  	if service.Port != nil {
   133  		port = *service.Port
   134  	}
   135  
   136  	return serviceKey{
   137  		Name:      service.Name,
   138  		Namespace: service.Namespace,
   139  		Port:      port,
   140  	}
   141  }
   142  
// cachedResult is the discovery information most recently stored for one
// service, together with the metadata needed to revalidate it.
type cachedResult struct {
	// Currently cached discovery document for this service.
	// Map from group-version to the discovery information for that version.
	discovery map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery

	// ETag hash of the cached discoveryDocument
	etag string

	// Guaranteed to be a time less than the time the server responded with the
	// discovery data.
	lastUpdated time.Time
}
   155  
// Information about a specific APIService/GroupVersion
type groupVersionInfo struct {
	// Date this APIService was marked dirty.
	// Guaranteed to be a time greater than the most recent time the APIService
	// was known to be modified.
	//
	// Used for request deduplication to ensure the data used to reconcile each
	// apiservice was retrieved after the time of the APIService change:
	// real_apiservice_change_time < groupVersionInfo.lastMarkedDirty < cachedResult.lastUpdated < real_document_fresh_time
	//
	// This ensures that if the apiservice was changed after the last cached entry
	// was stored, the discovery document will always be re-fetched.
	lastMarkedDirty time.Time

	// ServiceReference of this GroupVersion. This identifies the Service which
	// describes how to contact the server responsible for this GroupVersion.
	service serviceKey

	// groupPriority describes the priority of the APIService's group for sorting
	groupPriority int

	// versionPriority describes the priority of the APIService's version for
	// sorting
	versionPriority int

	// Method for contacting the service
	handler http.Handler
}
   183  
// Compile-time check that *discoveryManager satisfies
// DiscoveryAggregationController.
var _ DiscoveryAggregationController = &discoveryManager{}
   185  
   186  func NewDiscoveryManager(
   187  	target discoveryendpoint.ResourceManager,
   188  ) DiscoveryAggregationController {
   189  	discoveryScheme := runtime.NewScheme()
   190  	utilruntime.Must(apidiscoveryv2.AddToScheme(discoveryScheme))
   191  	utilruntime.Must(apidiscoveryv2beta1.AddToScheme(discoveryScheme))
   192  	// Register conversion for apidiscovery
   193  	utilruntime.Must(apidiscoveryv2conversion.RegisterConversions(discoveryScheme))
   194  	codecs := serializer.NewCodecFactory(discoveryScheme)
   195  
   196  	return &discoveryManager{
   197  		mergedDiscoveryHandler: target,
   198  		apiServices:            make(map[string]groupVersionInfo),
   199  		cachedResults:          make(map[serviceKey]cachedResult),
   200  		dirtyAPIServiceQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "discovery-manager"),
   201  		codecs:                 codecs,
   202  	}
   203  }
   204  
// fetchFreshDiscoveryForService returns discovery data for the service that
// backs the given APIService, consulting and updating the per-service cache.
//
// Outcomes, in the order they are evaluated:
//   - The cache entry was updated after info.lastMarkedDirty: the cached
//     entry is returned with a nil error and the server is not contacted.
//   - The request for /apis cannot be constructed: a pointer to the cache
//     entry (zero-valued if none existed) is returned with a non-nil error.
//   - The server answers 304 Not Modified: the cached discovery and etag are
//     kept, lastUpdated is refreshed, the entry is stored and returned.
//   - The server answers 503 Service Unavailable: nil and an error are
//     returned.
//   - The server answers 200 OK with a v2 or v2beta1 aggregated discovery
//     content type: the body is decoded, stored in the cache and returned.
//     A decode failure returns nil and the decode error.
//   - Anything else: legacy discovery is fetched for gv only. On success the
//     result is returned but NOT stored in the cache; on failure nil and an
//     error are returned.
//
// A nil error therefore means the returned result is current.
func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion, info groupVersionInfo) (*cachedResult, error) {
	// Lookup last cached result for this apiservice's service.
	cached, exists := dm.getCacheEntryForService(info.service)

	// If entry exists and was updated after the given time, just stop now
	if exists && cached.lastUpdated.After(info.lastMarkedDirty) {
		return &cached, nil
	}

	// The cache entry is missing or too old to use, so refresh it now by
	// contacting the server through its handler. Each request served through
	// this wrapper is limited to 5 seconds.
	handler := http.TimeoutHandler(info.handler, 5*time.Second, "request timed out")
	req, err := http.NewRequest("GET", "/apis", nil)
	if err != nil {
		// NewRequest should not fail, but if it does for some reason,
		// hand back whatever was cached along with the error.
		return &cached, fmt.Errorf("failed to create http.Request: %v", err)
	}

	// Apply aggregator user to request
	req = req.WithContext(
		request.WithUser(
			req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator", Groups: []string{"system:masters"}}))
	req = req.WithContext(request.WithRequestInfo(req.Context(), &request.RequestInfo{
		Path:              req.URL.Path,
		IsResourceRequest: false,
	}))
	// Ask for aggregated discovery, listing v2 ahead of v2beta1.
	req.Header.Add("Accept", discovery.AcceptV2+","+discovery.AcceptV2Beta1)

	// Let the server answer 304 if the document has not changed.
	if exists && len(cached.etag) > 0 {
		req.Header.Add("If-None-Match", cached.etag)
	}

	// Important that the time recorded in the data's "lastUpdated" is conservatively
	// from BEFORE the request is dispatched so that lastUpdated can be used to
	// de-duplicate requests.
	now := time.Now()
	writer := newInMemoryResponseWriter()
	handler.ServeHTTP(writer, req)

	// Errors from parsing the content type are ignored; an unparsable content
	// type simply fails to match and leads to the legacy fallback below.
	isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK)
	isV2GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2GVK)

	switch {
	case writer.respCode == http.StatusNotModified:
		// Keep old entry, update timestamp
		cached = cachedResult{
			discovery:   cached.discovery,
			etag:        cached.etag,
			lastUpdated: now,
		}

		dm.setCacheEntryForService(info.service, cached)
		return &cached, nil
	case writer.respCode == http.StatusServiceUnavailable:
		// No result is returned here, not even a stale cached one.
		return nil, fmt.Errorf("service %s returned non-success response code: %v",
			info.service.String(), writer.respCode)
	case writer.respCode == http.StatusOK && (isV2GVK || isV2Beta1GVK):
		parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
		if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.data, parsed); err != nil {
			return nil, err
		}

		klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String())

		// Convert discovery info into a map for convenient lookup later
		discoMap := map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery{}
		for _, g := range parsed.Items {
			for _, v := range g.Versions {
				discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v
				// The writes below go through v.Resources, which shares its
				// backing array with the copy just stored in discoMap.
				for i := range v.Resources {
					// avoid nil panics in v0.26.0-v0.26.3 client-go clients
					// see https://github.com/kubernetes/kubernetes/issues/118361
					if v.Resources[i].ResponseKind == nil {
						v.Resources[i].ResponseKind = &metav1.GroupVersionKind{}
					}
					for j := range v.Resources[i].Subresources {
						if v.Resources[i].Subresources[j].ResponseKind == nil {
							v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{}
						}
					}
				}
			}
		}

		// Save cached result
		cached = cachedResult{
			discovery:   discoMap,
			etag:        writer.Header().Get("Etag"),
			lastUpdated: now,
		}
		dm.setCacheEntryForService(info.service, cached)
		return &cached, nil
	default:
		// Could not get acceptable response for Aggregated Discovery.
		// Fall back to legacy discovery information
		if len(gv.Version) == 0 {
			return nil, errors.New("not found")
		}

		// An empty group is fetched from /api/<version>; every other group
		// from /apis/<group>/<version>.
		var path string
		if len(gv.Group) == 0 {
			path = "/api/" + gv.Version
		} else {
			path = "/apis/" + gv.Group + "/" + gv.Version
		}

		req, err := http.NewRequest("GET", path, nil)
		if err != nil {
			// NewRequest should not fail, but if it does for some reason,
			// report it to the caller.
			return nil, fmt.Errorf("failed to create http.Request: %v", err)
		}

		// Apply aggregator user to request
		req = req.WithContext(
			request.WithUser(
				req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator"}))

		// req.Header.Add("Accept", runtime.ContentTypeProtobuf)
		req.Header.Add("Accept", runtime.ContentTypeJSON)

		if exists && len(cached.etag) > 0 {
			req.Header.Add("If-None-Match", cached.etag)
		}

		// Reuses the same timeout-wrapped handler with a fresh writer.
		writer := newInMemoryResponseWriter()
		handler.ServeHTTP(writer, req)

		// Any status other than 200 is treated as a failure.
		if writer.respCode != http.StatusOK {
			return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String())
		}

		parsed := &metav1.APIResourceList{}
		if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
			return nil, err
		}

		// Create a discomap with single group-version
		resources, err := endpoints.ConvertGroupVersionIntoToDiscovery(parsed.APIResources)
		if err != nil {
			return nil, err
		}
		klog.V(3).Infof("DiscoveryManager: Successfully downloaded legacy discovery for %s", info.service.String())

		discoMap := map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery{
			// Convert the legacy APIResourceList into the new representation
			gv: {
				Version:   gv.Version,
				Resources: resources,
			},
		}

		cached = cachedResult{
			discovery:   discoMap,
			lastUpdated: now,
		}

		// Do not save the result as the legacy fallback only fetches
		// one group version and an API Service may serve multiple
		// group versions.
		return &cached, nil
	}
}
   376  
   377  // Try to sync a single APIService.
   378  func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
   379  	info, exists := dm.getInfoForAPIService(apiServiceName)
   380  
   381  	gv := helper.APIServiceNameToGroupVersion(apiServiceName)
   382  	mgv := metav1.GroupVersion{Group: gv.Group, Version: gv.Version}
   383  
   384  	if !exists {
   385  		// apiservice was removed. remove it from merged discovery
   386  		dm.mergedDiscoveryHandler.RemoveGroupVersion(mgv)
   387  		return nil
   388  	}
   389  
   390  	// Lookup last cached result for this apiservice's service.
   391  	cached, err := dm.fetchFreshDiscoveryForService(mgv, info)
   392  
   393  	var entry apidiscoveryv2.APIVersionDiscovery
   394  
   395  	// Extract the APIService's specific resource information from the
   396  	// groupversion
   397  	if cached == nil {
   398  		// There was an error fetching discovery for this APIService, and
   399  		// there is nothing in the cache for this GV.
   400  		//
   401  		// Just use empty GV to mark that GV exists, but no resources.
   402  		// Also mark that it is stale to indicate the fetch failed
   403  		// TODO: Maybe also stick in a status for the version the error?
   404  		entry = apidiscoveryv2.APIVersionDiscovery{
   405  			Version: gv.Version,
   406  		}
   407  	} else {
   408  		// Find our specific groupversion within the discovery document
   409  		entry, exists = cached.discovery[mgv]
   410  		if exists {
   411  			// The stale/fresh entry has our GV, so we can include it in the doc
   412  		} else {
   413  			// Successfully fetched discovery information from the server, but
   414  			// the server did not include this groupversion?
   415  			entry = apidiscoveryv2.APIVersionDiscovery{
   416  				Version: gv.Version,
   417  			}
   418  		}
   419  	}
   420  
   421  	// The entry's staleness depends upon if `fetchFreshDiscoveryForService`
   422  	// returned an error or not.
   423  	if err == nil {
   424  		entry.Freshness = apidiscoveryv2.DiscoveryFreshnessCurrent
   425  	} else {
   426  		entry.Freshness = apidiscoveryv2.DiscoveryFreshnessStale
   427  	}
   428  
   429  	dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry)
   430  	dm.mergedDiscoveryHandler.SetGroupVersionPriority(metav1.GroupVersion(gv), info.groupPriority, info.versionPriority)
   431  	return nil
   432  }
   433  
   434  func (dm *discoveryManager) getAPIServiceKeys() []string {
   435  	dm.servicesLock.RLock()
   436  	defer dm.servicesLock.RUnlock()
   437  	keys := []string{}
   438  	for key := range dm.apiServices {
   439  		keys = append(keys, key)
   440  	}
   441  	return keys
   442  }
   443  
   444  // Spawns a goroutine which waits for added/updated apiservices and updates
   445  // the discovery document accordingly
   446  func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) {
   447  	klog.Info("Starting ResourceDiscoveryManager")
   448  
   449  	// Shutdown the queue since stopCh was signalled
   450  	defer dm.dirtyAPIServiceQueue.ShutDown()
   451  
   452  	// Ensure that apiregistration.k8s.io is the first group in the discovery group.
   453  	dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
   454  
   455  	// Ensure that all APIServices are present before readiness check succeeds
   456  	var wg sync.WaitGroup
   457  	// Iterate on a copy of the keys to be thread safe with syncAPIService
   458  	keys := dm.getAPIServiceKeys()
   459  
   460  	for _, key := range keys {
   461  		wg.Add(1)
   462  		go func(k string) {
   463  			defer wg.Done()
   464  			// If an error was returned, the APIService will still have been
   465  			// added but marked as stale. Ignore the return value here
   466  			_ = dm.syncAPIService(k)
   467  		}(key)
   468  	}
   469  	wg.Wait()
   470  
   471  	if discoverySyncedCh != nil {
   472  		close(discoverySyncedCh)
   473  	}
   474  
   475  	// Spawn workers
   476  	// These workers wait for APIServices to be marked dirty.
   477  	// Worker ensures the cached discovery document hosted by the ServiceReference of
   478  	// the APIService is at least as fresh as the APIService, then includes the
   479  	// APIService's groupversion into the merged document
   480  	for i := 0; i < 2; i++ {
   481  		go func() {
   482  			for {
   483  				next, shutdown := dm.dirtyAPIServiceQueue.Get()
   484  				if shutdown {
   485  					return
   486  				}
   487  
   488  				func() {
   489  					defer dm.dirtyAPIServiceQueue.Done(next)
   490  
   491  					if err := dm.syncAPIService(next.(string)); err != nil {
   492  						dm.dirtyAPIServiceQueue.AddRateLimited(next)
   493  					} else {
   494  						dm.dirtyAPIServiceQueue.Forget(next)
   495  					}
   496  				}()
   497  			}
   498  		}()
   499  	}
   500  
   501  	wait.PollUntil(1*time.Minute, func() (done bool, err error) {
   502  		dm.servicesLock.Lock()
   503  		defer dm.servicesLock.Unlock()
   504  
   505  		now := time.Now()
   506  
   507  		// Mark all non-local APIServices as dirty
   508  		for key, info := range dm.apiServices {
   509  			info.lastMarkedDirty = now
   510  			dm.apiServices[key] = info
   511  			dm.dirtyAPIServiceQueue.Add(key)
   512  		}
   513  		return false, nil
   514  	}, stopCh)
   515  }
   516  
   517  // Takes a snapshot of all currently used services by known APIServices and
   518  // purges the cache entries of those not present in the snapshot.
   519  func (dm *discoveryManager) removeUnusedServices() {
   520  	usedServiceKeys := sets.Set[serviceKey]{}
   521  
   522  	func() {
   523  		dm.servicesLock.Lock()
   524  		defer dm.servicesLock.Unlock()
   525  
   526  		// Mark all non-local APIServices as dirty
   527  		for _, info := range dm.apiServices {
   528  			usedServiceKeys.Insert(info.service)
   529  		}
   530  	}()
   531  
   532  	// Avoids double lock. It is okay if a service is added/removed between these
   533  	// functions. This is just a cache and that should be infrequent.
   534  
   535  	func() {
   536  		dm.resultsLock.Lock()
   537  		defer dm.resultsLock.Unlock()
   538  
   539  		for key := range dm.cachedResults {
   540  			if !usedServiceKeys.Has(key) {
   541  				delete(dm.cachedResults, key)
   542  			}
   543  		}
   544  	}()
   545  }
   546  
   547  // Adds an APIService to be tracked by the discovery manager. If the APIService
   548  // is already known
   549  func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) {
   550  	// If service is nil then its information is contained by a local APIService
   551  	// which is has already been added to the manager.
   552  	if apiService.Spec.Service == nil {
   553  		return
   554  	}
   555  
   556  	// Add or update APIService record and mark it as dirty
   557  	dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{
   558  		groupPriority:   int(apiService.Spec.GroupPriorityMinimum),
   559  		versionPriority: int(apiService.Spec.VersionPriority),
   560  		handler:         handler,
   561  		lastMarkedDirty: time.Now(),
   562  		service:         newServiceKey(*apiService.Spec.Service),
   563  	})
   564  	dm.removeUnusedServices()
   565  	dm.dirtyAPIServiceQueue.Add(apiService.Name)
   566  }
   567  
   568  func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
   569  	if dm.setInfoForAPIService(apiServiceName, nil) != nil {
   570  		// mark dirty if there was actually something deleted
   571  		dm.removeUnusedServices()
   572  		dm.dirtyAPIServiceQueue.Add(apiServiceName)
   573  	}
   574  }
   575  
   576  //
   577  // Lock-protected accessors
   578  //
   579  
   580  func (dm *discoveryManager) getCacheEntryForService(key serviceKey) (cachedResult, bool) {
   581  	dm.resultsLock.RLock()
   582  	defer dm.resultsLock.RUnlock()
   583  
   584  	result, ok := dm.cachedResults[key]
   585  	return result, ok
   586  }
   587  
   588  func (dm *discoveryManager) setCacheEntryForService(key serviceKey, result cachedResult) {
   589  	dm.resultsLock.Lock()
   590  	defer dm.resultsLock.Unlock()
   591  
   592  	dm.cachedResults[key] = result
   593  }
   594  
   595  func (dm *discoveryManager) getInfoForAPIService(name string) (groupVersionInfo, bool) {
   596  	dm.servicesLock.RLock()
   597  	defer dm.servicesLock.RUnlock()
   598  
   599  	result, ok := dm.apiServices[name]
   600  	return result, ok
   601  }
   602  
   603  func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersionInfo) (oldValueIfExisted *groupVersionInfo) {
   604  	dm.servicesLock.Lock()
   605  	defer dm.servicesLock.Unlock()
   606  
   607  	if oldValue, exists := dm.apiServices[name]; exists {
   608  		oldValueIfExisted = &oldValue
   609  	}
   610  
   611  	if result != nil {
   612  		dm.apiServices[name] = *result
   613  	} else {
   614  		delete(dm.apiServices, name)
   615  	}
   616  
   617  	return oldValueIfExisted
   618  }
   619  
   620  // !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go
   621  // which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go
   622  // so we should find a home for this
   623  // inMemoryResponseWriter is a http.Writer that keep the response in memory.
   624  type inMemoryResponseWriter struct {
   625  	writeHeaderCalled bool
   626  	header            http.Header
   627  	respCode          int
   628  	data              []byte
   629  }
   630  
   631  func newInMemoryResponseWriter() *inMemoryResponseWriter {
   632  	return &inMemoryResponseWriter{header: http.Header{}}
   633  }
   634  
   635  func (r *inMemoryResponseWriter) Header() http.Header {
   636  	return r.header
   637  }
   638  
   639  func (r *inMemoryResponseWriter) WriteHeader(code int) {
   640  	r.writeHeaderCalled = true
   641  	r.respCode = code
   642  }
   643  
   644  func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
   645  	if !r.writeHeaderCalled {
   646  		r.WriteHeader(http.StatusOK)
   647  	}
   648  	r.data = append(r.data, in...)
   649  	return len(in), nil
   650  }
   651  
   652  func (r *inMemoryResponseWriter) String() string {
   653  	s := fmt.Sprintf("ResponseCode: %d", r.respCode)
   654  	if r.data != nil {
   655  		s += fmt.Sprintf(", Body: %s", string(r.data))
   656  	}
   657  	if r.header != nil {
   658  		s += fmt.Sprintf(", Header: %s", r.header)
   659  	}
   660  	return s
   661  }
   662  

View as plain text