    17  package apiserver
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"net/http"
    23  	"sync"
    24  	"time"
    26  	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
    27  	apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
    28  	apidiscoveryv2conversion "k8s.io/apiserver/pkg/apis/apidiscovery/v2"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/runtime/serializer"
    34  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apiserver/pkg/authentication/user"
    38  	"k8s.io/apiserver/pkg/endpoints"
    39  	discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
    40  	"k8s.io/apiserver/pkg/endpoints/request"
    41  	"k8s.io/client-go/discovery"
    42  	"k8s.io/client-go/util/workqueue"
    43  	"k8s.io/klog/v2"
    44  	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    45  	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
    46  	"k8s.io/kube-aggregator/pkg/apiserver/scheme"
    47  )
    49  var APIRegistrationGroupVersion metav1.GroupVersion = metav1.GroupVersion{Group: "apiregistration.k8s.io", Version: "v1"}
    51  // Maximum is 20000. Set to higher than that so apiregistration always is listed
    52  // first (mirrors v1 discovery behavior)
    53  var APIRegistrationGroupPriority int = 20001
    55  // Aggregated discovery content-type GVK.
    56  var v2Beta1GVK = schema.GroupVersionKind{
    57  	Group:   "apidiscovery.k8s.io",
    58  	Version: "v2beta1",
    59  	Kind:    "APIGroupDiscoveryList",
    60  }
    62  var v2GVK = schema.GroupVersionKind{
    63  	Group:   "apidiscovery.k8s.io",
    64  	Version: "v2",
    65  	Kind:    "APIGroupDiscoveryList",
    66  }
    68  // Given a list of APIServices and proxyHandlers for contacting them,
    69  // DiscoveryManager caches a list of discovery documents for each server
    71  type DiscoveryAggregationController interface {
    72  	// Adds or Updates an APIService from the Aggregated Discovery Controller's
    73  	// knowledge base
    74  	// Thread-safe
    75  	AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler)
    77  	// Removes an APIService from the Aggregated Discovery Controller's Knowledge
    78  	// bank
    79  	// Thread-safe
    80  	RemoveAPIService(apiServiceName string)
    82  	// Spawns a worker which waits for added/updated apiservices and updates
    83  	// the unified discovery document by contacting the aggregated api services
    84  	Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{})
    85  }
    87  type discoveryManager struct {
    88  	// Locks `apiServices`
    89  	servicesLock sync.RWMutex
    91  	// Map from APIService's name (or a unique string for local servers)
    92  	// to information about contacting that API Service
    93  	apiServices map[string]groupVersionInfo
    95  	// Locks cachedResults
    96  	resultsLock sync.RWMutex
    98  	// Map from APIService.Spec.Service to the previously fetched value
    99  	// (Note that many APIServices might use the same APIService.Spec.Service)
   100  	cachedResults map[serviceKey]cachedResult
   102  	// Queue of dirty apiServiceKey which need to be refreshed
   103  	// It is important that the reconciler for this queue does not excessively
   104  	// contact the apiserver if a key was enqueued before the server was last
   105  	// contacted.
   106  	dirtyAPIServiceQueue workqueue.RateLimitingInterface
   108  	// Merged handler which stores all known groupversions
   109  	mergedDiscoveryHandler discoveryendpoint.ResourceManager
   111  	// Codecs is the serializer used for decoding aggregated apiserver responses
   112  	codecs serializer.CodecFactory
   113  }
   115  // Version of Service/Spec with relevant fields for use as a cache key
   116  type serviceKey struct {
   117  	Namespace string
   118  	Name      string
   119  	Port      int32
   120  }
   122  // Human-readable String representation used for logs
   123  func (s serviceKey) String() string {
   124  	return fmt.Sprintf("%v/%v:%v", s.Namespace, s.Name, s.Port)
   125  }
   127  func newServiceKey(service apiregistrationv1.ServiceReference) serviceKey {
   128  	// Docs say. Defaults to 443 for compatibility reasons.
   129  	// BETA: Should this be a shared constant to avoid drifting with the
   130  	// implementation?
   131  	port := int32(443)
   132  	if service.Port != nil {
   133  		port = *service.Port
   134  	}
   136  	return serviceKey{
   137  		Name:      service.Name,
   138  		Namespace: service.Namespace,
   139  		Port:      port,
   140  	}
   141  }
   143  type cachedResult struct {
   144  	// Currently cached discovery document for this service
   145  	// Map from group name to version name to
   146  	discovery map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery
   148  	// ETag hash of the cached discoveryDocument
   149  	etag string
   151  	// Guaranteed to be a time less than the time the server responded with the
   152  	// discovery data.
   153  	lastUpdated time.Time
   154  }
   156  // Information about a specific APIService/GroupVersion
   157  type groupVersionInfo struct {
   158  	// Date this APIService was marked dirty.
   159  	// Guaranteed to be a time greater than the most recent time the APIService
   160  	// was known to be modified.
   161  	//
   162  	// Used for request deduplication to ensure the data used to reconcile each
   163  	// apiservice was retrieved after the time of the APIService change:
   164  	// real_apiservice_change_time < groupVersionInfo.lastMarkedDirty < cachedResult.lastUpdated < real_document_fresh_time
   165  	//
   166  	// This ensures that if the apiservice was changed after the last cached entry
   167  	// was stored, the discovery document will always be re-fetched.
   168  	lastMarkedDirty time.Time
   170  	// ServiceReference of this GroupVersion. This identifies the Service which
   171  	// describes how to contact the server responsible for this GroupVersion.
   172  	service serviceKey
   174  	// groupPriority describes the priority of the APIService's group for sorting
   175  	groupPriority int
   177  	// groupPriority describes the priority of the APIService version for sorting
   178  	versionPriority int
   180  	// Method for contacting the service
   181  	handler http.Handler
   182  }
   184  var _ DiscoveryAggregationController = &discoveryManager{}
   186  func NewDiscoveryManager(
   187  	target discoveryendpoint.ResourceManager,
   188  ) DiscoveryAggregationController {
   189  	discoveryScheme := runtime.NewScheme()
   190  	utilruntime.Must(apidiscoveryv2.AddToScheme(discoveryScheme))
   191  	utilruntime.Must(apidiscoveryv2beta1.AddToScheme(discoveryScheme))
   192  	// Register conversion for apidiscovery
   193  	utilruntime.Must(apidiscoveryv2conversion.RegisterConversions(discoveryScheme))
   194  	codecs := serializer.NewCodecFactory(discoveryScheme)
   196  	return &discoveryManager{
   197  		mergedDiscoveryHandler: target,
   198  		apiServices:            make(map[string]groupVersionInfo),
   199  		cachedResults:          make(map[serviceKey]cachedResult),
   200  		dirtyAPIServiceQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "discovery-manager"),
   201  		codecs:                 codecs,
   202  	}
   203  }
   205  // Returns discovery data for the given apiservice.
   206  // Caches the result.
   207  // Returns the cached result if it is retrieved after the apiservice was last
   208  // marked dirty
   209  // If there was an error in fetching, returns the stale cached result if it exists,
   210  // and a non-nil error
   211  // If the result is current, returns nil error and non-nil result
   212  func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion, info groupVersionInfo) (*cachedResult, error) {
   213  	// Lookup last cached result for this apiservice's service.
   214  	cached, exists := dm.getCacheEntryForService(info.service)
   216  	// If entry exists and was updated after the given time, just stop now
   217  	if exists && cached.lastUpdated.After(info.lastMarkedDirty) {
   218  		return &cached, nil
   219  	}
   221  	// If we have a handler to contact the server for this APIService, and
   222  	// the cache entry is too old to use, refresh the cache entry now.
   223  	handler := http.TimeoutHandler(info.handler, 5*time.Second, "request timed out")
   224  	req, err := http.NewRequest("GET", "/apis", nil)
   225  	if err != nil {
   226  		// NewRequest should not fail, but if it does for some reason,
   227  		// log it and continue
   228  		return &cached, fmt.Errorf("failed to create http.Request: %v", err)
   229  	}
   231  	// Apply aggregator user to request
   232  	req = req.WithContext(
   233  		request.WithUser(
   234  			req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator", Groups: []string{"system:masters"}}))
   235  	req = req.WithContext(request.WithRequestInfo(req.Context(), &request.RequestInfo{
   236  		Path:              req.URL.Path,
   237  		IsResourceRequest: false,
   238  	}))
   239  	req.Header.Add("Accept", discovery.AcceptV2+","+discovery.AcceptV2Beta1)
   241  	if exists && len(cached.etag) > 0 {
   242  		req.Header.Add("If-None-Match", cached.etag)
   243  	}
   245  	// Important that the time recorded in the data's "lastUpdated" is conservatively
   246  	// from BEFORE the request is dispatched so that lastUpdated can be used to
   247  	// de-duplicate requests.
   248  	now := time.Now()
   249  	writer := newInMemoryResponseWriter()
   250  	handler.ServeHTTP(writer, req)
   252  	isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK)
   253  	isV2GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2GVK)
   255  	switch {
   256  	case writer.respCode == http.StatusNotModified:
   257  		// Keep old entry, update timestamp
   258  		cached = cachedResult{
   259  			discovery:   cached.discovery,
   260  			etag:        cached.etag,
   261  			lastUpdated: now,
   262  		}
   264  		dm.setCacheEntryForService(info.service, cached)
   265  		return &cached, nil
   266  	case writer.respCode == http.StatusServiceUnavailable:
   267  		return nil, fmt.Errorf("service %s returned non-success response code: %v",
   268  			info.service.String(), writer.respCode)
   269  	case writer.respCode == http.StatusOK && (isV2GVK || isV2Beta1GVK):
   270  		parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
   271  		if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.data, parsed); err != nil {
   272  			return nil, err
   273  		}
   275  		klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String())
   277  		// Convert discovery info into a map for convenient lookup later
   278  		discoMap := map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery{}
   279  		for _, g := range parsed.Items {
   280  			for _, v := range g.Versions {
   281  				discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v
   282  				for i := range v.Resources {
   283  					// avoid nil panics in v0.26.0-v0.26.3 client-go clients
   284  					// see https://github.com/kubernetes/kubernetes/issues/118361
   285  					if v.Resources[i].ResponseKind == nil {
   286  						v.Resources[i].ResponseKind = &metav1.GroupVersionKind{}
   287  					}
   288  					for j := range v.Resources[i].Subresources {
   289  						if v.Resources[i].Subresources[j].ResponseKind == nil {
   290  							v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{}
   291  						}
   292  					}
   293  				}
   294  			}
   295  		}
   297  		// Save cached result
   298  		cached = cachedResult{
   299  			discovery:   discoMap,
   300  			etag:        writer.Header().Get("Etag"),
   301  			lastUpdated: now,
   302  		}
   303  		dm.setCacheEntryForService(info.service, cached)
   304  		return &cached, nil
   305  	default:
   306  		// Could not get acceptable response for Aggregated Discovery.
   307  		// Fall back to legacy discovery information
   308  		if len(gv.Version) == 0 {
   309  			return nil, errors.New("not found")
   310  		}
   312  		var path string
   313  		if len(gv.Group) == 0 {
   314  			path = "/api/" + gv.Version
   315  		} else {
   316  			path = "/apis/" + gv.Group + "/" + gv.Version
   317  		}
   319  		req, err := http.NewRequest("GET", path, nil)
   320  		if err != nil {
   321  			// NewRequest should not fail, but if it does for some reason,
   322  			// log it and continue
   323  			return nil, fmt.Errorf("failed to create http.Request: %v", err)
   324  		}
   326  		// Apply aggregator user to request
   327  		req = req.WithContext(
   328  			request.WithUser(
   329  				req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator"}))
   331  		// req.Header.Add("Accept", runtime.ContentTypeProtobuf)
   332  		req.Header.Add("Accept", runtime.ContentTypeJSON)
   334  		if exists && len(cached.etag) > 0 {
   335  			req.Header.Add("If-None-Match", cached.etag)
   336  		}
   338  		writer := newInMemoryResponseWriter()
   339  		handler.ServeHTTP(writer, req)
   341  		if writer.respCode != http.StatusOK {
   342  			return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String())
   343  		}
   345  		parsed := &metav1.APIResourceList{}
   346  		if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
   347  			return nil, err
   348  		}
   350  		// Create a discomap with single group-version
   351  		resources, err := endpoints.ConvertGroupVersionIntoToDiscovery(parsed.APIResources)
   352  		if err != nil {
   353  			return nil, err
   354  		}
   355  		klog.V(3).Infof("DiscoveryManager: Successfully downloaded legacy discovery for %s", info.service.String())
   357  		discoMap := map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery{
   358  			// Convert old-style APIGroupList to new information
   359  			gv: {
   360  				Version:   gv.Version,
   361  				Resources: resources,
   362  			},
   363  		}
   365  		cached = cachedResult{
   366  			discovery:   discoMap,
   367  			lastUpdated: now,
   368  		}
   370  		// Do not save the resolve as the legacy fallback only fetches
   371  		// one group version and an API Service may serve multiple
   372  		// group versions.
   373  		return &cached, nil
   374  	}
   375  }
   377  // Try to sync a single APIService.
   378  func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
   379  	info, exists := dm.getInfoForAPIService(apiServiceName)
   381  	gv := helper.APIServiceNameToGroupVersion(apiServiceName)
   382  	mgv := metav1.GroupVersion{Group: gv.Group, Version: gv.Version}
   384  	if !exists {
   385  		// apiservice was removed. remove it from merged discovery
   386  		dm.mergedDiscoveryHandler.RemoveGroupVersion(mgv)
   387  		return nil
   388  	}
   390  	// Lookup last cached result for this apiservice's service.
   391  	cached, err := dm.fetchFreshDiscoveryForService(mgv, info)
   393  	var entry apidiscoveryv2.APIVersionDiscovery
   395  	// Extract the APIService's specific resource information from the
   396  	// groupversion
   397  	if cached == nil {
   398  		// There was an error fetching discovery for this APIService, and
   399  		// there is nothing in the cache for this GV.
   400  		//
   401  		// Just use empty GV to mark that GV exists, but no resources.
   402  		// Also mark that it is stale to indicate the fetch failed
   403  		// TODO: Maybe also stick in a status for the version the error?
   404  		entry = apidiscoveryv2.APIVersionDiscovery{
   405  			Version: gv.Version,
   406  		}
   407  	} else {
   408  		// Find our specific groupversion within the discovery document
   409  		entry, exists = cached.discovery[mgv]
   410  		if exists {
   411  			// The stale/fresh entry has our GV, so we can include it in the doc
   412  		} else {
   413  			// Successfully fetched discovery information from the server, but
   414  			// the server did not include this groupversion?
   415  			entry = apidiscoveryv2.APIVersionDiscovery{
   416  				Version: gv.Version,
   417  			}
   418  		}
   419  	}
   421  	// The entry's staleness depends upon if `fetchFreshDiscoveryForService`
   422  	// returned an error or not.
   423  	if err == nil {
   424  		entry.Freshness = apidiscoveryv2.DiscoveryFreshnessCurrent
   425  	} else {
   426  		entry.Freshness = apidiscoveryv2.DiscoveryFreshnessStale
   427  	}
   429  	dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry)
   430  	dm.mergedDiscoveryHandler.SetGroupVersionPriority(metav1.GroupVersion(gv), info.groupPriority, info.versionPriority)
   431  	return nil
   432  }
   434  func (dm *discoveryManager) getAPIServiceKeys() []string {
   435  	dm.servicesLock.RLock()
   436  	defer dm.servicesLock.RUnlock()
   437  	keys := []string{}
   438  	for key := range dm.apiServices {
   439  		keys = append(keys, key)
   440  	}
   441  	return keys
   442  }
   444  // Spawns a goroutine which waits for added/updated apiservices and updates
   445  // the discovery document accordingly
   446  func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) {
   447  	klog.Info("Starting ResourceDiscoveryManager")
   449  	// Shutdown the queue since stopCh was signalled
   450  	defer dm.dirtyAPIServiceQueue.ShutDown()
   452  	// Ensure that apiregistration.k8s.io is the first group in the discovery group.
   453  	dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
   455  	// Ensure that all APIServices are present before readiness check succeeds
   456  	var wg sync.WaitGroup
   457  	// Iterate on a copy of the keys to be thread safe with syncAPIService
   458  	keys := dm.getAPIServiceKeys()
   460  	for _, key := range keys {
   461  		wg.Add(1)
   462  		go func(k string) {
   463  			defer wg.Done()
   464  			// If an error was returned, the APIService will still have been
   465  			// added but marked as stale. Ignore the return value here
   466  			_ = dm.syncAPIService(k)
   467  		}(key)
   468  	}
   469  	wg.Wait()
   471  	if discoverySyncedCh != nil {
   472  		close(discoverySyncedCh)
   473  	}
   475  	// Spawn workers
   476  	// These workers wait for APIServices to be marked dirty.
   477  	// Worker ensures the cached discovery document hosted by the ServiceReference of
   478  	// the APIService is at least as fresh as the APIService, then includes the
   479  	// APIService's groupversion into the merged document
   480  	for i := 0; i < 2; i++ {
   481  		go func() {
   482  			for {
   483  				next, shutdown := dm.dirtyAPIServiceQueue.Get()
   484  				if shutdown {
   485  					return
   486  				}
   488  				func() {
   489  					defer dm.dirtyAPIServiceQueue.Done(next)
   491  					if err := dm.syncAPIService(next.(string)); err != nil {
   492  						dm.dirtyAPIServiceQueue.AddRateLimited(next)
   493  					} else {
   494  						dm.dirtyAPIServiceQueue.Forget(next)
   495  					}
   496  				}()
   497  			}
   498  		}()
   499  	}
   501  	wait.PollUntil(1*time.Minute, func() (done bool, err error) {
   502  		dm.servicesLock.Lock()
   503  		defer dm.servicesLock.Unlock()
   505  		now := time.Now()
   507  		// Mark all non-local APIServices as dirty
   508  		for key, info := range dm.apiServices {
   509  			info.lastMarkedDirty = now
   510  			dm.apiServices[key] = info
   511  			dm.dirtyAPIServiceQueue.Add(key)
   512  		}
   513  		return false, nil
   514  	}, stopCh)
   515  }
   517  // Takes a snapshot of all currently used services by known APIServices and
   518  // purges the cache entries of those not present in the snapshot.
   519  func (dm *discoveryManager) removeUnusedServices() {
   520  	usedServiceKeys := sets.Set[serviceKey]{}
   522  	func() {
   523  		dm.servicesLock.Lock()
   524  		defer dm.servicesLock.Unlock()
   526  		// Mark all non-local APIServices as dirty
   527  		for _, info := range dm.apiServices {
   528  			usedServiceKeys.Insert(info.service)
   529  		}
   530  	}()
   532  	// Avoids double lock. It is okay if a service is added/removed between these
   533  	// functions. This is just a cache and that should be infrequent.
   535  	func() {
   536  		dm.resultsLock.Lock()
   537  		defer dm.resultsLock.Unlock()
   539  		for key := range dm.cachedResults {
   540  			if !usedServiceKeys.Has(key) {
   541  				delete(dm.cachedResults, key)
   542  			}
   543  		}
   544  	}()
   545  }
   547  // Adds an APIService to be tracked by the discovery manager. If the APIService
   548  // is already known
   549  func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) {
   550  	// If service is nil then its information is contained by a local APIService
   551  	// which is has already been added to the manager.
   552  	if apiService.Spec.Service == nil {
   553  		return
   554  	}
   556  	// Add or update APIService record and mark it as dirty
   557  	dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{
   558  		groupPriority:   int(apiService.Spec.GroupPriorityMinimum),
   559  		versionPriority: int(apiService.Spec.VersionPriority),
   560  		handler:         handler,
   561  		lastMarkedDirty: time.Now(),
   562  		service:         newServiceKey(*apiService.Spec.Service),
   563  	})
   564  	dm.removeUnusedServices()
   565  	dm.dirtyAPIServiceQueue.Add(apiService.Name)
   566  }
   568  func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
   569  	if dm.setInfoForAPIService(apiServiceName, nil) != nil {
   570  		// mark dirty if there was actually something deleted
   571  		dm.removeUnusedServices()
   572  		dm.dirtyAPIServiceQueue.Add(apiServiceName)
   573  	}
   574  }
   576  //
   577  // Lock-protected accessors
   578  //
   580  func (dm *discoveryManager) getCacheEntryForService(key serviceKey) (cachedResult, bool) {
   581  	dm.resultsLock.RLock()
   582  	defer dm.resultsLock.RUnlock()
   584  	result, ok := dm.cachedResults[key]
   585  	return result, ok
   586  }
   588  func (dm *discoveryManager) setCacheEntryForService(key serviceKey, result cachedResult) {
   589  	dm.resultsLock.Lock()
   590  	defer dm.resultsLock.Unlock()
   592  	dm.cachedResults[key] = result
   593  }
   595  func (dm *discoveryManager) getInfoForAPIService(name string) (groupVersionInfo, bool) {
   596  	dm.servicesLock.RLock()
   597  	defer dm.servicesLock.RUnlock()
   599  	result, ok := dm.apiServices[name]
   600  	return result, ok
   601  }
   603  func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersionInfo) (oldValueIfExisted *groupVersionInfo) {
   604  	dm.servicesLock.Lock()
   605  	defer dm.servicesLock.Unlock()
   607  	if oldValue, exists := dm.apiServices[name]; exists {
   608  		oldValueIfExisted = &oldValue
   609  	}
   611  	if result != nil {
   612  		dm.apiServices[name] = *result
   613  	} else {
   614  		delete(dm.apiServices, name)
   615  	}
   617  	return oldValueIfExisted
   618  }
   620  // !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go
   621  // which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go
   622  // so we should find a home for this
   623  // inMemoryResponseWriter is a http.Writer that keep the response in memory.
   624  type inMemoryResponseWriter struct {
   625  	writeHeaderCalled bool
   626  	header            http.Header
   627  	respCode          int
   628  	data              []byte
   629  }
   631  func newInMemoryResponseWriter() *inMemoryResponseWriter {
   632  	return &inMemoryResponseWriter{header: http.Header{}}
   633  }
   635  func (r *inMemoryResponseWriter) Header() http.Header {
   636  	return r.header
   637  }
   639  func (r *inMemoryResponseWriter) WriteHeader(code int) {
   640  	r.writeHeaderCalled = true
   641  	r.respCode = code
   642  }
   644  func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
   645  	if !r.writeHeaderCalled {
   646  		r.WriteHeader(http.StatusOK)
   647  	}
   648  	r.data = append(r.data, in...)
   649  	return len(in), nil
   650  }
   652  func (r *inMemoryResponseWriter) String() string {
   653  	s := fmt.Sprintf("ResponseCode: %d", r.respCode)
   654  	if r.data != nil {
   655  		s += fmt.Sprintf(", Body: %s", string(r.data))
   656  	}
   657  	if r.header != nil {
   658  		s += fmt.Sprintf(", Header: %s", r.header)
   659  	}
   660  	return s
   661  }

