1
16
17 package aggregator
18
19 import (
20 "bytes"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "net/http"
25 "strings"
26 "sync"
27 "time"
28
29 "github.com/emicklei/go-restful/v3"
30
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apiserver/pkg/endpoints/metrics"
33 "k8s.io/apiserver/pkg/server"
34 "k8s.io/apiserver/pkg/server/mux"
35 "k8s.io/apiserver/pkg/server/routes"
36 "k8s.io/klog/v2"
37 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
38 "k8s.io/kube-openapi/pkg/common"
39 "k8s.io/kube-openapi/pkg/handler3"
40 "k8s.io/kube-openapi/pkg/openapiconv"
41
42 v2aggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
43 )
44
// ErrAPIServiceNotFound is returned when a spec update is requested for an
// APIService name that has not been registered with the proxier.
var ErrAPIServiceNotFound = errors.New("resource not found")
46
47
// SpecProxier is the interface for registering APIServices and for refreshing
// or removing the OpenAPI v3 information tracked for each of them.
type SpecProxier interface {
	// AddUpdateAPIService stores the handler and APIService object under the
	// APIService's name, overwriting both if the name is already registered.
	AddUpdateAPIService(handler http.Handler, apiService *v1.APIService)

	// UpdateAPIServiceSpec refreshes the spec information for the named
	// APIService. It returns ErrAPIServiceNotFound if the name is not registered.
	UpdateAPIServiceSpec(apiServiceName string) error
	// RemoveAPIServiceSpec drops the named APIService; unknown names are ignored.
	RemoveAPIServiceSpec(apiServiceName string)
	// GetAPIServiceNames returns the names of all registered APIServices.
	GetAPIServiceNames() []string
}
55
const (
	// aggregatorUser is not referenced in the visible part of this file;
	// presumably the user identity attached to spec download requests — TODO confirm.
	aggregatorUser = "system:aggregator"
	// specDownloadTimeout is not referenced in the visible part of this file;
	// presumably the time limit for a single spec download — TODO confirm.
	specDownloadTimeout = 60 * time.Second
	// localDelegateChainNamePrefix is the name prefix shared by every
	// APIService entry created for a handler in the local delegation chain.
	localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_"
	// localDelegateChainNamePattern is the fmt pattern for those names: the
	// prefix followed by a zero-padded, 10-digit index.
	localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
	// openAPIV2Converter is the name of the internal entry that serves
	// OpenAPI v3 specs converted from OpenAPI v2.
	openAPIV2Converter = "openapiv2converter"
)
63
64
65 func IsLocalAPIService(apiServiceName string) bool {
66 return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
67 }
68
69
70
71
72 func (s *specProxier) GetAPIServiceNames() []string {
73 s.rwMutex.RLock()
74 defer s.rwMutex.RUnlock()
75
76 names := make([]string, 0, len(s.apiServiceInfo))
77 for key := range s.apiServiceInfo {
78 names = append(names, key)
79 }
80 return names
81 }
82
83
84 func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, aggregatorService *restful.Container, openAPIConfig *common.OpenAPIV3Config, pathHandler common.PathHandlerByGroupVersion) (SpecProxier, error) {
85 s := &specProxier{
86 apiServiceInfo: map[string]*openAPIV3APIServiceInfo{},
87 downloader: downloader,
88 }
89
90 if aggregatorService != nil && openAPIConfig != nil {
91
92
93 aggregatorLocalServiceName := "k8s_internal_local_kube_aggregator_types"
94 v3Mux := mux.NewPathRecorderMux(aggregatorLocalServiceName)
95 _ = routes.OpenAPI{
96 V3Config: openAPIConfig,
97 }.InstallV3(aggregatorService, v3Mux)
98
99 s.AddUpdateAPIService(v3Mux, &v1.APIService{
100 ObjectMeta: metav1.ObjectMeta{
101 Name: aggregatorLocalServiceName,
102 },
103 })
104 s.UpdateAPIServiceSpec(aggregatorLocalServiceName)
105 }
106
107 i := 1
108 for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
109 handler := delegate.UnprotectedHandler()
110 if handler == nil {
111 continue
112 }
113
114 apiServiceName := fmt.Sprintf(localDelegateChainNamePattern, i)
115 localAPIService := v1.APIService{}
116 localAPIService.Name = apiServiceName
117 s.AddUpdateAPIService(handler, &localAPIService)
118 s.UpdateAPIServiceSpec(apiServiceName)
119 i++
120 }
121
122 handler := handler3.NewOpenAPIService()
123 s.openAPIV2ConverterHandler = handler
124 openAPIV2ConverterMux := mux.NewPathRecorderMux(openAPIV2Converter)
125 s.openAPIV2ConverterHandler.RegisterOpenAPIV3VersionedService("/openapi/v3", openAPIV2ConverterMux)
126 openAPIV2ConverterAPIService := v1.APIService{}
127 openAPIV2ConverterAPIService.Name = openAPIV2Converter
128 s.AddUpdateAPIService(openAPIV2ConverterMux, &openAPIV2ConverterAPIService)
129 s.register(pathHandler)
130
131 return s, nil
132 }
133
134
135 func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) {
136 s.rwMutex.Lock()
137 defer s.rwMutex.Unlock()
138
139 if apiServiceInfo, ok := s.apiServiceInfo[apiservice.Name]; ok {
140 apiServiceInfo.apiService = *apiservice
141 apiServiceInfo.handler = handler
142 return
143 }
144 s.apiServiceInfo[apiservice.Name] = &openAPIV3APIServiceInfo{
145 apiService: *apiservice,
146 handler: handler,
147 }
148 }
149
150 func getGroupVersionStringFromAPIService(apiService v1.APIService) string {
151 if apiService.Spec.Group == "" && apiService.Spec.Version == "" {
152 return ""
153 }
154 return "apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
155 }
156
157
158
159 func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error {
160 s.rwMutex.Lock()
161 defer s.rwMutex.Unlock()
162 return s.updateAPIServiceSpecLocked(apiServiceName)
163 }
164
165 func (s *specProxier) updateAPIServiceSpecLocked(apiServiceName string) error {
166 apiService, exists := s.apiServiceInfo[apiServiceName]
167 if !exists {
168 return ErrAPIServiceNotFound
169 }
170
171 if !apiService.isLegacyAPIService {
172 gv, httpStatus, err := s.downloader.OpenAPIV3Root(apiService.handler)
173 if err != nil {
174 return err
175 }
176 if httpStatus == http.StatusNotFound {
177 apiService.isLegacyAPIService = true
178 } else {
179 s.apiServiceInfo[apiServiceName].discovery = gv
180 return nil
181 }
182 }
183
184 newDownloader := v2aggregator.Downloader{}
185 v2Spec, etag, httpStatus, err := newDownloader.Download(apiService.handler, apiService.etag)
186 if err != nil {
187 return err
188 }
189 apiService.etag = etag
190 if httpStatus == http.StatusOK {
191 v3Spec := openapiconv.ConvertV2ToV3(v2Spec)
192 s.openAPIV2ConverterHandler.UpdateGroupVersion(getGroupVersionStringFromAPIService(apiService.apiService), v3Spec)
193 s.updateAPIServiceSpecLocked(openAPIV2Converter)
194 }
195 return nil
196 }
197
// specProxier is the SpecProxier implementation. It tracks one entry per
// registered APIService and serves the merged OpenAPI v3 discovery document.
type specProxier struct {
	// rwMutex is held by the methods of this type while they read or modify
	// apiServiceInfo.
	rwMutex sync.RWMutex

	// apiServiceInfo holds the tracked state of each APIService, keyed by
	// APIService name.
	apiServiceInfo map[string]*openAPIV3APIServiceInfo

	// downloader fetches the OpenAPI v3 root document from an APIService
	// handler.
	downloader Downloader

	// openAPIV2ConverterHandler holds the v3 specs converted from OpenAPI v2
	// documents of legacy APIServices.
	openAPIV2ConverterHandler *handler3.OpenAPIService
}
210
211 var _ SpecProxier = &specProxier{}
212
// openAPIV3APIServiceInfo is the state tracked for one registered APIService.
type openAPIV3APIServiceInfo struct {
	// apiService is a copy of the APIService object passed at registration.
	apiService v1.APIService
	// handler serves the requests proxied to this APIService.
	handler http.Handler
	// discovery is the last OpenAPI v3 root document downloaded from handler;
	// it is nil until a download has succeeded.
	discovery *handler3.OpenAPIV3Discovery

	// The fields below are only used once the APIService has answered the
	// OpenAPI v3 root request with 404.

	// etag is the etag returned by the last OpenAPI v2 download and is passed
	// back on the next download.
	etag string
	// isLegacyAPIService is set once the OpenAPI v3 root request returned 404;
	// later updates download the OpenAPI v2 spec instead.
	isLegacyAPIService bool
}
224
225
226
227 func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) {
228 s.rwMutex.Lock()
229 defer s.rwMutex.Unlock()
230 if apiServiceInfo, ok := s.apiServiceInfo[apiServiceName]; ok {
231 s.openAPIV2ConverterHandler.DeleteGroupVersion(getGroupVersionStringFromAPIService(apiServiceInfo.apiService))
232 _ = s.updateAPIServiceSpecLocked(openAPIV2Converter)
233 delete(s.apiServiceInfo, apiServiceName)
234 }
235 }
236
237 func (s *specProxier) getOpenAPIV3Root() handler3.OpenAPIV3Discovery {
238 s.rwMutex.RLock()
239 defer s.rwMutex.RUnlock()
240
241 merged := handler3.OpenAPIV3Discovery{
242 Paths: make(map[string]handler3.OpenAPIV3DiscoveryGroupVersion),
243 }
244
245 for _, apiServiceInfo := range s.apiServiceInfo {
246 if apiServiceInfo.discovery == nil {
247 continue
248 }
249
250 for key, item := range apiServiceInfo.discovery.Paths {
251 merged.Paths[key] = item
252 }
253 }
254 return merged
255 }
256
257
258 func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) {
259 merged := s.getOpenAPIV3Root()
260 j, err := json.Marshal(&merged)
261 if err != nil {
262 w.WriteHeader(500)
263 klog.Errorf("failed to created merged OpenAPIv3 discovery response: %s", err.Error())
264 return
265 }
266
267 http.ServeContent(w, r, "/openapi/v3", time.Now(), bytes.NewReader(j))
268 }
269
270
271 func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request) {
272 s.rwMutex.RLock()
273 defer s.rwMutex.RUnlock()
274
275
276
277
278 url := strings.SplitAfterN(r.URL.Path, "/", 4)
279 targetGV := url[3]
280
281 for _, apiServiceInfo := range s.apiServiceInfo {
282 if apiServiceInfo.discovery == nil {
283 continue
284 }
285
286 for key := range apiServiceInfo.discovery.Paths {
287 if targetGV == key {
288 apiServiceInfo.handler.ServeHTTP(w, r)
289 return
290 }
291 }
292 }
293
294 w.WriteHeader(404)
295 }
296
297
298 func (s *specProxier) register(handler common.PathHandlerByGroupVersion) {
299 handler.Handle("/openapi/v3", metrics.InstrumentHandlerFunc("GET",
300 "",
301 "",
302 "",
303 "openapi/v3",
304 "",
305 "",
306 false,
307 "",
308 http.HandlerFunc(s.handleDiscovery)))
309 handler.HandlePrefix("/openapi/v3/", metrics.InstrumentHandlerFunc("GET",
310 "",
311 "",
312 "",
313 "openapi/v3/",
314 "",
315 "",
316 false,
317 "",
318 http.HandlerFunc(s.handleGroupVersion)))
319 }
320
View as plain text