1
16
17 package apiserver
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "net/url"
24 "reflect"
25 "sync"
26 "time"
27
28 v1 "k8s.io/api/core/v1"
29 "k8s.io/apimachinery/pkg/api/equality"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 "k8s.io/apimachinery/pkg/api/meta"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/labels"
34 "k8s.io/apimachinery/pkg/runtime"
35 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
36 "k8s.io/apimachinery/pkg/util/wait"
37 v1informers "k8s.io/client-go/informers/core/v1"
38 v1listers "k8s.io/client-go/listers/core/v1"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/transport"
41 "k8s.io/client-go/util/workqueue"
42 "k8s.io/component-base/metrics/legacyregistry"
43 "k8s.io/klog/v2"
44 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
45 apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
46 apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
47 informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
48 listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
49 "k8s.io/kube-aggregator/pkg/controllers"
50 )
51
52
53 var registerIntoLegacyRegistryOnce sync.Once
54
55 type certKeyFunc func() ([]byte, []byte)
56
57
58 type ServiceResolver interface {
59 ResolveEndpoint(namespace, name string, port int32) (*url.URL, error)
60 }
61
62
63 type AvailableConditionController struct {
64 apiServiceClient apiregistrationclient.APIServicesGetter
65
66 apiServiceLister listers.APIServiceLister
67 apiServiceSynced cache.InformerSynced
68
69
70 serviceLister v1listers.ServiceLister
71 servicesSynced cache.InformerSynced
72
73 endpointsLister v1listers.EndpointsLister
74 endpointsSynced cache.InformerSynced
75
76
77 proxyTransportDial *transport.DialHolder
78 proxyCurrentCertKeyContent certKeyFunc
79 serviceResolver ServiceResolver
80
81
82 syncFn func(key string) error
83
84 queue workqueue.RateLimitingInterface
85
86 cache map[string]map[string][]string
87
88 cacheLock sync.RWMutex
89
90
91 metrics *availabilityMetrics
92 }
93
94
95 func NewAvailableConditionController(
96 apiServiceInformer informers.APIServiceInformer,
97 serviceInformer v1informers.ServiceInformer,
98 endpointsInformer v1informers.EndpointsInformer,
99 apiServiceClient apiregistrationclient.APIServicesGetter,
100 proxyTransportDial *transport.DialHolder,
101 proxyCurrentCertKeyContent certKeyFunc,
102 serviceResolver ServiceResolver,
103 ) (*AvailableConditionController, error) {
104 c := &AvailableConditionController{
105 apiServiceClient: apiServiceClient,
106 apiServiceLister: apiServiceInformer.Lister(),
107 serviceLister: serviceInformer.Lister(),
108 endpointsLister: endpointsInformer.Lister(),
109 serviceResolver: serviceResolver,
110 queue: workqueue.NewNamedRateLimitingQueue(
111
112
113
114 workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
115 "AvailableConditionController"),
116 proxyTransportDial: proxyTransportDial,
117 proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
118 metrics: newAvailabilityMetrics(),
119 }
120
121
122
123
124
125 apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
126 cache.ResourceEventHandlerFuncs{
127 AddFunc: c.addAPIService,
128 UpdateFunc: c.updateAPIService,
129 DeleteFunc: c.deleteAPIService,
130 },
131 30*time.Second)
132 c.apiServiceSynced = apiServiceHandler.HasSynced
133
134 serviceHandler, _ := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
135 AddFunc: c.addService,
136 UpdateFunc: c.updateService,
137 DeleteFunc: c.deleteService,
138 })
139 c.servicesSynced = serviceHandler.HasSynced
140
141 endpointsHandler, _ := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
142 AddFunc: c.addEndpoints,
143 UpdateFunc: c.updateEndpoints,
144 DeleteFunc: c.deleteEndpoints,
145 })
146 c.endpointsSynced = endpointsHandler.HasSynced
147
148 c.syncFn = c.sync
149
150
151 var err error
152 registerIntoLegacyRegistryOnce.Do(func() {
153 err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister)
154 })
155 if err != nil {
156 return nil, err
157 }
158
159 return c, nil
160 }
161
162 func (c *AvailableConditionController) sync(key string) error {
163 originalAPIService, err := c.apiServiceLister.Get(key)
164 if apierrors.IsNotFound(err) {
165 c.metrics.ForgetAPIService(key)
166 return nil
167 }
168 if err != nil {
169 return err
170 }
171
172
173
174
175 transportConfig := &transport.Config{
176 TLS: transport.TLSConfig{
177 Insecure: true,
178 },
179 DialHolder: c.proxyTransportDial,
180 }
181
182 if c.proxyCurrentCertKeyContent != nil {
183 proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent()
184
185 transportConfig.TLS.CertData = proxyClientCert
186 transportConfig.TLS.KeyData = proxyClientKey
187 }
188 restTransport, err := transport.New(transportConfig)
189 if err != nil {
190 return err
191 }
192 discoveryClient := &http.Client{
193 Transport: restTransport,
194
195 Timeout: 5 * time.Second,
196 CheckRedirect: func(req *http.Request, via []*http.Request) error {
197 return http.ErrUseLastResponse
198 },
199 }
200
201 apiService := originalAPIService.DeepCopy()
202
203 availableCondition := apiregistrationv1.APIServiceCondition{
204 Type: apiregistrationv1.Available,
205 Status: apiregistrationv1.ConditionTrue,
206 LastTransitionTime: metav1.Now(),
207 }
208
209
210 if apiService.Spec.Service == nil {
211 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
212 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
213 return err
214 }
215
216 service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
217 if apierrors.IsNotFound(err) {
218 availableCondition.Status = apiregistrationv1.ConditionFalse
219 availableCondition.Reason = "ServiceNotFound"
220 availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
221 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
222 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
223 return err
224 } else if err != nil {
225 availableCondition.Status = apiregistrationv1.ConditionUnknown
226 availableCondition.Reason = "ServiceAccessError"
227 availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
228 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
229 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
230 return err
231 }
232
233 if service.Spec.Type == v1.ServiceTypeClusterIP {
234
235 servicePort := apiService.Spec.Service.Port
236 portName := ""
237 foundPort := false
238 for _, port := range service.Spec.Ports {
239 if port.Port == *servicePort {
240 foundPort = true
241 portName = port.Name
242 break
243 }
244 }
245 if !foundPort {
246 availableCondition.Status = apiregistrationv1.ConditionFalse
247 availableCondition.Reason = "ServicePortError"
248 availableCondition.Message = fmt.Sprintf("service/%s in %q is not listening on port %d", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, *apiService.Spec.Service.Port)
249 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
250 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
251 return err
252 }
253
254 endpoints, err := c.endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
255 if apierrors.IsNotFound(err) {
256 availableCondition.Status = apiregistrationv1.ConditionFalse
257 availableCondition.Reason = "EndpointsNotFound"
258 availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
259 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
260 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
261 return err
262 } else if err != nil {
263 availableCondition.Status = apiregistrationv1.ConditionUnknown
264 availableCondition.Reason = "EndpointsAccessError"
265 availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
266 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
267 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
268 return err
269 }
270 hasActiveEndpoints := false
271 outer:
272 for _, subset := range endpoints.Subsets {
273 if len(subset.Addresses) == 0 {
274 continue
275 }
276 for _, endpointPort := range subset.Ports {
277 if endpointPort.Name == portName {
278 hasActiveEndpoints = true
279 break outer
280 }
281 }
282 }
283 if !hasActiveEndpoints {
284 availableCondition.Status = apiregistrationv1.ConditionFalse
285 availableCondition.Reason = "MissingEndpoints"
286 availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
287 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
288 _, err := c.updateAPIServiceStatus(originalAPIService, apiService)
289 return err
290 }
291 }
292
293 if apiService.Spec.Service != nil && c.serviceResolver != nil {
294 attempts := 5
295 results := make(chan error, attempts)
296 for i := 0; i < attempts; i++ {
297 go func() {
298 discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, *apiService.Spec.Service.Port)
299 if err != nil {
300 results <- err
301 return
302 }
303
304 if apiService.Name == "v1." {
305 discoveryURL.Path = "/api/" + apiService.Spec.Version
306 } else {
307 discoveryURL.Path = "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
308 }
309
310 errCh := make(chan error, 1)
311 go func() {
312
313 newReq, err := http.NewRequest("GET", discoveryURL.String(), nil)
314 if err != nil {
315 errCh <- err
316 return
317 }
318
319
320 transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", []string{"system:masters"}, nil)
321 resp, err := discoveryClient.Do(newReq)
322 if resp != nil {
323 resp.Body.Close()
324
325 if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
326 errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
327 return
328 }
329 }
330
331 errCh <- err
332 }()
333
334 select {
335 case err = <-errCh:
336 if err != nil {
337 results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err)
338 return
339 }
340
341
342
343 case <-time.After(6 * time.Second):
344 results <- fmt.Errorf("timed out waiting for %v", discoveryURL)
345 return
346 }
347
348 results <- nil
349 }()
350 }
351
352 var lastError error
353 for i := 0; i < attempts; i++ {
354 lastError = <-results
355
356 if lastError == nil {
357 break
358 }
359 }
360
361 if lastError != nil {
362 availableCondition.Status = apiregistrationv1.ConditionFalse
363 availableCondition.Reason = "FailedDiscoveryCheck"
364 availableCondition.Message = lastError.Error()
365 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
366 _, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService)
367 if updateErr != nil {
368 return updateErr
369 }
370
371
372 return lastError
373 }
374 }
375
376 availableCondition.Reason = "Passed"
377 availableCondition.Message = "all checks passed"
378 apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
379 _, err = c.updateAPIServiceStatus(originalAPIService, apiService)
380 return err
381 }
382
383
384
385 func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
386
387 c.setUnavailableGauge(newAPIService)
388
389 if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
390 return newAPIService, nil
391 }
392
393 orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
394 now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
395 unknown := apiregistrationv1.APIServiceCondition{
396 Type: apiregistrationv1.Available,
397 Status: apiregistrationv1.ConditionUnknown,
398 }
399 if orig == nil {
400 orig = &unknown
401 }
402 if now == nil {
403 now = &unknown
404 }
405 if *orig != *now {
406 klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
407 }
408
409 newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
410 if err != nil {
411 return nil, err
412 }
413
414 c.setUnavailableCounter(originalAPIService, newAPIService)
415 return newAPIService, nil
416 }
417
418
419 func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
420 defer utilruntime.HandleCrash()
421 defer c.queue.ShutDown()
422
423 klog.Info("Starting AvailableConditionController")
424 defer klog.Info("Shutting down AvailableConditionController")
425
426
427
428
429
430 if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
431 return
432 }
433
434 for i := 0; i < workers; i++ {
435 go wait.Until(c.runWorker, time.Second, stopCh)
436 }
437
438 <-stopCh
439 }
440
441 func (c *AvailableConditionController) runWorker() {
442 for c.processNextWorkItem() {
443 }
444 }
445
446
447 func (c *AvailableConditionController) processNextWorkItem() bool {
448 key, quit := c.queue.Get()
449 if quit {
450 return false
451 }
452 defer c.queue.Done(key)
453
454 err := c.syncFn(key.(string))
455 if err == nil {
456 c.queue.Forget(key)
457 return true
458 }
459
460 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
461 c.queue.AddRateLimited(key)
462
463 return true
464 }
465
466 func (c *AvailableConditionController) addAPIService(obj interface{}) {
467 castObj := obj.(*apiregistrationv1.APIService)
468 klog.V(4).Infof("Adding %s", castObj.Name)
469 if castObj.Spec.Service != nil {
470 c.rebuildAPIServiceCache()
471 }
472 c.queue.Add(castObj.Name)
473 }
474
475 func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) {
476 castObj := newObj.(*apiregistrationv1.APIService)
477 oldCastObj := oldObj.(*apiregistrationv1.APIService)
478 klog.V(4).Infof("Updating %s", oldCastObj.Name)
479 if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) {
480 c.rebuildAPIServiceCache()
481 }
482 c.queue.Add(oldCastObj.Name)
483 }
484
485 func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
486 castObj, ok := obj.(*apiregistrationv1.APIService)
487 if !ok {
488 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
489 if !ok {
490 klog.Errorf("Couldn't get object from tombstone %#v", obj)
491 return
492 }
493 castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
494 if !ok {
495 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
496 return
497 }
498 }
499 klog.V(4).Infof("Deleting %q", castObj.Name)
500 if castObj.Spec.Service != nil {
501 c.rebuildAPIServiceCache()
502 }
503 c.queue.Add(castObj.Name)
504 }
505
506 func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
507 metadata, err := meta.Accessor(obj)
508 if err != nil {
509 utilruntime.HandleError(err)
510 return nil
511 }
512 c.cacheLock.RLock()
513 defer c.cacheLock.RUnlock()
514 return c.cache[metadata.GetNamespace()][metadata.GetName()]
515 }
516
517
518
519
520 func (c *AvailableConditionController) rebuildAPIServiceCache() {
521 apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
522 newCache := map[string]map[string][]string{}
523 for _, apiService := range apiServiceList {
524 if apiService.Spec.Service == nil {
525 continue
526 }
527 if newCache[apiService.Spec.Service.Namespace] == nil {
528 newCache[apiService.Spec.Service.Namespace] = map[string][]string{}
529 }
530 newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name)
531 }
532
533 c.cacheLock.Lock()
534 defer c.cacheLock.Unlock()
535 c.cache = newCache
536 }
537
538
539
540 func (c *AvailableConditionController) addService(obj interface{}) {
541 for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
542 c.queue.Add(apiService)
543 }
544 }
545
546 func (c *AvailableConditionController) updateService(obj, _ interface{}) {
547 for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
548 c.queue.Add(apiService)
549 }
550 }
551
552 func (c *AvailableConditionController) deleteService(obj interface{}) {
553 castObj, ok := obj.(*v1.Service)
554 if !ok {
555 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
556 if !ok {
557 klog.Errorf("Couldn't get object from tombstone %#v", obj)
558 return
559 }
560 castObj, ok = tombstone.Obj.(*v1.Service)
561 if !ok {
562 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
563 return
564 }
565 }
566 for _, apiService := range c.getAPIServicesFor(castObj) {
567 c.queue.Add(apiService)
568 }
569 }
570
571 func (c *AvailableConditionController) addEndpoints(obj interface{}) {
572 for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
573 c.queue.Add(apiService)
574 }
575 }
576
577 func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
578 for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
579 c.queue.Add(apiService)
580 }
581 }
582
583 func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
584 castObj, ok := obj.(*v1.Endpoints)
585 if !ok {
586 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
587 if !ok {
588 klog.Errorf("Couldn't get object from tombstone %#v", obj)
589 return
590 }
591 castObj, ok = tombstone.Obj.(*v1.Endpoints)
592 if !ok {
593 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
594 return
595 }
596 }
597 for _, apiService := range c.getAPIServicesFor(castObj) {
598 c.queue.Add(apiService)
599 }
600 }
601
602
603 func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
604 if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
605 c.metrics.SetAPIServiceAvailable(newAPIService.Name)
606 return
607 }
608
609 c.metrics.SetAPIServiceUnavailable(newAPIService.Name)
610 }
611
612
613 func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
614 wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
615 isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
616 statusChanged := isAvailable != wasAvailable
617
618 if statusChanged && !isAvailable {
619 reason := "UnknownReason"
620 if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
621 reason = newCondition.Reason
622 }
623 c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc()
624 }
625 }
626
View as plain text