...

Source file src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/aggregator.go

Documentation: k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package aggregator
    18  
    19  import (
    20  	"bytes"
    21  	"encoding/json"
    22  	"errors"
    23  	"fmt"
    24  	"net/http"
    25  	"strings"
    26  	"sync"
    27  	"time"
    28  
    29  	"github.com/emicklei/go-restful/v3"
    30  
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apiserver/pkg/endpoints/metrics"
    33  	"k8s.io/apiserver/pkg/server"
    34  	"k8s.io/apiserver/pkg/server/mux"
    35  	"k8s.io/apiserver/pkg/server/routes"
    36  	"k8s.io/klog/v2"
    37  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    38  	"k8s.io/kube-openapi/pkg/common"
    39  	"k8s.io/kube-openapi/pkg/handler3"
    40  	"k8s.io/kube-openapi/pkg/openapiconv"
    41  
    42  	v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
    43  )
    44  
// ErrAPIServiceNotFound is returned by the spec update path when the named
// APIService has not been registered with the spec proxier.
var ErrAPIServiceNotFound = errors.New("resource not found")
    46  
// SpecProxier proxies OpenAPI V3 requests to their respective APIService
type SpecProxier interface {
	// AddUpdateAPIService registers handler as the source of OpenAPI specs for
	// apiService, or replaces the stored handler and APIService when one with
	// the same name is already registered.
	AddUpdateAPIService(handler http.Handler, apiService *v1.APIService)
	// UpdateAPIServiceSpec updates the APIService. It returns ErrAPIServiceNotFound if the APIService doesn't exist.
	UpdateAPIServiceSpec(apiServiceName string) error
	// RemoveAPIServiceSpec drops the named APIService. Removing a name that is
	// not registered is a no-op.
	RemoveAPIServiceSpec(apiServiceName string)
	// GetAPIServiceNames returns the names of all currently registered
	// APIServices, in no particular order.
	GetAPIServiceNames() []string
}
    55  
// Names and limits used by the OpenAPI v3 aggregation code.
//
//   - aggregatorUser and specDownloadTimeout are not referenced in this file.
//     NOTE(review): presumably they are consumed by the spec downloader in this
//     package (user to impersonate and per-download timeout) — confirm.
//   - localDelegateChainNamePrefix marks the APIService names synthesized for
//     the local delegation chain; localDelegateChainNamePattern appends a
//     zero-padded, ten-digit index to that prefix.
//   - openAPIV2Converter is the name of both the mux and the internal
//     APIService that serve v3 specs converted from OpenAPI v2.
const (
	aggregatorUser                = "system:aggregator"
	specDownloadTimeout           = 60 * time.Second
	localDelegateChainNamePrefix  = "k8s_internal_local_delegation_chain_"
	localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
	openAPIV2Converter            = "openapiv2converter"
)
    63  
    64  // IsLocalAPIService returns true for local specs from delegates.
    65  func IsLocalAPIService(apiServiceName string) bool {
    66  	return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
    67  }
    68  
    69  // GetAPIServiceNames returns the names of APIServices recorded in apiServiceInfo.
    70  // We use this function to pass the names of local APIServices to the controller in this package,
    71  // so that the controller can periodically sync the OpenAPI spec from delegation API servers.
    72  func (s *specProxier) GetAPIServiceNames() []string {
    73  	s.rwMutex.RLock()
    74  	defer s.rwMutex.RUnlock()
    75  
    76  	names := make([]string, 0, len(s.apiServiceInfo))
    77  	for key := range s.apiServiceInfo {
    78  		names = append(names, key)
    79  	}
    80  	return names
    81  }
    82  
    83  // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
    84  func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, aggregatorService *restful.Container, openAPIConfig *common.OpenAPIV3Config, pathHandler common.PathHandlerByGroupVersion) (SpecProxier, error) {
    85  	s := &specProxier{
    86  		apiServiceInfo: map[string]*openAPIV3APIServiceInfo{},
    87  		downloader:     downloader,
    88  	}
    89  
    90  	if aggregatorService != nil && openAPIConfig != nil {
    91  		// Make native types exposed by aggregator available to the aggregated
    92  		// OpenAPI (normal handle is disabled by skipOpenAPIInstallation option)
    93  		aggregatorLocalServiceName := "k8s_internal_local_kube_aggregator_types"
    94  		v3Mux := mux.NewPathRecorderMux(aggregatorLocalServiceName)
    95  		_ = routes.OpenAPI{
    96  			V3Config: openAPIConfig,
    97  		}.InstallV3(aggregatorService, v3Mux)
    98  
    99  		s.AddUpdateAPIService(v3Mux, &v1.APIService{
   100  			ObjectMeta: metav1.ObjectMeta{
   101  				Name: aggregatorLocalServiceName,
   102  			},
   103  		})
   104  		s.UpdateAPIServiceSpec(aggregatorLocalServiceName)
   105  	}
   106  
   107  	i := 1
   108  	for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
   109  		handler := delegate.UnprotectedHandler()
   110  		if handler == nil {
   111  			continue
   112  		}
   113  
   114  		apiServiceName := fmt.Sprintf(localDelegateChainNamePattern, i)
   115  		localAPIService := v1.APIService{}
   116  		localAPIService.Name = apiServiceName
   117  		s.AddUpdateAPIService(handler, &localAPIService)
   118  		s.UpdateAPIServiceSpec(apiServiceName)
   119  		i++
   120  	}
   121  
   122  	handler := handler3.NewOpenAPIService()
   123  	s.openAPIV2ConverterHandler = handler
   124  	openAPIV2ConverterMux := mux.NewPathRecorderMux(openAPIV2Converter)
   125  	s.openAPIV2ConverterHandler.RegisterOpenAPIV3VersionedService("/openapi/v3", openAPIV2ConverterMux)
   126  	openAPIV2ConverterAPIService := v1.APIService{}
   127  	openAPIV2ConverterAPIService.Name = openAPIV2Converter
   128  	s.AddUpdateAPIService(openAPIV2ConverterMux, &openAPIV2ConverterAPIService)
   129  	s.register(pathHandler)
   130  
   131  	return s, nil
   132  }
   133  
   134  // AddUpdateAPIService adds or updates the api service. It is thread safe.
   135  func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) {
   136  	s.rwMutex.Lock()
   137  	defer s.rwMutex.Unlock()
   138  	// If the APIService is being updated, use the existing struct.
   139  	if apiServiceInfo, ok := s.apiServiceInfo[apiservice.Name]; ok {
   140  		apiServiceInfo.apiService = *apiservice
   141  		apiServiceInfo.handler = handler
   142  		return
   143  	}
   144  	s.apiServiceInfo[apiservice.Name] = &openAPIV3APIServiceInfo{
   145  		apiService: *apiservice,
   146  		handler:    handler,
   147  	}
   148  }
   149  
   150  func getGroupVersionStringFromAPIService(apiService v1.APIService) string {
   151  	if apiService.Spec.Group == "" && apiService.Spec.Version == "" {
   152  		return ""
   153  	}
   154  	return "apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
   155  }
   156  
   157  // UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves.
   158  // It is thread safe.
   159  func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error {
   160  	s.rwMutex.Lock()
   161  	defer s.rwMutex.Unlock()
   162  	return s.updateAPIServiceSpecLocked(apiServiceName)
   163  }
   164  
   165  func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error {
   166  	apiService, exists := s.apiServiceInfo[apiServiceName]
   167  	if !exists {
   168  		return ErrAPIServiceNotFound
   169  	}
   170  
   171  	if !apiService.isLegacyAPIService {
   172  		gv, httpStatus, err := s.downloader.OpenAPIV3Root(apiService.handler)
   173  		if err != nil {
   174  			return err
   175  		}
   176  		if httpStatus == http.StatusNotFound {
   177  			apiService.isLegacyAPIService = true
   178  		} else {
   179  			s.apiServiceInfo[apiServiceName].discovery = gv
   180  			return nil
   181  		}
   182  	}
   183  
   184  	newDownloader := v2aggregator.Downloader{}
   185  	v2Spec, etag, httpStatus, err := newDownloader.Download(apiService.handler, apiService.etag)
   186  	if err != nil {
   187  		return err
   188  	}
   189  	apiService.etag = etag
   190  	if httpStatus == http.StatusOK {
   191  		v3Spec := openapiconv.ConvertV2ToV3(v2Spec)
   192  		s.openAPIV2ConverterHandler.UpdateGroupVersion(getGroupVersionStringFromAPIService(apiService.apiService), v3Spec)
   193  		s.updateAPIServiceSpecLocked(openAPIV2Converter)
   194  	}
   195  	return nil
   196  }
   197  
// specProxier is the SpecProxier implementation in this package. It tracks the
// registered APIServices and serves the merged OpenAPI v3 discovery document
// as well as the per group-version specs.
type specProxier struct {
	// mutex protects all members of this struct.
	rwMutex sync.RWMutex

	// OpenAPI V3 specs by APIService name
	apiServiceInfo map[string]*openAPIV3APIServiceInfo

	// For downloading the OpenAPI v3 specs from apiservices
	downloader Downloader

	// Holds the v3 specs converted from OpenAPI v2 for APIServices that do not
	// serve /openapi/v3 themselves; entries are keyed by group-version string.
	openAPIV2ConverterHandler *handler3.OpenAPIService
}
   210  
// Compile-time check that *specProxier satisfies SpecProxier.
var _ SpecProxier = &specProxier{}
   212  
// openAPIV3APIServiceInfo is the per-APIService state kept by specProxier.
type openAPIV3APIServiceInfo struct {
	// apiService is a copy of the APIService object passed at registration.
	apiService v1.APIService
	// handler is used to download the specs of the APIService and to serve
	// requests for the group-versions it advertises.
	handler http.Handler
	// discovery is the most recently downloaded /openapi/v3 root of the
	// APIService; nil until the first successful download.
	discovery *handler3.OpenAPIV3Discovery

	// These fields are only used if the /openapi/v3 endpoint is not served by an APIService
	// Legacy APIService indicates that an APIService does not support OpenAPI V3, and the OpenAPI V2
	// will be downloaded, converted to V3 (lossy), and served by the aggregator
	etag               string
	isLegacyAPIService bool
}
   224  
   225  // RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned.
   226  // It is thread safe.
   227  func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) {
   228  	s.rwMutex.Lock()
   229  	defer s.rwMutex.Unlock()
   230  	if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok {
   231  		s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService))
   232  		_ = s.updateAPIServiceSpecLocked(openAPIV2Converter)
   233  		delete(s.apiServiceInfo, apiServiceName)
   234  	}
   235  }
   236  
   237  func (s *specProxier) getOpenAPIV3Root() handler3.OpenAPIV3Discovery {
   238  	s.rwMutex.RLock()
   239  	defer s.rwMutex.RUnlock()
   240  
   241  	merged := handler3.OpenAPIV3Discovery{
   242  		Paths: make(map[string]handler3.OpenAPIV3DiscoveryGroupVersion),
   243  	}
   244  
   245  	for _, apiServiceInfo := range s.apiServiceInfo {
   246  		if apiServiceInfo.discovery == nil {
   247  			continue
   248  		}
   249  
   250  		for key, item := range apiServiceInfo.discovery.Paths {
   251  			merged.Paths[key] = item
   252  		}
   253  	}
   254  	return merged
   255  }
   256  
   257  // handleDiscovery is the handler for OpenAPI V3 Discovery
   258  func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) {
   259  	merged := s.getOpenAPIV3Root()
   260  	j, err := json.Marshal(&merged)
   261  	if err != nil {
   262  		w.WriteHeader(500)
   263  		klog.Errorf("failed to created merged OpenAPIv3 discovery response: %s", err.Error())
   264  		return
   265  	}
   266  
   267  	http.ServeContent(w, r, "/openapi/v3", time.Now(), bytes.NewReader(j))
   268  }
   269  
   270  // handleGroupVersion is the OpenAPI V3 handler for a specified group/version
   271  func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request) {
   272  	s.rwMutex.RLock()
   273  	defer s.rwMutex.RUnlock()
   274  
   275  	// TODO: Import this logic from kube-openapi instead of duplicating
   276  	// URLs for OpenAPI V3 have the format /openapi/v3/<groupversionpath>
   277  	// SplitAfterN with 4 yields ["", "openapi", "v3", <groupversionpath>]
   278  	url := strings.SplitAfterN(r.URL.Path, "/", 4)
   279  	targetGV := url[3]
   280  
   281  	for _, apiServiceInfo := range s.apiServiceInfo {
   282  		if apiServiceInfo.discovery == nil {
   283  			continue
   284  		}
   285  
   286  		for key := range apiServiceInfo.discovery.Paths {
   287  			if targetGV == key {
   288  				apiServiceInfo.handler.ServeHTTP(w, r)
   289  				return
   290  			}
   291  		}
   292  	}
   293  	// No group-versions match the desired request
   294  	w.WriteHeader(404)
   295  }
   296  
   297  // Register registers the OpenAPI V3 Discovery and GroupVersion handlers
   298  func (s *specProxier) register(handler common.PathHandlerByGroupVersion) {
   299  	handler.Handle("/openapi/v3", metrics.InstrumentHandlerFunc("GET",
   300  		/* group = */ "",
   301  		/* version = */ "",
   302  		/* resource = */ "",
   303  		/* subresource = */ "openapi/v3",
   304  		/* scope = */ "",
   305  		/* component = */ "",
   306  		/* deprecated */ false,
   307  		/* removedRelease */ "",
   308  		http.HandlerFunc(s.handleDiscovery)))
   309  	handler.HandlePrefix("/openapi/v3/", metrics.InstrumentHandlerFunc("GET",
   310  		/* group = */ "",
   311  		/* version = */ "",
   312  		/* resource = */ "",
   313  		/* subresource = */ "openapi/v3/",
   314  		/* scope = */ "",
   315  		/* component = */ "",
   316  		/* deprecated */ false,
   317  		/* removedRelease */ "",
   318  		http.HandlerFunc(s.handleGroupVersion)))
   319  }
   320  

View as plain text