1
16
17 package apiserver
18
19 import (
20 "errors"
21 "fmt"
22 "net/http"
23 "sync"
24 "time"
25
26 apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
27 apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
28 apidiscoveryv2conversion "k8s.io/apiserver/pkg/apis/apidiscovery/v2"
29
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/runtime/serializer"
34 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/apimachinery/pkg/util/wait"
37 "k8s.io/apiserver/pkg/authentication/user"
38 "k8s.io/apiserver/pkg/endpoints"
39 discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
40 "k8s.io/apiserver/pkg/endpoints/request"
41 "k8s.io/client-go/discovery"
42 "k8s.io/client-go/util/workqueue"
43 "k8s.io/klog/v2"
44 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
45 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
46 "k8s.io/kube-aggregator/pkg/apiserver/scheme"
47 )
48
49 var APIRegistrationGroupVersion metav1.GroupVersion = metav1.GroupVersion{Group: "apiregistration.k8s.io", Version: "v1"}
50
51
52
// APIRegistrationGroupPriority is the group priority that Run assigns to
// APIRegistrationGroupVersion in the merged discovery document.
var APIRegistrationGroupPriority = 20001
54
55
56 var v2Beta1GVK = schema.GroupVersionKind{
57 Group: "apidiscovery.k8s.io",
58 Version: "v2beta1",
59 Kind: "APIGroupDiscoveryList",
60 }
61
62 var v2GVK = schema.GroupVersionKind{
63 Group: "apidiscovery.k8s.io",
64 Version: "v2",
65 Kind: "APIGroupDiscoveryList",
66 }
67
68
69
70
// DiscoveryAggregationController keeps a merged discovery document in sync
// with the set of registered APIServices. It is implemented in this file by
// discoveryManager.
type DiscoveryAggregationController interface {
	// AddAPIService registers (or replaces) the given APIService together
	// with the handler used to reach it, and queues it for a discovery sync.
	// The discoveryManager implementation ignores APIServices whose
	// Spec.Service is nil.
	AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler)

	// RemoveAPIService forgets the APIService with the given name. The
	// discoveryManager implementation queues a sync, which removes the group
	// version from the merged document, only if the name was previously
	// registered.
	RemoveAPIService(apiServiceName string)

	// Run performs an initial sync of every registered APIService, closes
	// discoverySyncedCh (when non-nil) once that sync has finished, and then
	// keeps processing dirty APIServices until stopCh is closed.
	Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{})
}
86
// discoveryManager is the DiscoveryAggregationController implementation. It
// tracks registered APIServices, caches the discovery documents fetched from
// their backing services and publishes the per-group-version results to the
// merged discovery handler.
type discoveryManager struct {
	// servicesLock guards apiServices.
	servicesLock sync.RWMutex

	// apiServices maps an APIService name to the information needed to fetch
	// and publish its discovery data.
	apiServices map[string]groupVersionInfo

	// resultsLock guards cachedResults.
	resultsLock sync.RWMutex

	// cachedResults holds the most recent aggregated discovery result per
	// backing service. Entries whose service is no longer referenced by any
	// APIService are dropped by removeUnusedServices.
	cachedResults map[serviceKey]cachedResult

	// dirtyAPIServiceQueue holds the names of APIServices that need to be
	// (re)synced by the workers started in Run.
	dirtyAPIServiceQueue workqueue.RateLimitingInterface

	// mergedDiscoveryHandler receives the group versions added, updated and
	// removed by syncAPIService.
	mergedDiscoveryHandler discoveryendpoint.ResourceManager

	// codecs decodes aggregated discovery responses; it is built from a scheme
	// containing the apidiscovery v2 and v2beta1 types.
	codecs serializer.CodecFactory
}
114
115
// serviceKey identifies the service that backs one or more APIServices. It is
// comparable and is used as the key of the discovery result cache.
type serviceKey struct {
	Namespace, Name string
	Port            int32
}

// String renders the key as "<namespace>/<name>:<port>".
func (s serviceKey) String() string {
	return fmt.Sprintf("%s/%s:%d", s.Namespace, s.Name, s.Port)
}
126
127 func newServiceKey(service apiregistrationv1.ServiceReference) serviceKey {
128
129
130
131 port := int32(443)
132 if service.Port != nil {
133 port = *service.Port
134 }
135
136 return serviceKey{
137 Name: service.Name,
138 Namespace: service.Namespace,
139 Port: port,
140 }
141 }
142
// cachedResult is the discovery information most recently obtained from a
// single backing service.
type cachedResult struct {
	// discovery holds the per-group-version discovery data taken from the
	// response, keyed by group version.
	discovery map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery

	// etag is the value of the "Etag" response header of the aggregated
	// discovery response. When non-empty it is sent back as "If-None-Match"
	// on later requests. It is left empty for legacy-fallback results.
	etag string

	// lastUpdated is the time captured immediately before the request that
	// produced (or revalidated) this result was dispatched. It is compared
	// against groupVersionInfo.lastMarkedDirty to decide whether a refetch is
	// needed.
	lastUpdated time.Time
}
155
156
// groupVersionInfo is what the manager remembers about one registered
// APIService.
type groupVersionInfo struct {
	// lastMarkedDirty is the time at which this APIService was last flagged
	// as needing a refresh: on AddAPIService and on every periodic pass in
	// Run. A cached result is reused only if its lastUpdated is after this
	// time.
	lastMarkedDirty time.Time

	// service identifies the backing service; it is the key into the
	// manager's result cache.
	service serviceKey

	// groupPriority is taken from the APIService's Spec.GroupPriorityMinimum.
	groupPriority int

	// versionPriority is taken from the APIService's Spec.VersionPriority.
	versionPriority int

	// handler is used to issue in-process discovery requests for this
	// APIService.
	handler http.Handler
}
183
184 var _ DiscoveryAggregationController = &discoveryManager{}
185
186 func NewDiscoveryManager(
187 target discoveryendpoint.ResourceManager,
188 ) DiscoveryAggregationController {
189 discoveryScheme := runtime.NewScheme()
190 utilruntime.Must(apidiscoveryv2.AddToScheme(discoveryScheme))
191 utilruntime.Must(apidiscoveryv2beta1.AddToScheme(discoveryScheme))
192
193 utilruntime.Must(apidiscoveryv2conversion.RegisterConversions(discoveryScheme))
194 codecs := serializer.NewCodecFactory(discoveryScheme)
195
196 return &discoveryManager{
197 mergedDiscoveryHandler: target,
198 apiServices: make(map[string]groupVersionInfo),
199 cachedResults: make(map[serviceKey]cachedResult),
200 dirtyAPIServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "discovery-manager"),
201 codecs: codecs,
202 }
203 }
204
205
206
207
208
209
210
211
212 func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion, info groupVersionInfo) (*cachedResult, error) {
213
214 cached, exists := dm.getCacheEntryForService(info.service)
215
216
217 if exists && cached.lastUpdated.After(info.lastMarkedDirty) {
218 return &cached, nil
219 }
220
221
222
223 handler := http.TimeoutHandler(info.handler, 5*time.Second, "request timed out")
224 req, err := http.NewRequest("GET", "/apis", nil)
225 if err != nil {
226
227
228 return &cached, fmt.Errorf("failed to create http.Request: %v", err)
229 }
230
231
232 req = req.WithContext(
233 request.WithUser(
234 req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator", Groups: []string{"system:masters"}}))
235 req = req.WithContext(request.WithRequestInfo(req.Context(), &request.RequestInfo{
236 Path: req.URL.Path,
237 IsResourceRequest: false,
238 }))
239 req.Header.Add("Accept", discovery.AcceptV2+","+discovery.AcceptV2Beta1)
240
241 if exists && len(cached.etag) > 0 {
242 req.Header.Add("If-None-Match", cached.etag)
243 }
244
245
246
247
248 now := time.Now()
249 writer := newInMemoryResponseWriter()
250 handler.ServeHTTP(writer, req)
251
252 isV2Beta1GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2Beta1GVK)
253 isV2GVK, _ := discovery.ContentTypeIsGVK(writer.Header().Get("Content-Type"), v2GVK)
254
255 switch {
256 case writer.respCode == http.StatusNotModified:
257
258 cached = cachedResult{
259 discovery: cached.discovery,
260 etag: cached.etag,
261 lastUpdated: now,
262 }
263
264 dm.setCacheEntryForService(info.service, cached)
265 return &cached, nil
266 case writer.respCode == http.StatusServiceUnavailable:
267 return nil, fmt.Errorf("service %s returned non-success response code: %v",
268 info.service.String(), writer.respCode)
269 case writer.respCode == http.StatusOK && (isV2GVK || isV2Beta1GVK):
270 parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
271 if err := runtime.DecodeInto(dm.codecs.UniversalDecoder(), writer.data, parsed); err != nil {
272 return nil, err
273 }
274
275 klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String())
276
277
278 discoMap := map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery{}
279 for _, g := range parsed.Items {
280 for _, v := range g.Versions {
281 discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v
282 for i := range v.Resources {
283
284
285 if v.Resources[i].ResponseKind == nil {
286 v.Resources[i].ResponseKind = &metav1.GroupVersionKind{}
287 }
288 for j := range v.Resources[i].Subresources {
289 if v.Resources[i].Subresources[j].ResponseKind == nil {
290 v.Resources[i].Subresources[j].ResponseKind = &metav1.GroupVersionKind{}
291 }
292 }
293 }
294 }
295 }
296
297
298 cached = cachedResult{
299 discovery: discoMap,
300 etag: writer.Header().Get("Etag"),
301 lastUpdated: now,
302 }
303 dm.setCacheEntryForService(info.service, cached)
304 return &cached, nil
305 default:
306
307
308 if len(gv.Version) == 0 {
309 return nil, errors.New("not found")
310 }
311
312 var path string
313 if len(gv.Group) == 0 {
314 path = "/api/" + gv.Version
315 } else {
316 path = "/apis/" + gv.Group + "/" + gv.Version
317 }
318
319 req, err := http.NewRequest("GET", path, nil)
320 if err != nil {
321
322
323 return nil, fmt.Errorf("failed to create http.Request: %v", err)
324 }
325
326
327 req = req.WithContext(
328 request.WithUser(
329 req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator"}))
330
331
332 req.Header.Add("Accept", runtime.ContentTypeJSON)
333
334 if exists && len(cached.etag) > 0 {
335 req.Header.Add("If-None-Match", cached.etag)
336 }
337
338 writer := newInMemoryResponseWriter()
339 handler.ServeHTTP(writer, req)
340
341 if writer.respCode != http.StatusOK {
342 return nil, fmt.Errorf("failed to download legacy discovery for %s: %v", path, writer.String())
343 }
344
345 parsed := &metav1.APIResourceList{}
346 if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
347 return nil, err
348 }
349
350
351 resources, err := endpoints.ConvertGroupVersionIntoToDiscovery(parsed.APIResources)
352 if err != nil {
353 return nil, err
354 }
355 klog.V(3).Infof("DiscoveryManager: Successfully downloaded legacy discovery for %s", info.service.String())
356
357 discoMap := map[metav1.GroupVersion]apidiscoveryv2.APIVersionDiscovery{
358
359 gv: {
360 Version: gv.Version,
361 Resources: resources,
362 },
363 }
364
365 cached = cachedResult{
366 discovery: discoMap,
367 lastUpdated: now,
368 }
369
370
371
372
373 return &cached, nil
374 }
375 }
376
377
378 func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
379 info, exists := dm.getInfoForAPIService(apiServiceName)
380
381 gv := helper.APIServiceNameToGroupVersion(apiServiceName)
382 mgv := metav1.GroupVersion{Group: gv.Group, Version: gv.Version}
383
384 if !exists {
385
386 dm.mergedDiscoveryHandler.RemoveGroupVersion(mgv)
387 return nil
388 }
389
390
391 cached, err := dm.fetchFreshDiscoveryForService(mgv, info)
392
393 var entry apidiscoveryv2.APIVersionDiscovery
394
395
396
397 if cached == nil {
398
399
400
401
402
403
404 entry = apidiscoveryv2.APIVersionDiscovery{
405 Version: gv.Version,
406 }
407 } else {
408
409 entry, exists = cached.discovery[mgv]
410 if exists {
411
412 } else {
413
414
415 entry = apidiscoveryv2.APIVersionDiscovery{
416 Version: gv.Version,
417 }
418 }
419 }
420
421
422
423 if err == nil {
424 entry.Freshness = apidiscoveryv2.DiscoveryFreshnessCurrent
425 } else {
426 entry.Freshness = apidiscoveryv2.DiscoveryFreshnessStale
427 }
428
429 dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry)
430 dm.mergedDiscoveryHandler.SetGroupVersionPriority(metav1.GroupVersion(gv), info.groupPriority, info.versionPriority)
431 return nil
432 }
433
434 func (dm *discoveryManager) getAPIServiceKeys() []string {
435 dm.servicesLock.RLock()
436 defer dm.servicesLock.RUnlock()
437 keys := []string{}
438 for key := range dm.apiServices {
439 keys = append(keys, key)
440 }
441 return keys
442 }
443
444
445
446 func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<- struct{}) {
447 klog.Info("Starting ResourceDiscoveryManager")
448
449
450 defer dm.dirtyAPIServiceQueue.ShutDown()
451
452
453 dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0)
454
455
456 var wg sync.WaitGroup
457
458 keys := dm.getAPIServiceKeys()
459
460 for _, key := range keys {
461 wg.Add(1)
462 go func(k string) {
463 defer wg.Done()
464
465
466 _ = dm.syncAPIService(k)
467 }(key)
468 }
469 wg.Wait()
470
471 if discoverySyncedCh != nil {
472 close(discoverySyncedCh)
473 }
474
475
476
477
478
479
480 for i := 0; i < 2; i++ {
481 go func() {
482 for {
483 next, shutdown := dm.dirtyAPIServiceQueue.Get()
484 if shutdown {
485 return
486 }
487
488 func() {
489 defer dm.dirtyAPIServiceQueue.Done(next)
490
491 if err := dm.syncAPIService(next.(string)); err != nil {
492 dm.dirtyAPIServiceQueue.AddRateLimited(next)
493 } else {
494 dm.dirtyAPIServiceQueue.Forget(next)
495 }
496 }()
497 }
498 }()
499 }
500
501 wait.PollUntil(1*time.Minute, func() (done bool, err error) {
502 dm.servicesLock.Lock()
503 defer dm.servicesLock.Unlock()
504
505 now := time.Now()
506
507
508 for key, info := range dm.apiServices {
509 info.lastMarkedDirty = now
510 dm.apiServices[key] = info
511 dm.dirtyAPIServiceQueue.Add(key)
512 }
513 return false, nil
514 }, stopCh)
515 }
516
517
518
519 func (dm *discoveryManager) removeUnusedServices() {
520 usedServiceKeys := sets.Set[serviceKey]{}
521
522 func() {
523 dm.servicesLock.Lock()
524 defer dm.servicesLock.Unlock()
525
526
527 for _, info := range dm.apiServices {
528 usedServiceKeys.Insert(info.service)
529 }
530 }()
531
532
533
534
535 func() {
536 dm.resultsLock.Lock()
537 defer dm.resultsLock.Unlock()
538
539 for key := range dm.cachedResults {
540 if !usedServiceKeys.Has(key) {
541 delete(dm.cachedResults, key)
542 }
543 }
544 }()
545 }
546
547
548
549 func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) {
550
551
552 if apiService.Spec.Service == nil {
553 return
554 }
555
556
557 dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{
558 groupPriority: int(apiService.Spec.GroupPriorityMinimum),
559 versionPriority: int(apiService.Spec.VersionPriority),
560 handler: handler,
561 lastMarkedDirty: time.Now(),
562 service: newServiceKey(*apiService.Spec.Service),
563 })
564 dm.removeUnusedServices()
565 dm.dirtyAPIServiceQueue.Add(apiService.Name)
566 }
567
568 func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
569 if dm.setInfoForAPIService(apiServiceName, nil) != nil {
570
571 dm.removeUnusedServices()
572 dm.dirtyAPIServiceQueue.Add(apiServiceName)
573 }
574 }
575
576
577
578
579
580 func (dm *discoveryManager) getCacheEntryForService(key serviceKey) (cachedResult, bool) {
581 dm.resultsLock.RLock()
582 defer dm.resultsLock.RUnlock()
583
584 result, ok := dm.cachedResults[key]
585 return result, ok
586 }
587
588 func (dm *discoveryManager) setCacheEntryForService(key serviceKey, result cachedResult) {
589 dm.resultsLock.Lock()
590 defer dm.resultsLock.Unlock()
591
592 dm.cachedResults[key] = result
593 }
594
595 func (dm *discoveryManager) getInfoForAPIService(name string) (groupVersionInfo, bool) {
596 dm.servicesLock.RLock()
597 defer dm.servicesLock.RUnlock()
598
599 result, ok := dm.apiServices[name]
600 return result, ok
601 }
602
603 func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersionInfo) (oldValueIfExisted *groupVersionInfo) {
604 dm.servicesLock.Lock()
605 defer dm.servicesLock.Unlock()
606
607 if oldValue, exists := dm.apiServices[name]; exists {
608 oldValueIfExisted = &oldValue
609 }
610
611 if result != nil {
612 dm.apiServices[name] = *result
613 } else {
614 delete(dm.apiServices, name)
615 }
616
617 return oldValueIfExisted
618 }
619
620
621
622
623
// inMemoryResponseWriter is an http.ResponseWriter that records the response
// (status code, headers and body) in memory so that a handler can be invoked
// in-process and its result inspected afterwards.
type inMemoryResponseWriter struct {
	// writeHeaderCalled is true once the status code has been recorded,
	// either explicitly through WriteHeader or implicitly by the first Write.
	writeHeaderCalled bool
	// header is the header map handed out by Header.
	header http.Header
	// respCode is the recorded status code; it is zero until a header or body
	// has been written.
	respCode int
	// data accumulates every byte passed to Write.
	data []byte
}

// newInMemoryResponseWriter returns a writer with an empty, non-nil header
// map.
func newInMemoryResponseWriter() *inMemoryResponseWriter {
	return &inMemoryResponseWriter{header: http.Header{}}
}

// Header returns the header map that the handler may populate.
func (r *inMemoryResponseWriter) Header() http.Header {
	return r.header
}

// WriteHeader records the response status code.
//
// Only the first call has an effect. As with a real http.ResponseWriter, a
// later call (including one made after a Write has implicitly recorded 200)
// is ignored and cannot overwrite the status that was already recorded.
func (r *inMemoryResponseWriter) WriteHeader(code int) {
	if r.writeHeaderCalled {
		return
	}
	r.writeHeaderCalled = true
	r.respCode = code
}

// Write appends in to the recorded body. If no status code has been recorded
// yet, http.StatusOK is recorded first. It always reports len(in) bytes
// written and a nil error.
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
	if !r.writeHeaderCalled {
		r.WriteHeader(http.StatusOK)
	}
	r.data = append(r.data, in...)
	return len(in), nil
}

// String renders the recorded response for use in error messages: the status
// code, followed by the body and the headers when they are non-nil.
func (r *inMemoryResponseWriter) String() string {
	s := fmt.Sprintf("ResponseCode: %d", r.respCode)
	if r.data != nil {
		s += fmt.Sprintf(", Body: %s", string(r.data))
	}
	if r.header != nil {
		s += fmt.Sprintf(", Header: %s", r.header)
	}
	return s
}
662
View as plain text