...
1
16
17 package openapi
18
19 import (
20 "fmt"
21 "net/http"
22 "time"
23
24 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25 "k8s.io/apimachinery/pkg/util/wait"
26 "k8s.io/client-go/util/workqueue"
27 "k8s.io/klog/v2"
28 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
29 "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
30 )
31
32 const (
33 successfulUpdateDelay = time.Minute
34 successfulUpdateDelayLocal = time.Second
35 failedUpdateMaxExpDelay = time.Hour
36 )
37
38 type syncAction int
39
40 const (
41 syncRequeue syncAction = iota
42 syncRequeueRateLimited
43 syncNothing
44 )
45
46
47
48 type AggregationController struct {
49 openAPIAggregationManager aggregator.SpecAggregator
50 queue workqueue.RateLimitingInterface
51 downloader *aggregator.Downloader
52
53
54 syncHandler func(key string) (syncAction, error)
55 }
56
57
58 func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregationManager aggregator.SpecAggregator) *AggregationController {
59 c := &AggregationController{
60 openAPIAggregationManager: openAPIAggregationManager,
61 queue: workqueue.NewNamedRateLimitingQueue(
62 workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay),
63 "open_api_aggregation_controller",
64 ),
65 downloader: downloader,
66 }
67
68 c.syncHandler = c.sync
69
70 return c
71 }
72
73
74 func (c *AggregationController) Run(stopCh <-chan struct{}) {
75 defer utilruntime.HandleCrash()
76 defer c.queue.ShutDown()
77
78 klog.Info("Starting OpenAPI AggregationController")
79 defer klog.Info("Shutting down OpenAPI AggregationController")
80
81 go wait.Until(c.runWorker, time.Second, stopCh)
82
83 <-stopCh
84 }
85
86 func (c *AggregationController) runWorker() {
87 for c.processNextWorkItem() {
88 }
89 }
90
91
92 func (c *AggregationController) processNextWorkItem() bool {
93 key, quit := c.queue.Get()
94 defer c.queue.Done(key)
95 if quit {
96 return false
97 }
98 klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
99
100 action, err := c.syncHandler(key.(string))
101 if err != nil {
102 utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
103 }
104
105 switch action {
106 case syncRequeue:
107 c.queue.AddAfter(key, successfulUpdateDelay)
108 case syncRequeueRateLimited:
109 klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
110 c.queue.AddRateLimited(key)
111 case syncNothing:
112 c.queue.Forget(key)
113 }
114
115 return true
116 }
117
118 func (c *AggregationController) sync(key string) (syncAction, error) {
119 if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
120 if err == aggregator.ErrAPIServiceNotFound {
121 return syncNothing, nil
122 } else {
123 return syncRequeueRateLimited, err
124 }
125 }
126 return syncRequeue, nil
127 }
128
129
130 func (c *AggregationController) AddAPIService(handler http.Handler, apiService *v1.APIService) {
131 if apiService.Spec.Service == nil {
132 return
133 }
134 if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
135 utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err))
136 }
137 c.queue.AddAfter(apiService.Name, time.Second)
138 }
139
140
141 func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
142 if apiService.Spec.Service == nil {
143 return
144 }
145 if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(apiService.Name); err != nil {
146 utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err))
147 }
148 key := apiService.Name
149 if c.queue.NumRequeues(key) > 0 {
150
151
152 c.queue.Forget(key)
153 c.queue.AddAfter(key, time.Second)
154 }
155
156
157 }
158
159
160 func (c *AggregationController) RemoveAPIService(apiServiceName string) {
161 c.openAPIAggregationManager.RemoveAPIService(apiServiceName)
162
163
164 c.queue.Forget(apiServiceName)
165 }
166
View as plain text