...

Source file src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/aggregator.go

Documentation: k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package aggregator
    18  
    19  import (
    20  	"crypto/sha256"
    21  	"errors"
    22  	"fmt"
    23  	"net/http"
    24  	"sync"
    25  	"time"
    26  
    27  	restful "github.com/emicklei/go-restful/v3"
    28  
    29  	"k8s.io/apiserver/pkg/server"
    30  	"k8s.io/klog/v2"
    31  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    32  	"k8s.io/kube-openapi/pkg/aggregator"
    33  	"k8s.io/kube-openapi/pkg/builder"
    34  	"k8s.io/kube-openapi/pkg/cached"
    35  	"k8s.io/kube-openapi/pkg/common"
    36  	"k8s.io/kube-openapi/pkg/common/restfuladapter"
    37  	"k8s.io/kube-openapi/pkg/handler"
    38  	"k8s.io/kube-openapi/pkg/validation/spec"
    39  )
    40  
    41  var ErrAPIServiceNotFound = errors.New("resource not found")
    42  
    43  // SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last
    44  // known specs including the http etag.
    45  type SpecAggregator interface {
    46  	AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error
    47  	// UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist.
    48  	UpdateAPIServiceSpec(apiServiceName string) error
    49  	RemoveAPIService(apiServiceName string)
    50  }
    51  
    52  const (
    53  	aggregatorUser                = "system:aggregator"
    54  	specDownloadTimeout           = time.Minute
    55  	localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"
    56  
    57  	// A randomly generated UUID to differentiate local and remote eTags.
    58  	locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
    59  )
    60  
    61  // openAPISpecInfo is used to store OpenAPI specs.
    62  // The apiService object is used to sort specs with their priorities.
    63  type openAPISpecInfo struct {
    64  	apiService v1.APIService
    65  	// spec is the cached OpenAPI spec
    66  	spec cached.LastSuccess[*spec.Swagger]
    67  
    68  	// The downloader is used only for non-local apiservices to
    69  	// re-update the spec every so often.
    70  	// Calling Get() is not thread safe and should only be called by a single
    71  	// thread via the openapi controller.
    72  	downloader CacheableDownloader
    73  }
    74  
    75  type specAggregator struct {
    76  	// mutex protects the specsByAPIServiceName map and its contents.
    77  	mutex sync.Mutex
    78  
    79  	// Map of API Services' OpenAPI specs by their name
    80  	specsByAPIServiceName map[string]*openAPISpecInfo
    81  
    82  	// provided for dynamic OpenAPI spec
    83  	openAPIVersionedService *handler.OpenAPIService
    84  
    85  	downloader *Downloader
    86  }
    87  
    88  func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggregatorSpec *spec.Swagger, delegationHandlers []http.Handler, pathHandler common.PathHandler) *specAggregator {
    89  	s := &specAggregator{
    90  		downloader:            downloader,
    91  		specsByAPIServiceName: map[string]*openAPISpecInfo{},
    92  	}
    93  	cachedAggregatorSpec := cached.Static(aggregatorSpec, "never-changes")
    94  	s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec)
    95  	for i, handler := range delegationHandlers {
    96  		name := fmt.Sprintf(localDelegateChainNamePattern, i+1)
    97  
    98  		spec := NewCacheableDownloader(name, downloader, handler)
    99  		s.addLocalSpec(name, spec)
   100  	}
   101  
   102  	s.openAPIVersionedService = handler.NewOpenAPIServiceLazy(s.buildMergeSpecLocked())
   103  	s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler)
   104  	return s
   105  }
   106  
   107  // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
   108  func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
   109  	config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
   110  
   111  	aggregatorOpenAPISpec, err := builder.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(webServices), config)
   112  	if err != nil {
   113  		return nil, err
   114  	}
   115  	aggregatorOpenAPISpec.Definitions = handler.PruneDefaults(aggregatorOpenAPISpec.Definitions)
   116  
   117  	var delegationHandlers []http.Handler
   118  
   119  	for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
   120  		handler := delegate.UnprotectedHandler()
   121  		if handler == nil {
   122  			continue
   123  		}
   124  		// ignore errors for the empty delegate we attach at the end the chain
   125  		// atm the empty delegate returns 503 when the server hasn't been fully initialized
   126  		// and the spec downloader only silences 404s
   127  		if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
   128  			continue
   129  		}
   130  		delegationHandlers = append(delegationHandlers, handler)
   131  	}
   132  	s := buildAndRegisterSpecAggregatorForLocalServices(downloader, aggregatorOpenAPISpec, delegationHandlers, pathHandler)
   133  	return s, nil
   134  }
   135  
   136  func (s *specAggregator) addLocalSpec(name string, cachedSpec cached.Value[*spec.Swagger]) {
   137  	service := v1.APIService{}
   138  	service.Name = name
   139  	info := &openAPISpecInfo{
   140  		apiService: service,
   141  	}
   142  	info.spec.Store(cachedSpec)
   143  	s.specsByAPIServiceName[name] = info
   144  }
   145  
   146  // buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs.
   147  func (s *specAggregator) buildMergeSpecLocked() cached.Value[*spec.Swagger] {
   148  	apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName))
   149  	for k := range s.specsByAPIServiceName {
   150  		apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService)
   151  	}
   152  	sortByPriority(apiServices)
   153  	caches := make([]cached.Value[*spec.Swagger], len(apiServices))
   154  	for i, apiService := range apiServices {
   155  		caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec)
   156  	}
   157  
   158  	return cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) {
   159  		var merged *spec.Swagger
   160  		etags := make([]string, 0, len(results))
   161  		for _, specInfo := range results {
   162  			result, etag, err := specInfo.Get()
   163  			if err != nil {
   164  				// APIService name and err message will be included in
   165  				// the error message as part of decorateError
   166  				klog.Warning(err)
   167  				continue
   168  			}
   169  			if merged == nil {
   170  				merged = &spec.Swagger{}
   171  				*merged = *result
   172  				// Paths, Definitions and parameters are set by
   173  				// MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters
   174  				merged.Paths = nil
   175  				merged.Definitions = nil
   176  				merged.Parameters = nil
   177  			}
   178  			etags = append(etags, etag)
   179  			if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result); err != nil {
   180  				return nil, "", fmt.Errorf("failed to build merge specs: %v", err)
   181  			}
   182  		}
   183  		// Printing the etags list is stable because it is sorted.
   184  		return merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags)))), nil
   185  	}, caches)
   186  }
   187  
   188  // updateServiceLocked updates the spec cache by downloading the latest
   189  // version of the spec.
   190  func (s *specAggregator) updateServiceLocked(name string) error {
   191  	specInfo, exists := s.specsByAPIServiceName[name]
   192  	if !exists {
   193  		return ErrAPIServiceNotFound
   194  	}
   195  	result, etag, err := specInfo.downloader.Get()
   196  	filteredResult := cached.Transform[*spec.Swagger](func(result *spec.Swagger, etag string, err error) (*spec.Swagger, string, error) {
   197  		if err != nil {
   198  			return nil, "", err
   199  		}
   200  		group := specInfo.apiService.Spec.Group
   201  		version := specInfo.apiService.Spec.Version
   202  		return aggregator.FilterSpecByPathsWithoutSideEffects(result, []string{"/apis/" + group + "/" + version + "/"}), etag, nil
   203  	}, cached.Result[*spec.Swagger]{Value: result, Etag: etag, Err: err})
   204  	specInfo.spec.Store(filteredResult)
   205  	return err
   206  }
   207  
   208  // UpdateAPIServiceSpec updates the api service. It is thread safe.
   209  func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error {
   210  	s.mutex.Lock()
   211  	defer s.mutex.Unlock()
   212  	return s.updateServiceLocked(apiServiceName)
   213  }
   214  
   215  // AddUpdateAPIService adds the api service. It is thread safe. If the
   216  // apiservice already exists, it will be updated.
   217  func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error {
   218  	if apiService.Spec.Service == nil {
   219  		return nil
   220  	}
   221  	s.mutex.Lock()
   222  	defer s.mutex.Unlock()
   223  
   224  	existingSpec, exists := s.specsByAPIServiceName[apiService.Name]
   225  	if !exists {
   226  		specInfo := &openAPISpecInfo{
   227  			apiService: *apiService,
   228  			downloader: NewCacheableDownloader(apiService.Name, s.downloader, handler),
   229  		}
   230  		specInfo.spec.Store(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)})
   231  		s.specsByAPIServiceName[apiService.Name] = specInfo
   232  		s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
   233  	} else {
   234  		existingSpec.apiService = *apiService
   235  		existingSpec.downloader.UpdateHandler(handler)
   236  	}
   237  
   238  	return nil
   239  }
   240  
   241  // RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
   242  // It is thread safe.
   243  func (s *specAggregator) RemoveAPIService(apiServiceName string) {
   244  	s.mutex.Lock()
   245  	defer s.mutex.Unlock()
   246  
   247  	if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists {
   248  		return
   249  	}
   250  	delete(s.specsByAPIServiceName, apiServiceName)
   251  	// Re-create the mergeSpec for the new list of apiservices
   252  	s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
   253  }
   254  

View as plain text